diff --git a/lib/base/io-engine.cpp b/lib/base/io-engine.cpp
index 26125feb310..3190ed03d82 100644
--- a/lib/base/io-engine.cpp
+++ b/lib/base/io-engine.cpp
@@ -146,9 +146,14 @@ void AsioConditionVariable::Wait(boost::asio::yield_context yc)
 	m_Timer.async_wait(yc[ec]);
 }
 
+/**
+ * Cancels any pending timeout callback.
+ *
+ * Must be called in the strand in which the callback was scheduled!
+ */
 void Timeout::Cancel()
 {
-	m_Cancelled.store(true);
+	m_Cancelled->store(true);
 
 	boost::system::error_code ec;
 	m_Timer.cancel(ec);
diff --git a/lib/base/io-engine.hpp b/lib/base/io-engine.hpp
index 326f04fdc47..0350d45b83d 100644
--- a/lib/base/io-engine.hpp
+++ b/lib/base/io-engine.hpp
@@ -3,10 +3,12 @@
 #ifndef IO_ENGINE_H
 #define IO_ENGINE_H
 
+#include "base/atomic.hpp"
+#include "base/debug.hpp"
 #include "base/exception.hpp"
 #include "base/lazy-init.hpp"
 #include "base/logger.hpp"
-#include "base/shared-object.hpp"
+#include "base/shared.hpp"
 #include
 #include
 #include
@@ -163,51 +165,80 @@ class AsioConditionVariable
 /**
  * I/O timeout emulator
  *
+ * This class provides a workaround for Boost.ASIO's lack of built-in timeout support.
+ * While Boost.ASIO handles asynchronous operations, it does not natively support timeouts for these operations.
+ * This class uses a boost::asio::deadline_timer to emulate a timeout by scheduling a callback to be triggered
+ * after a specified duration, effectively adding timeout behavior where none exists.
+ * The callback is executed within the provided strand, ensuring thread-safety.
+ *
+ * The constructor returns immediately after scheduling the timeout callback.
+ * The callback itself is invoked asynchronously when the timeout occurs.
+ * This allows the caller to continue execution while the timeout is running in the background.
+ *
+ * The class provides a Cancel() method to unschedule any pending callback. If the callback has already been run,
+ * calling Cancel() has no effect. This method can be used to abort the timeout early if the monitored operation
+ * completes before the callback has been run. The Timeout destructor also automatically cancels any pending callback.
+ * A callback is considered pending even if the timeout has already expired,
+ * but the callback has not been executed yet due to a busy strand.
+ *
  * @ingroup base
  */
-class Timeout : public SharedObject
+class Timeout
 {
 public:
-	DECLARE_PTR_TYPEDEFS(Timeout);
-
-	template<class Executor, class TimeoutFromNow, class OnTimeout>
-	Timeout(boost::asio::io_context& io, Executor& executor, TimeoutFromNow timeoutFromNow, OnTimeout onTimeout)
-		: m_Timer(io)
+	using Timer = boost::asio::deadline_timer;
+
+	/**
+	 * Schedules onTimeout to be triggered after timeoutFromNow on strand.
+	 *
+	 * @param strand The strand in which the callback will be executed.
+	 * The caller must also run in this strand, as well as Cancel() and the destructor!
+	 * @param timeoutFromNow The duration after which the timeout callback will be triggered.
+	 * @param onTimeout The callback to invoke when the timeout occurs.
+	 */
+	template<class OnTimeout>
+	Timeout(boost::asio::io_context::strand& strand, const Timer::duration_type& timeoutFromNow, OnTimeout onTimeout)
+		: m_Timer(strand.context(), timeoutFromNow), m_Cancelled(Shared<Atomic<bool>>::Make(false))
 	{
-		Ptr keepAlive (this);
-
-		m_Cancelled.store(false);
-		m_Timer.expires_from_now(std::move(timeoutFromNow));
-
-		IoEngine::SpawnCoroutine(executor, [this, keepAlive, onTimeout](boost::asio::yield_context yc) {
-			if (m_Cancelled.load()) {
-				return;
-			}
+		VERIFY(strand.running_in_this_thread());
 
-			{
-				boost::system::error_code ec;
-
-				m_Timer.async_wait(yc[ec]);
-
-				if (ec) {
-					return;
+		m_Timer.async_wait(boost::asio::bind_executor(
+			strand, [cancelled = m_Cancelled, onTimeout = std::move(onTimeout)](boost::system::error_code ec) {
+				if (!ec && !cancelled->load()) {
+					onTimeout();
 				}
 			}
+		));
+	}
 
-			if (m_Cancelled.load()) {
-				return;
-			}
-
-			auto f (onTimeout);
-			f(std::move(yc));
-		});
+	Timeout(const Timeout&) = delete;
+	Timeout(Timeout&&) = delete;
+	Timeout& operator=(const Timeout&) = delete;
+	Timeout& operator=(Timeout&&) = delete;
+
+	/**
+	 * Cancels any pending timeout callback.
+	 *
+	 * Must be called in the strand in which the callback was scheduled!
+	 */
+	~Timeout()
+	{
+		Cancel();
 	}
 
 	void Cancel();
 
 private:
-	boost::asio::deadline_timer m_Timer;
-	std::atomic<bool> m_Cancelled;
+	Timer m_Timer;
+
+	/**
+	 * Indicates whether the Timeout has been cancelled.
+	 *
+	 * This must be Shared<> between the lambda in the constructor and Cancel() for the case
+	 * the destructor calls Cancel() while the lambda is already queued in the strand.
+	 * The whole Timeout instance can't be kept alive by the lambda because this would delay the destructor.
+	 */
+	Shared<Atomic<bool>>::Ptr m_Cancelled;
 };
 
 }
diff --git a/lib/base/tlsstream.cpp b/lib/base/tlsstream.cpp
index ed80058372a..66514e0cf10 100644
--- a/lib/base/tlsstream.cpp
+++ b/lib/base/tlsstream.cpp
@@ -140,15 +140,12 @@ void AsioTlsStream::GracefulDisconnect(boost::asio::io_context::strand& strand,
 	}
 
 	{
-		Timeout::Ptr shutdownTimeout(new Timeout(strand.context(), strand, boost::posix_time::seconds(10),
-			[this](boost::asio::yield_context yc) {
+		Timeout shutdownTimeout (strand, boost::posix_time::seconds(10),
+			[this] {
 				// Forcefully terminate the connection if async_shutdown() blocked more than 10 seconds.
 				ForceDisconnect();
 			}
-		));
-		Defer cancelTimeout ([&shutdownTimeout]() {
-			shutdownTimeout->Cancel();
-		});
+		);
 
 		// Close the TLS connection, effectively uses SSL_shutdown() to send a close_notify shutdown alert to the peer.
 		boost::system::error_code ec;
diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp
index c187d7f1ed0..a6b82187dd0 100644
--- a/lib/icingadb/redisconnection.cpp
+++ b/lib/icingadb/redisconnection.cpp
@@ -318,7 +318,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 				auto conn (Shared::Make(m_Strand.context(), *m_TLSContext, m_Host));
 				auto& tlsConn (conn->next_layer());
 				auto connectTimeout (MakeTimeout(conn));
-				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 				icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
 				tlsConn.async_handshake(tlsConn.client, yc);
@@ -348,7 +347,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 
 				auto conn (Shared::Make(m_Strand.context()));
 				auto connectTimeout (MakeTimeout(conn));
-				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 				icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
 				Handshake(conn, yc);
@@ -361,7 +359,6 @@ void RedisConnection::Connect(asio::yield_context& yc)
 
 			auto conn (Shared::Make(m_Strand.context()));
 			auto connectTimeout (MakeTimeout(conn));
-			Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });
 
 			conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
 			Handshake(conn, yc);
diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp
index fecd236f9b3..3f963f3d37d 100644
--- a/lib/icingadb/redisconnection.hpp
+++ b/lib/icingadb/redisconnection.hpp
@@ -222,7 +222,7 @@ namespace icinga
 		void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
 
 		template<class StreamPtr>
-		Timeout::Ptr MakeTimeout(StreamPtr& stream);
+		Timeout MakeTimeout(StreamPtr& stream);
 
 		String m_Path;
 		String m_Host;
@@ -512,15 +512,12 @@ void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc)
  * @param stream Redis server connection
  */
 template<class StreamPtr>
-Timeout::Ptr RedisConnection::MakeTimeout(StreamPtr& stream)
+Timeout RedisConnection::MakeTimeout(StreamPtr& stream)
 {
-	Ptr keepAlive (this);
-
-	return new Timeout(
-		m_Strand.context(),
+	return Timeout(
 		m_Strand,
 		boost::posix_time::microseconds(intmax_t(m_ConnectTimeout * 1000000)),
-		[keepAlive, stream](boost::asio::yield_context yc) {
+		[stream] {
 			boost::system::error_code ec;
 			stream->lowest_layer().cancel(ec);
 		}
diff --git a/lib/methods/ifwapichecktask.cpp b/lib/methods/ifwapichecktask.cpp
index 9a62444b6c6..ce48deefc3a 100644
--- a/lib/methods/ifwapichecktask.cpp
+++ b/lib/methods/ifwapichecktask.cpp
@@ -456,8 +456,8 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
 	IoEngine::SpawnCoroutine(
 		*strand,
 		[strand, checkable, cr, psCommand, psHost, expectedSan, psPort, conn, req, checkTimeout, reportResult = std::move(reportResult)](asio::yield_context yc) {
-			Timeout::Ptr timeout = new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
-				[&conn, &checkable](boost::asio::yield_context yc) {
+			Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(checkTimeout * 1e6)),
+				[&conn, &checkable] {
 					Log(LogNotice, "IfwApiCheckTask")
 						<< "Timeout while checking " << checkable->GetReflectionType()->GetName()
 						<< " '" << checkable->GetName() << "', cancelling attempt";
@@ -467,8 +467,6 @@ void IfwApiCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckRes
 				}
 			);
 
-			Defer cancelTimeout ([&timeout]() { timeout->Cancel(); });
-
 			DoIfwNetIo(yc, cr, psCommand, psHost, expectedSan, psPort, *conn, *req);
 
 			cr->SetExecutionEnd(Utility::GetTime());
diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp
index 9c2a489da6e..519469aafa4 100644
--- a/lib/remote/apilistener.cpp
+++ b/lib/remote/apilistener.cpp
@@ -534,16 +534,15 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha
 			auto strand (Shared::Make(io));
 
 			IoEngine::SpawnCoroutine(*strand, [this, strand, sslConn, remoteEndpoint](asio::yield_context yc) {
-				Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
-					[sslConn, remoteEndpoint](asio::yield_context yc) {
+				Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
+					[sslConn, remoteEndpoint] {
 						Log(LogWarning, "ApiListener")
 							<< "Timeout while processing incoming connection from " << remoteEndpoint;
 
 						boost::system::error_code ec;
 						sslConn->lowest_layer().cancel(ec);
 					}
-				));
-				Defer cancelTimeout([timeout]() { timeout->Cancel(); });
+				);
 
 				NewClientHandler(yc, strand, sslConn, String(), RoleServer);
 			});
@@ -585,8 +584,8 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 
 			lock.unlock();
 
-			Timeout::Ptr timeout(new Timeout(strand->context(), *strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
-				[sslConn, endpoint, host, port](asio::yield_context yc) {
+			Timeout timeout (*strand, boost::posix_time::microseconds(int64_t(GetConnectTimeout() * 1e6)),
+				[sslConn, endpoint, host, port] {
 					Log(LogCritical, "ApiListener")
 						<< "Timeout while reconnecting to endpoint '" << endpoint->GetName() << "' via host '" << host
 						<< "' and port '" << port << "', cancelling attempt";
@@ -594,8 +593,7 @@ void ApiListener::AddConnection(const Endpoint::Ptr& endpoint)
 
 					boost::system::error_code ec;
 					sslConn->lowest_layer().cancel(ec);
 				}
-			));
-			Defer cancelTimeout([&timeout]() { timeout->Cancel(); });
+			);
 
 			Connect(sslConn->lowest_layer(), host, port, yc);
@@ -683,19 +681,16 @@ void ApiListener::NewClientHandlerInternal(
 	boost::system::error_code ec;
 
 	{
-		Timeout::Ptr handshakeTimeout (new Timeout(
-			strand->context(),
+		Timeout handshakeTimeout (
 			*strand,
 			boost::posix_time::microseconds(intmax_t(Configuration::TlsHandshakeTimeout * 1000000)),
-			[strand, client](asio::yield_context yc) {
+			[client] {
 				boost::system::error_code ec;
 				client->lowest_layer().cancel(ec);
 			}
-		));
+		);
 
 		sslConn.async_handshake(role == RoleClient ? sslConn.client : sslConn.server, yc[ec]);
-
-		handshakeTimeout->Cancel();
 	}
 
 	if (ec) {
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index dd9724f0bf0..a255178da2c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -62,6 +62,7 @@ set(base_test_SOURCES
   base-convert.cpp
   base-dictionary.cpp
   base-fifo.cpp
+  base-io-engine.cpp
   base-json.cpp
   base-match.cpp
   base-netstring.cpp
@@ -128,6 +129,11 @@ add_boost_test(base
     base_dictionary/keys_ordered
     base_fifo/construct
     base_fifo/io
+    base_io_engine/timeout_run
+    base_io_engine/timeout_cancelled
+    base_io_engine/timeout_scope
+    base_io_engine/timeout_due_cancelled
+    base_io_engine/timeout_due_scope
     base_json/encode
     base_json/decode
     base_json/invalid1
diff --git a/test/base-io-engine.cpp b/test/base-io-engine.cpp
new file mode 100644
index 00000000000..869688b1a67
--- /dev/null
+++ b/test/base-io-engine.cpp
@@ -0,0 +1,159 @@
+/* Icinga 2 | (c) 2024 Icinga GmbH | GPLv2+ */
+
+#include "base/io-engine.hpp"
+#include "base/utility.hpp"
+#include
+#include
+#include
+#include
+
+using namespace icinga;
+
+BOOST_AUTO_TEST_SUITE(base_io_engine)
+
+BOOST_AUTO_TEST_CASE(timeout_run)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 1);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_cancelled)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+
+		timeout.Cancel();
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_scope)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		{
+			Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+			timer.expires_from_now(boost::posix_time::millisec(200));
+			timer.async_wait(yc);
+		}
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(200));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_due_cancelled)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+		Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+		// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
+		Utility::Sleep(0.4);
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		// ... so that this shall still work:
+		timeout.Cancel();
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(100));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_CASE(timeout_due_scope)
+{
+	boost::asio::io_context io;
+	boost::asio::io_context::strand strand (io);
+	int called = 0;
+
+	boost::asio::spawn(strand, [&](boost::asio::yield_context yc) {
+		boost::asio::deadline_timer timer (io);
+
+		{
+			Timeout timeout (strand, boost::posix_time::millisec(300), [&called] { ++called; });
+
+			// Give the timeout enough time to become due while blocking its strand to prevent it from actually running...
+			Utility::Sleep(0.4);
+
+			BOOST_CHECK_EQUAL(called, 0);
+		} // ... so that Timeout#~Timeout() shall still work here.
+
+		BOOST_CHECK_EQUAL(called, 0);
+
+		timer.expires_from_now(boost::posix_time::millisec(100));
+		timer.async_wait(yc);
+	});
+
+	std::thread eventLoop ([&io] { io.run(); });
+	io.run();
+	eventLoop.join();
+
+	BOOST_CHECK_EQUAL(called, 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
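For reference, a minimal sketch (not part of the patch above) of the call-site pattern these changes converge on: construct the Timeout on the stack inside a coroutine running on the same strand, perform the guarded asynchronous operation, and let the destructor unschedule the callback. The function name and someAsyncOperation() are hypothetical placeholders; only the Timeout usage reflects the new API.

	// Sketch only: assumes `strand` is the strand the enclosing coroutine runs on,
	// and someAsyncOperation() stands in for e.g. an async_handshake() or async_wait() call.
	void GuardedOperation(boost::asio::io_context::strand& strand, boost::asio::yield_context yc)
	{
		Timeout timeout (strand, boost::posix_time::seconds(10), [] {
			// Invoked on `strand` if the operation below takes longer than 10 seconds.
		});

		someAsyncOperation(yc);

		// No Defer/Cancel() boilerplate needed anymore:
		// ~Timeout() cancels the pending callback when `timeout` goes out of scope.
	}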