diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index 970bdf01..6bdfe749 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -208,10 +208,15 @@ struct aws_mqtt_client_connection { struct aws_byte_buf payload; } will; struct { - uint64_t current_sec; /* seconds */ - uint64_t min_sec; /* seconds */ - uint64_t max_sec; /* seconds */ - uint64_t next_attempt_reset_timer_ns; /* nanoseconds */ + uint64_t current_sec; /* seconds */ + uint64_t min_sec; /* seconds */ + uint64_t max_sec; /* seconds */ + + /* + * Invariant: this is always zero except when the current MQTT channel has received a successful connack + * and has not yet shut down. During that interval, it is the timestamp the connack was received. + */ + uint64_t channel_successful_connack_timestamp_ns; } reconnect_timeouts; /* User connection callbacks */ diff --git a/source/client.c b/source/client.c index bbcb2321..7aea13b7 100644 --- a/source/client.c +++ b/source/client.c @@ -208,6 +208,8 @@ void aws_mqtt_client_release(struct aws_mqtt_client *client) { } } +#define AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS 10 + /* At this point, the channel for the MQTT connection has completed its shutdown */ static void s_mqtt_client_shutdown( struct aws_client_bootstrap *bootstrap, @@ -222,12 +224,41 @@ static void s_mqtt_client_shutdown( AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code); + enum aws_mqtt_client_connection_state prev_state; struct aws_linked_list cancelling_requests; aws_linked_list_init(&cancelling_requests); bool disconnected_state = false; { /* BEGIN CRITICAL SECTION */ mqtt_connection_lock_synced_data(connection); + + /* + * On a channel that represents a valid connection (successful connack received), + * channel_successful_connack_timestamp_ns will be the time the connack was received. Otherwise it will be + * zero. 
+ * + * Use that fact to determine whether or not we should reset the current reconnect backoff delay. + * + * We reset the reconnect backoff if either of: + * 1) the user called disconnect() + * 2) a successful connection had lasted longer than our minimum reset time (10s at the moment) + */ + uint64_t now = 0; + aws_high_res_clock_get_ticks(&now); + uint64_t time_diff = now - connection->reconnect_timeouts.channel_successful_connack_timestamp_ns; + + bool was_user_disconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING; + bool was_sufficiently_long_connection = + (connection->reconnect_timeouts.channel_successful_connack_timestamp_ns != 0) && + (time_diff >= + aws_timestamp_convert( + AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL)); + + if (was_user_disconnect || was_sufficiently_long_connection) { + connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.min_sec; + } + connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = 0; + /* Move all the ongoing requests to the pending requests list, because the response they are waiting for will * never arrives. Sad. But, we will retry. 
*/ if (connection->clean_session) { @@ -623,12 +654,6 @@ static void s_attempt_reconnect(struct aws_task *task, void *userdata, enum aws_ mqtt_connection_lock_synced_data(connection); - AWS_LOGF_TRACE( - AWS_LS_MQTT_CLIENT, - "id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds", - (void *)connection, - connection->reconnect_timeouts.current_sec); - /* Check before multiplying to avoid potential overflow */ if (connection->reconnect_timeouts.current_sec > connection->reconnect_timeouts.max_sec / 2) { connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.max_sec; @@ -636,14 +661,11 @@ static void s_attempt_reconnect(struct aws_task *task, void *userdata, enum aws_ connection->reconnect_timeouts.current_sec *= 2; } - /* Apply updated reconnect_timeout to next_attempt_reset_timer_ns to prevent premature reset to min - * of min reconnect on a successful connect after a prolonged period of failed connections */ - uint64_t now = 0; - aws_high_res_clock_get_ticks(&now); - connection->reconnect_timeouts.next_attempt_reset_timer_ns = - now + 10000000000 + - aws_timestamp_convert( - connection->reconnect_timeouts.current_sec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds", + (void *)connection, + connection->reconnect_timeouts.current_sec); mqtt_connection_unlock_synced_data(connection); @@ -807,6 +829,7 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt AWS_ZERO_STRUCT(connection->synced_data); connection->synced_data.state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; connection->reconnect_timeouts.min_sec = 1; + connection->reconnect_timeouts.current_sec = 1; connection->reconnect_timeouts.max_sec = 128; aws_linked_list_init(&connection->synced_data.pending_requests_list); aws_linked_list_init(&connection->thread_data.ongoing_requests_list); @@ -1060,6 +1083,7 @@ 
int aws_mqtt_client_connection_set_reconnect_timeout( max_timeout); connection->reconnect_timeouts.min_sec = min_timeout; connection->reconnect_timeouts.max_sec = max_timeout; + connection->reconnect_timeouts.current_sec = min_timeout; return AWS_OP_SUCCESS; } @@ -1683,7 +1707,6 @@ int aws_mqtt_client_connection_disconnect( (void *)connection); connection->on_disconnect = on_disconnect; connection->on_disconnect_ud = userdata; - connection->reconnect_timeouts.next_attempt_reset_timer_ns = 0; mqtt_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index 2a1222e6..f9c01cbd 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -110,20 +110,14 @@ static int s_packet_handler_connack( uint64_t now = 0; aws_high_res_clock_get_ticks(&now); - /* - * Only reset the duration of the reconnect timer to min if this connect is happening past - * the previously set next_attempt_reset_timer value. The next reset value will be 10 seconds after the next - * connection attempt - */ - if (connection->reconnect_timeouts.next_attempt_reset_timer_ns < now) { - connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.min_sec; - } - connection->reconnect_timeouts.next_attempt_reset_timer_ns = - now + 10000000000 + - aws_timestamp_convert( - connection->reconnect_timeouts.current_sec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); - if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) { + + /* + * This was a successful MQTT connection establishment, record the time so that channel shutdown + * can make a good decision about reconnect backoff reset. 
+ */ + connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = now; + /* If successfully connected, schedule all pending tasks */ AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, "id=%p: connection was accepted processing offline requests.", (void *)connection); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 56695abd..8becd4d4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -64,6 +64,11 @@ add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time) add_test_case(mqtt_connection_close_callback_simple) add_test_case(mqtt_connection_close_callback_interrupted) add_test_case(mqtt_connection_close_callback_multi) +add_test_case(mqtt_connection_reconnection_backoff_stable) +add_test_case(mqtt_connection_reconnection_backoff_unstable) +add_test_case(mqtt_connection_reconnection_backoff_reset) +add_test_case(mqtt_connection_reconnection_backoff_reset_after_disconnection) + # Operation statistics tests add_test_case(mqtt_operation_statistics_simple_publish) add_test_case(mqtt_operation_statistics_offline_publish) diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c index 6e0ec474..86d680c3 100644 --- a/tests/v3/connection_state_test.c +++ b/tests/v3/connection_state_test.c @@ -16,6 +16,8 @@ #include <aws/testing/aws_test_harness.h> +#include <aws/common/math.h> + #ifdef _WIN32 # define LOCAL_SOCK_TEST_PATTERN "\\\\.\\pipe\\testsock%llu" #else @@ -24,6 +26,10 @@ static const int TEST_LOG_SUBJECT = 60000; static const int ONE_SEC = 1000000000; +// The value is extracted from aws-c-mqtt/source/client.c +static const int AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS = 10; +static const uint64_t RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS = 500000000; +#define DEFAULT_MIN_RECONNECT_DELAY_SECONDS 1 #define DEFAULT_TEST_PING_TIMEOUT_MS 1000 #define DEFAULT_TEST_KEEP_ALIVE_S 2 @@ -49,6 +55,7 @@ struct mqtt_connection_state_test { struct aws_mqtt_client *mqtt_client; struct aws_mqtt_client_connection *mqtt_connection; struct aws_socket_options 
socket_options; + bool session_present; bool connection_completed; bool client_disconnect_completed; @@ -204,6 +211,7 @@ static void s_wait_for_reconnect_to_complete(struct mqtt_connection_state_test * aws_mutex_lock(&state_test_data->lock); aws_condition_variable_wait_pred( &state_test_data->cvar, &state_test_data->lock, s_is_connection_resumed, state_test_data); + state_test_data->connection_resumed = false; aws_mutex_unlock(&state_test_data->lock); } @@ -2927,3 +2935,266 @@ AWS_TEST_CASE_FIXTURE( s_test_mqtt_connection_close_callback_multi_fn, s_clean_up_mqtt_server_fn, &test_data) + +static int s_test_mqtt_connection_reconnection_backoff_stable(struct aws_allocator *allocator, void *ctx) { + + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + uint64_t time_before = 0; + uint64_t time_after = 0; + for (int i = 0; i < 3; i++) { + /* sleep for AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS to make sure our connection is successful */ + aws_thread_current_sleep( + (uint64_t)ONE_SEC * AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS + + RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + + aws_high_res_clock_get_ticks(&time_before); + + /* shut it down and make sure the client automatically reconnects.*/ + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + + uint64_t reconnection_backoff_time = 
time_after - time_before; + uint64_t remainder = 0; + ASSERT_TRUE( + aws_timestamp_convert(reconnection_backoff_time, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &remainder) == + DEFAULT_MIN_RECONNECT_DELAY_SECONDS); + ASSERT_TRUE(remainder <= RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + } + + /* Disconnect */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_reconnection_backoff_stable, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_reconnection_backoff_stable, + s_clean_up_mqtt_server_fn, + &test_data) + +static int s_test_mqtt_connection_reconnection_backoff_unstable(struct aws_allocator *allocator, void *ctx) { + + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + uint64_t time_before = 0; + uint64_t time_after = 0; + uint64_t expected_reconnect_backoff = 1; + for (int i = 0; i < 3; i++) { + + aws_high_res_clock_get_ticks(&time_before); + + /* shut it down and make sure the client automatically reconnects.*/ + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + + uint64_t reconnection_backoff = time_after - time_before; + uint64_t remainder = 0; + ASSERT_TRUE( + 
aws_timestamp_convert(reconnection_backoff, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &remainder) == + expected_reconnect_backoff); + ASSERT_TRUE(remainder <= RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + + // Increase the exponential backoff + expected_reconnect_backoff = aws_min_u64(expected_reconnect_backoff * 2, 10); + } + + /* Disconnect */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_reconnection_backoff_unstable, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_reconnection_backoff_unstable, + s_clean_up_mqtt_server_fn, + &test_data) + +static int s_test_mqtt_connection_reconnection_backoff_reset(struct aws_allocator *allocator, void *ctx) { + + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + uint64_t time_before = 0; + uint64_t time_after = 0; + uint64_t expected_reconnect_backoff = 1; + uint64_t reconnection_backoff = 0; + for (int i = 0; i < 3; i++) { + + aws_high_res_clock_get_ticks(&time_before); + + /* shut it down and make sure the client automatically reconnects.*/ + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + reconnection_backoff = time_after - 
time_before; + ASSERT_TRUE( + aws_timestamp_convert(reconnection_backoff, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, NULL) >= + expected_reconnect_backoff); + + expected_reconnect_backoff = aws_min_u64(expected_reconnect_backoff * 2, 10); + } + + /* sleep for AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS to make sure our connection is successful */ + aws_thread_current_sleep( + (uint64_t)ONE_SEC * AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS + + RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + + aws_high_res_clock_get_ticks(&time_before); + + /* shut it down and make sure the client automatically reconnects.*/ + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + reconnection_backoff = time_after - time_before; + uint64_t remainder = 0; + ASSERT_TRUE( + aws_timestamp_convert(reconnection_backoff, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &remainder) == + DEFAULT_MIN_RECONNECT_DELAY_SECONDS); + ASSERT_TRUE(remainder <= RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + + /* Disconnect */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_reconnection_backoff_reset, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_reconnection_backoff_reset, + s_clean_up_mqtt_server_fn, + &test_data) + +static int s_test_mqtt_connection_reconnection_backoff_reset_after_disconnection( + struct aws_allocator *allocator, + void *ctx) { + + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = false, + + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = 
aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + uint64_t time_before = 0; + uint64_t time_after = 0; + uint64_t expected_reconnect_backoff = 1; + uint64_t reconnection_backoff = 0; + for (int i = 0; i < 3; i++) { + aws_high_res_clock_get_ticks(&time_before); + + /* shut it down and make sure the client automatically reconnects.*/ + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + reconnection_backoff = time_after - time_before; + ASSERT_TRUE( + aws_timestamp_convert(reconnection_backoff, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, NULL) >= + expected_reconnect_backoff); + + expected_reconnect_backoff = aws_min_u64(expected_reconnect_backoff * 2, 10); + } + /* Disconnect */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + /* connect again */ + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_before); + + aws_channel_shutdown(state_test_data->server_channel, AWS_OP_SUCCESS); + s_wait_for_reconnect_to_complete(state_test_data); + + aws_high_res_clock_get_ticks(&time_after); + reconnection_backoff = time_after - time_before; + uint64_t remainder = 0; + ASSERT_TRUE( + aws_timestamp_convert(reconnection_backoff, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &remainder) == + DEFAULT_MIN_RECONNECT_DELAY_SECONDS); + ASSERT_TRUE(remainder <= 
RECONNECT_BACKOFF_DELAY_ERROR_MARGIN_NANO_SECONDS); + + /* Disconnect */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_reconnection_backoff_reset_after_disconnection, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_reconnection_backoff_reset_after_disconnection, + s_clean_up_mqtt_server_fn, + &test_data)