Skip to content

Commit

Permalink
Reconnection Backoff Fix (#267)
Browse files Browse the repository at this point in the history
* Solution proposal

* Explanatory comments + logic fix

* More comments

* Move reset logic into shared state lock so that we can check for a user-initiated disconnection too

* Reconnect fix unit tests (#268)

* reconnection backoff time unit tests

* add uc4

* fix reconnection resumed flag

* remove unncessary flag reset

* fix log position

* add error margin for the connection time

---------

Co-authored-by: xiazhvera <[email protected]>
  • Loading branch information
bretambrose and xiazhvera authored Apr 4, 2023
1 parent f3efee5 commit 7e384c9
Show file tree
Hide file tree
Showing 5 changed files with 330 additions and 32 deletions.
13 changes: 9 additions & 4 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,15 @@ struct aws_mqtt_client_connection {
struct aws_byte_buf payload;
} will;
struct {
uint64_t current_sec; /* seconds */
uint64_t min_sec; /* seconds */
uint64_t max_sec; /* seconds */
uint64_t next_attempt_reset_timer_ns; /* nanoseconds */
uint64_t current_sec; /* seconds */
uint64_t min_sec; /* seconds */
uint64_t max_sec; /* seconds */

/*
* Invariant: this is always zero except when the current MQTT channel has received a successful connack
* and is not yet shutdown. During that interval, it is the timestamp the connack was received.
*/
uint64_t channel_successful_connack_timestamp_ns;
} reconnect_timeouts;

/* User connection callbacks */
Expand Down
53 changes: 38 additions & 15 deletions source/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ void aws_mqtt_client_release(struct aws_mqtt_client *client) {
}
}

#define AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS 10

/* At this point, the channel for the MQTT connection has completed its shutdown */
static void s_mqtt_client_shutdown(
struct aws_client_bootstrap *bootstrap,
Expand All @@ -222,12 +224,41 @@ static void s_mqtt_client_shutdown(

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);

enum aws_mqtt_client_connection_state prev_state;
struct aws_linked_list cancelling_requests;
aws_linked_list_init(&cancelling_requests);
bool disconnected_state = false;
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);

/*
* On a channel that represents a valid connection (successful connack received),
* channel_successful_connack_timestamp_ns will be the time the connack was received. Otherwise it will be
* zero.
*
* Use that fact to determine whether or not we should reset the current reconnect backoff delay.
*
* We reset the reconnect backoff if either of:
* 1) the user called disconnect()
* 2) a successful connection had lasted longer than our minimum reset time (10s at the moment)
*/
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);
uint64_t time_diff = now - connection->reconnect_timeouts.channel_successful_connack_timestamp_ns;

bool was_user_disconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING;
bool was_sufficiently_long_connection =
(connection->reconnect_timeouts.channel_successful_connack_timestamp_ns != 0) &&
(time_diff >=
aws_timestamp_convert(
AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));

if (was_user_disconnect || was_sufficiently_long_connection) {
connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.min_sec;
}
connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = 0;

/* Move all the ongoing requests to the pending requests list, because the response they are waiting for will
* never arrives. Sad. But, we will retry. */
if (connection->clean_session) {
Expand Down Expand Up @@ -623,27 +654,18 @@ static void s_attempt_reconnect(struct aws_task *task, void *userdata, enum aws_

mqtt_connection_lock_synced_data(connection);

AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds",
(void *)connection,
connection->reconnect_timeouts.current_sec);

/* Check before multiplying to avoid potential overflow */
if (connection->reconnect_timeouts.current_sec > connection->reconnect_timeouts.max_sec / 2) {
connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.max_sec;
} else {
connection->reconnect_timeouts.current_sec *= 2;
}

/* Apply updated reconnect_timeout to next_attempt_reset_timer_ns to prevent premature reset to min
* of min reconnect on a successful connect after a prolonged period of failed connections */
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);
connection->reconnect_timeouts.next_attempt_reset_timer_ns =
now + 10000000000 +
aws_timestamp_convert(
connection->reconnect_timeouts.current_sec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds",
(void *)connection,
connection->reconnect_timeouts.current_sec);

mqtt_connection_unlock_synced_data(connection);

Expand Down Expand Up @@ -807,6 +829,7 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqt
AWS_ZERO_STRUCT(connection->synced_data);
connection->synced_data.state = AWS_MQTT_CLIENT_STATE_DISCONNECTED;
connection->reconnect_timeouts.min_sec = 1;
connection->reconnect_timeouts.current_sec = 1;
connection->reconnect_timeouts.max_sec = 128;
aws_linked_list_init(&connection->synced_data.pending_requests_list);
aws_linked_list_init(&connection->thread_data.ongoing_requests_list);
Expand Down Expand Up @@ -1060,6 +1083,7 @@ int aws_mqtt_client_connection_set_reconnect_timeout(
max_timeout);
connection->reconnect_timeouts.min_sec = min_timeout;
connection->reconnect_timeouts.max_sec = max_timeout;
connection->reconnect_timeouts.current_sec = min_timeout;

return AWS_OP_SUCCESS;
}
Expand Down Expand Up @@ -1683,7 +1707,6 @@ int aws_mqtt_client_connection_disconnect(
(void *)connection);
connection->on_disconnect = on_disconnect;
connection->on_disconnect_ud = userdata;
connection->reconnect_timeouts.next_attempt_reset_timer_ns = 0;
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

Expand Down
20 changes: 7 additions & 13 deletions source/client_channel_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,14 @@ static int s_packet_handler_connack(
uint64_t now = 0;
aws_high_res_clock_get_ticks(&now);

/*
* Only reset the duration of the reconnect timer to min if this connect is happening past
* the previously set next_attempt_reset_timer value. The next reset value will be 10 seconds after the next
* connection attempt
*/
if (connection->reconnect_timeouts.next_attempt_reset_timer_ns < now) {
connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.min_sec;
}
connection->reconnect_timeouts.next_attempt_reset_timer_ns =
now + 10000000000 +
aws_timestamp_convert(
connection->reconnect_timeouts.current_sec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);

if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) {

/*
* This was a successful MQTT connection establishment, record the time so that channel shutdown
* can make a good decision about reconnect backoff reset.
*/
connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = now;

/* If successfully connected, schedule all pending tasks */
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: connection was accepted processing offline requests.", (void *)connection);
Expand Down
5 changes: 5 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time)
add_test_case(mqtt_connection_close_callback_simple)
add_test_case(mqtt_connection_close_callback_interrupted)
add_test_case(mqtt_connection_close_callback_multi)
add_test_case(mqtt_connection_reconnection_backoff_stable)
add_test_case(mqtt_connection_reconnection_backoff_unstable)
add_test_case(mqtt_connection_reconnection_backoff_reset)
add_test_case(mqtt_connection_reconnection_backoff_reset_after_disconnection)

# Operation statistics tests
add_test_case(mqtt_operation_statistics_simple_publish)
add_test_case(mqtt_operation_statistics_offline_publish)
Expand Down
Loading

0 comments on commit 7e384c9

Please sign in to comment.