Skip to content

Commit

Permalink
Merge pull request #10254 from Icinga/Timeout-Cancel
Browse files Browse the repository at this point in the history
Timeout: use less resources, clean them up better and make cancellation deterministic
  • Loading branch information
julianbrost authored Jan 8, 2025
2 parents 9bc9d14 + 8f72891 commit fba56f0
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 67 deletions.
7 changes: 6 additions & 1 deletion lib/base/io-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,14 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc)
m_Timer.async_wait(yc[ec]);
}

/**
* Cancels any pending timeout callback.
*
* Must be called in the strand in which the callback was scheduled!
*/
void Timeout::Cancel()
{
m_Cancelled.store(true);
m_Cancelled->store(true);

boost::system::error_code ec;
m_Timer.cancel(ec);
Expand Down
95 changes: 63 additions & 32 deletions lib/base/io-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
#ifndef IO_ENGINE_H
#define IO_ENGINE_H

#include "base/atomic.hpp"
#include "base/debug.hpp"
#include "base/exception.hpp"
#include "base/lazy-init.hpp"
#include "base/logger.hpp"
#include "base/shared-object.hpp"
#include "base/shared.hpp"
#include <atomic>
#include <exception>
#include <memory>
Expand Down Expand Up @@ -163,51 +165,80 @@ class AsioConditionVariable
/**
* I/O timeout emulator
*
* This class provides a workaround for Boost.ASIO's lack of built-in timeout support.
* While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations.
* This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered
* after a specified duration, effectively adding timeout behavior where none exists.
* The callback is executed within the provided strand, ensuring thread-safety.
*
* The constructor returns immediately after scheduling the timeout callback.
* The callback itself is invoked asynchronously when the timeout occurs.
* This allows the caller to continue execution while the timeout is running in the background.
*
* The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run,
* calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation
* completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback.
* A callback is considered pending even if the timeout has already expired,
* but the callback has not been executed yet due to a busy strand.
*
* @ingroup base
*/
class Timeout : public SharedObject
class Timeout
{
public:
DECLARE_PTR_TYPEDEFS(Timeout);

template<class Executor, class TimeoutFromNow, class OnTimeout>
Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
: m_Timer(io)
using Timer = boost::asio::deadline_timer;

/**
* Schedules onTimeout to be triggered after timeoutFromNow on strand.
*
* @param strand The strand in which the callback will be executed.
* The caller must also run in this strand, as well as Cancel() and the destructor!
* @param timeoutFromNow The duration after which the timeout callback will be triggered.
* @param onTimeout The callback to invoke when the timeout occurs.
*/
template<class OnTimeout>
Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout)
: m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared<Atomic<bool>>::Make(false))
{
Ptr keepAlive (this);

m_Cancelled.store(false);
m_Timer.expires_from_now(std::move(timeoutFromNow));

IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
if (m_Cancelled.load()) {
return;
}
VERIFY(strand.running_in_this_thread());

{
boost::system::error_code ec;

m_Timer.async_wait(yc[ec]);

if (ec) {
return;
m_Timer.async_wait(boost::asio::bind_executor(
strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) {
if (!ec && !cancelled->load()) {
onTimeout();
}
}
));
}

if (m_Cancelled.load()) {
return;
}

auto f (onTimeout);
f(std::move(yc));
});
Timeout(const Timeout&) = delete;
Timeout(Timeout&&) = delete;
Timeout& operator=(const Timeout&) = delete;
Timeout& operator=(Timeout&&) = delete;

/**
* Cancels any pending timeout callback.
*
* Must be called in the strand in which the callback was scheduled!
*/
~Timeout()
{
Cancel();
}

void Cancel();

private:
boost::asio::deadline_timer m_Timer;
std::atomic<bool> m_Cancelled;
Timer m_Timer;

/**
* Indicates whether the Timeout has been cancelled.
*
* This must be Shared<> between the lambda in the constructor and Cancel() for the case
* the destructor calls Cancel() while the lambda is already queued in the strand.
* The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor.
*/
Shared<Atomic<bool>>::Ptr m_Cancelled;
};

}
Expand Down
9 changes: 3 additions & 6 deletions lib/base/tlsstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,12 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand,
}

{
Timeout::Ptr shutdownTimeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10),
[this](boost::asio::yield_context yc) {
Timeout shutdownTimeout (strand, boost::posix_time::seconds(10),
[this] {
// Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds.
ForceDisconnect();
}
));
Defer cancelTimeout ([&shutdownTimeout]() {
shutdownTimeout->Cancel();
});
);

// Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer.
boost::system::error_code ec;
Expand Down
3 changes: 0 additions & 3 deletions lib/icingadb/redisconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
auto& tlsConn (conn->next_layer());
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
tlsConn.async_handshake(tlsConn.client, yc);
Expand Down Expand Up @@ -348,7 +347,6 @@ void RedisConnection::Connect(asio::yield_context& yc)

auto conn (Shared<TcpConn>::Make(m_Strand.context()));
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc);
Expand All @@ -361,7 +359,6 @@ void RedisConnection::Connect(asio::yield_context& yc)

auto conn (Shared<UnixConn>::Make(m_Strand.context()));
auto connectTimeout (MakeTimeout(conn));
Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc);
Expand Down
11 changes: 4 additions & 7 deletions lib/icingadb/redisconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ namespace icinga
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);

template<class StreamPtr>
Timeout::Ptr MakeTimeout(StreamPtr& stream);
Timeout MakeTimeout(StreamPtr& stream);

String m_Path;
String m_Host;
Expand Down Expand Up @@ -512,15 +512,12 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc)
* @param stream Redis server connection
*/
template<class StreamPtr>
Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream)
Timeout RedisConnection::MakeTimeout(StreamPtr& stream)
{
Ptr keepAlive (this);

return new Timeout(
m_Strand.context(),
return Timeout(
m_Strand,
boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)),
[keepAlive, stream](boost::asio::yield_context yc) {
[stream] {
boost::system::error_code ec;
stream->lowest_layer().cancel(ec);
}
Expand Down
6 changes: 2 additions & 4 deletions lib/methods/ifwapichecktask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
IoEngine::SpawnCoroutine(
*strand,
[strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) {
Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
[&conn, &checkable](boost::asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
[&conn, &checkable] {
Log(LogNotice, "IfwApiCheckTask")
<< "Timeout while checking " << checkable->GetReflectionType()->GetName()
<< " '" << checkable->GetName() << "', cancelling attempt";
Expand All @@ -467,8 +467,6 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
}
);

Defer cancelTimeout ([&timeout]() { timeout->Cancel(); });

DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req);

cr->SetExecutionEnd(Utility::GetTime());
Expand Down
23 changes: 9 additions & 14 deletions lib/remote/apilistener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,16 +534,15 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
auto strand (Shared<asio::io_context::strand>::Make(io));

IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) {
Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, remoteEndpoint](asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, remoteEndpoint] {
Log(LogWarning, "ApiListener")
<< "Timeout while processing incoming connection from " << remoteEndpoint;

boost::system::error_code ec;
sslConn->lowest_layer().cancel(ec);
}
));
Defer cancelTimeout([timeout]() { timeout->Cancel(); });
);

NewClientHandler(yc, strand, sslConn, String(), RoleServer);
});
Expand Down Expand Up @@ -585,17 +584,16 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)

lock.unlock();

Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, endpoint, host, port](asio::yield_context yc) {
Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
[sslConn, endpoint, host, port] {
Log(LogCritical, "ApiListener")
<< "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host
<< "' and port '" << port << "', cancelling attempt";

boost::system::error_code ec;
sslConn->lowest_layer().cancel(ec);
}
));
Defer cancelTimeout([&timeout]() { timeout->Cancel(); });
);

Connect(sslConn->lowest_layer(), host, port, yc);

Expand Down Expand Up @@ -683,19 +681,16 @@ void ApiListener::NewClientHandlerInternal(
boost::system::error_code ec;

{
Timeout::Ptr handshakeTimeout (new Timeout(
strand->context(),
Timeout handshakeTimeout (
*strand,
boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)),
[strand, client](asio::yield_context yc) {
[client] {
boost::system::error_code ec;
client->lowest_layer().cancel(ec);
}
));
);

sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);

handshakeTimeout->Cancel();
}

if (ec) {
Expand Down
6 changes: 6 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ set(base_test_SOURCES
base-convert.cpp
base-dictionary.cpp
base-fifo.cpp
base-io-engine.cpp
base-json.cpp
base-match.cpp
base-netstring.cpp
Expand Down Expand Up @@ -128,6 +129,11 @@ add_boost_test(base
base_dictionary/keys_ordered
base_fifo/construct
base_fifo/io
base_io_engine/timeout_run
base_io_engine/timeout_cancelled
base_io_engine/timeout_scope
base_io_engine/timeout_due_cancelled
base_io_engine/timeout_due_scope
base_json/encode
base_json/decode
base_json/invalid1
Expand Down
Loading

0 comments on commit fba56f0

Please sign in to comment.