diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index 6145c23a..d1321360 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -198,6 +198,7 @@ struct aws_mqtt_client_connection { struct aws_byte_buf client_id; bool clean_session; uint16_t keep_alive_time_secs; + uint64_t keep_alive_time_ns; uint64_t ping_timeout_ns; uint64_t operation_timeout_ns; struct aws_string *username; @@ -309,6 +310,14 @@ struct aws_mqtt_client_connection { struct aws_http_message *handshake_request; } websocket; + /** + * The time that the next ping task should execute at. Note that this does not mean that + * this IS when the ping task will execute, but rather that this is when the next ping + * SHOULD execute. There may be an already scheduled PING task that will elapse sooner + * than this time that has to be rescheduled. + */ + uint64_t next_ping_time; + /** * Statistics tracking operational state */ diff --git a/source/client.c b/source/client.c index c18d7257..c028effc 100644 --- a/source/client.c +++ b/source/client.c @@ -542,7 +542,7 @@ static void s_mqtt_client_init( mqtt_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ - /* intall the slot and handler */ + /* install the slot and handler */ if (failed_create_slot) { AWS_LOGF_ERROR( @@ -688,11 +688,55 @@ static void s_attempt_reconnect(struct aws_task *task, void *userdata, enum aws_ struct aws_mqtt_reconnect_task *reconnect = userdata; struct aws_mqtt_client_connection *connection = aws_atomic_load_ptr(&reconnect->connection_ptr); + /* If the task is not cancelled and a connection has not succeeded, attempt reconnect */ if (status == AWS_TASK_STATUS_RUN_READY && connection) { - /* If the task is not cancelled and a connection has not succeeded, attempt reconnect */ - mqtt_connection_lock_synced_data(connection); + /** + * Check the state and if we are disconnecting (AWS_MQTT_CLIENT_STATE_DISCONNECTING) then we want to skip it + * and abort the reconnect task (or rather, just do not try to reconnect) + */ + if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, "id=%p: Skipping reconnect: Client is trying to disconnect", (void *)connection); + + /** + * There is the nasty world where the disconnect task/function is called right when we are "reconnecting" as + * our state but we have not reconnected. When this happens, the disconnect function doesn't do anything + * beyond setting the state to AWS_MQTT_CLIENT_STATE_DISCONNECTING (aws_mqtt_client_connection_disconnect), + * meaning the disconnect callback will NOT be called nor will we release memory. + * For this reason, we have to do the callback and release of the connection here otherwise the code + * will DEADLOCK forever and that is bad. + */ + bool perform_full_destroy = false; + if (!connection->slot) { + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Reconnect task called but client is disconnecting and has no slot. Finishing disconnect", + (void *)connection); + mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED); + perform_full_destroy = true; + } + + aws_mem_release(reconnect->allocator, reconnect); + connection->reconnect_task = NULL; + + /* Unlock the synced data, then potentially call the disconnect callback and release the connection */ + mqtt_connection_unlock_synced_data(connection); + if (perform_full_destroy) { + MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); + MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL); + aws_mqtt_client_connection_release(connection); + } + return; + } + + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds", + (void *)connection, + connection->reconnect_timeouts.current_sec); + /* Check before multiplying to avoid potential overflow */ if (connection->reconnect_timeouts.current_sec > connection->reconnect_timeouts.max_sec / 2) { connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.max_sec; @@ -1508,6 +1552,9 @@ int aws_mqtt_client_connection_connect( if (!connection->keep_alive_time_secs) { connection->keep_alive_time_secs = s_default_keep_alive_sec; } + connection->keep_alive_time_ns = + aws_timestamp_convert(connection->keep_alive_time_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + if (!connection_options->protocol_operation_timeout_ms) { connection->operation_timeout_ns = UINT64_MAX; } else { @@ -1526,16 +1573,15 @@ int aws_mqtt_client_connection_connect( } /* Keep alive time should always be greater than the timeouts. */ - if (AWS_UNLIKELY(connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS <= connection->ping_timeout_ns)) { + if (AWS_UNLIKELY(connection->keep_alive_time_ns <= connection->ping_timeout_ns)) { AWS_LOGF_FATAL( AWS_LS_MQTT_CLIENT, "id=%p: Illegal configuration, Connection keep alive %" PRIu64 "ns must be greater than the request timeouts %" PRIu64 "ns.", (void *)connection, - (uint64_t)connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS, + connection->keep_alive_time_ns, connection->ping_timeout_ns); - AWS_FATAL_ASSERT( - connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS > connection->ping_timeout_ns); + AWS_FATAL_ASSERT(connection->keep_alive_time_ns > connection->ping_timeout_ns); } AWS_LOGF_INFO( diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index f9c01cbd..62b5b063 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -20,6 +20,18 @@ # pragma warning(disable : 4204) #endif +/******************************************************************************* + * Static Helper functions + ******************************************************************************/ + +/* Caches the socket write time for ping scheduling purposes */ +static void s_update_next_ping_time(struct aws_mqtt_client_connection *connection) { + if (connection->slot != NULL && connection->slot->channel != NULL) { + aws_channel_current_clock_time(connection->slot->channel, &connection->next_ping_time); + aws_add_u64_checked(connection->next_ping_time, connection->keep_alive_time_ns, &connection->next_ping_time); + } +} + /******************************************************************************* * Packet State Machine ******************************************************************************/ @@ -42,18 +54,17 @@ static void s_schedule_ping(struct aws_mqtt_client_connection *connection) { uint64_t now = 0; aws_channel_current_clock_time(connection->slot->channel, &now); - AWS_LOGF_TRACE( - AWS_LS_MQTT_CLIENT, "id=%p: Scheduling PING. current timestamp is %" PRIu64, (void *)connection, now); - uint64_t schedule_time = - now + aws_timestamp_convert(connection->keep_alive_time_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, "id=%p: Scheduling PING task. current timestamp is %" PRIu64, (void *)connection, now); AWS_LOGF_TRACE( AWS_LS_MQTT_CLIENT, - "id=%p: The next ping will be run at timestamp %" PRIu64, + "id=%p: The next PING task will be run at timestamp %" PRIu64, (void *)connection, - schedule_time); - aws_channel_schedule_task_future(connection->slot->channel, &connection->ping_task, schedule_time); + connection->next_ping_time); + + aws_channel_schedule_task_future(connection->slot->channel, &connection->ping_task, connection->next_ping_time); } static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) { @@ -61,8 +72,24 @@ static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, if (status == AWS_TASK_STATUS_RUN_READY) { struct aws_mqtt_client_connection *connection = arg; - AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Sending PING", (void *)connection); - aws_mqtt_client_connection_ping(connection); + + uint64_t now = 0; + aws_channel_current_clock_time(connection->slot->channel, &now); + if (now >= connection->next_ping_time) { + s_update_next_ping_time(connection); + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Sending PING", (void *)connection); + aws_mqtt_client_connection_ping(connection); + } else { + + AWS_LOGF_TRACE( + AWS_LS_MQTT_CLIENT, + "id=%p: Skipped sending PING because scheduled ping time %" PRIu64 + " has not elapsed yet. Current time is %" PRIu64 + ". Rescheduling ping to run at the scheduled ping time...", + (void *)connection, + connection->next_ping_time, + now); + } s_schedule_ping(connection); } } @@ -175,6 +202,7 @@ static int s_packet_handler_connack( AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection); + s_update_next_ping_time(connection); s_schedule_ping(connection); return AWS_OP_SUCCESS; } @@ -793,6 +821,9 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en aws_mqtt_connection_statistics_change_operation_statistic_state( request->connection, request, AWS_MQTT_OSS_NONE); + /* Since a request has complete, update the next ping time */ + s_update_next_ping_time(connection); + aws_hash_table_remove( &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL); aws_memory_pool_release(&connection->synced_data.requests_pool, request); @@ -814,6 +845,9 @@ static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, en aws_mqtt_connection_statistics_change_operation_statistic_state( request->connection, request, AWS_MQTT_OSS_INCOMPLETE | AWS_MQTT_OSS_UNACKED); + /* Since a request has complete, update the next ping time */ + s_update_next_ping_time(connection); + mqtt_connection_unlock_synced_data(connection); } /* END CRITICAL SECTION */ @@ -1057,5 +1091,7 @@ void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int err shutdown_task->error_code = error_code; aws_channel_task_init(&shutdown_task->task, s_mqtt_disconnect_task, connection, "mqtt_disconnect"); aws_channel_schedule_task_now(connection->slot->channel, &shutdown_task->task); + } else { + AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Client currently has no slot to disconnect", (void *)connection); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8becd4d4..63568959 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -61,6 +61,10 @@ add_test_case(mqtt_clean_session_keep_next_session) add_test_case(mqtt_connection_publish_QoS1_timeout) add_test_case(mqtt_connection_unsub_timeout) add_test_case(mqtt_connection_publish_QoS1_timeout_connection_lost_reset_time) +add_test_case(mqtt_connection_ping_norm) +add_test_case(mqtt_connection_ping_no) +add_test_case(mqtt_connection_ping_basic_scenario) +add_test_case(mqtt_connection_ping_double_scenario) add_test_case(mqtt_connection_close_callback_simple) add_test_case(mqtt_connection_close_callback_interrupted) add_test_case(mqtt_connection_close_callback_multi) diff --git a/tests/v3/connection_state_test.c b/tests/v3/connection_state_test.c index b406df2a..2355fb02 100644 --- a/tests/v3/connection_state_test.c +++ b/tests/v3/connection_state_test.c @@ -3201,3 +3201,282 @@ AWS_TEST_CASE_FIXTURE( s_test_mqtt_connection_reconnection_backoff_reset_after_disconnection, s_clean_up_mqtt_server_fn, &test_data) + +/** + * Makes a CONNECT, with 1 second keep alive ping interval, does nothing for roughly 4 seconds, ensures 4 pings are sent + */ +static int s_test_mqtt_connection_ping_norm_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .keep_alive_time_secs = 1, + .ping_timeout_ms = 100, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + /* Wait for 4.5 seconds (to account for slight drift/jitter) */ + aws_thread_current_sleep(4500000000); + + /* Ensure the server got 4 PING packets */ + ASSERT_INT_EQUALS(4, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_ping_norm, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_ping_norm_fn, + s_clean_up_mqtt_server_fn, + &test_data) + +/** + * Makes a CONNECT, with 1 second keep alive ping interval, send a publish roughly every second, and then ensure NO + * pings were sent + */ +static int s_test_mqtt_connection_ping_no_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .keep_alive_time_secs = 1, + .ping_timeout_ms = 100, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + + for (int i = 0; i < 4; i++) { + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + uint16_t packet_id_1 = aws_mqtt_client_connection_publish( + state_test_data->mqtt_connection, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &payload_1, + s_on_op_complete, + state_test_data); + ASSERT_TRUE(packet_id_1 > 0); + + /* Wait 0.8 seconds */ + aws_thread_current_sleep(800000000); + } + + /* Ensure the server got 0 PING packets */ + ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_ping_no, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_ping_no_fn, + s_clean_up_mqtt_server_fn, + &test_data) + +/** + * Test to make sure the PING timing is correct if a publish/packet is sent near the end of the keep alive time. + * Note: Because of socket write jitter and scheduling jitter, the times have a 0.25 (quarter of a second) delta range. + * + * To test this, this test has a keep alive at 4 seconds and makes a publish after 3 seconds. This resets the ping + * task and will reschedule it for 4 seconds from the publish (the PING will be scheduled for 3 seconds after the 4 + * second task is invoked). This test then waits a second, makes sure a PING has NOT been sent (with no ping reschedule, + * it would have) and then waits 3 seconds to ensure and checks that a PING has been sent. Finally, it waits 4 seconds + * to ensure a second PING was sent at the correct time. + */ +static int s_test_mqtt_connection_ping_basic_scenario_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .keep_alive_time_secs = 4, + .ping_timeout_ms = 100, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + /* PING will be in 4 seconds */ + + aws_thread_current_sleep(3000000000); /* Wait 3 seconds */ + + aws_mutex_lock(&state_test_data->lock); + state_test_data->expected_ops_completed = 1; + aws_mutex_unlock(&state_test_data->lock); + + /* Make a publish */ + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + uint16_t packet_id_1 = aws_mqtt_client_connection_publish( + state_test_data->mqtt_connection, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &payload_1, + s_on_op_complete, + state_test_data); + ASSERT_TRUE(packet_id_1 > 0); + s_wait_for_ops_completed(state_test_data); + /* Publish packet written at 3 seconds */ + + aws_thread_current_sleep(1250000000); /* Wait 1.25 second (the extra 0.25 is to account for jitter) */ + /* PING task has executed and been rescheduled at 3 seconds (1 second passed) */ + + /* Ensure the server has gotten 0 PING packets so far */ + ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + aws_thread_current_sleep( + 3000000000); /* Wait 3 seconds more (no jitter needed because we already added 0.25 in the prior sleep) */ + /* PING task (from publish) has been executed */ + + /* Ensure the server has gotten only 1 PING packet */ + ASSERT_INT_EQUALS(1, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + aws_thread_current_sleep(4000000000); /* Wait 4 seconds (since we didn't publish or anything, it should go back to + normal keep alive time) */ + + /* Ensure the server has gotten 2 PING packets */ + ASSERT_INT_EQUALS(2, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + /* Disconnect and finish! */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_ping_basic_scenario, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_ping_basic_scenario_fn, + s_clean_up_mqtt_server_fn, + &test_data) + +/** + * The test is the same as above (s_test_mqtt_connection_ping_basic_scenario_fn) but after the first publish, it waits + * 1 second and makes another publish, before waiting 4 seconds from that point and ensures only a single PING was sent. + */ +static int s_test_mqtt_connection_ping_double_scenario_fn(struct aws_allocator *allocator, void *ctx) { + (void)allocator; + struct mqtt_connection_state_test *state_test_data = ctx; + + struct aws_mqtt_connection_options connection_options = { + .user_data = state_test_data, + .clean_session = true, + .client_id = aws_byte_cursor_from_c_str("client1234"), + .host_name = aws_byte_cursor_from_c_str(state_test_data->endpoint.address), + .socket_options = &state_test_data->socket_options, + .on_connection_complete = s_on_connection_complete_fn, + .keep_alive_time_secs = 4, + .ping_timeout_ms = 100, + }; + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect(state_test_data->mqtt_connection, &connection_options)); + s_wait_for_connection_to_complete(state_test_data); + /* PING will be in 4 seconds */ + + aws_thread_current_sleep(3000000000); /* Wait 3 seconds */ + + aws_mutex_lock(&state_test_data->lock); + state_test_data->expected_ops_completed = 1; + aws_mutex_unlock(&state_test_data->lock); + + /* Make a publish */ + struct aws_byte_cursor pub_topic = aws_byte_cursor_from_c_str("/test/topic"); + struct aws_byte_cursor payload_1 = aws_byte_cursor_from_c_str("Test Message 1"); + uint16_t packet_id_1 = aws_mqtt_client_connection_publish( + state_test_data->mqtt_connection, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &payload_1, + s_on_op_complete, + state_test_data); + ASSERT_TRUE(packet_id_1 > 0); + s_wait_for_ops_completed(state_test_data); + /* Publish packet written at 3 seconds */ + + aws_thread_current_sleep(1250000000); /* Wait 1.25 second (the extra 0.25 is to account for jitter) */ + /* PING task has executed and been rescheduled at 3 seconds (1 second passed) */ + + /* Ensure the server has gotten 0 PING packets so far */ + ASSERT_INT_EQUALS(0, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + + aws_thread_current_sleep(750000000); /* wait 0.75 seconds */ + + aws_mutex_lock(&state_test_data->lock); + state_test_data->expected_ops_completed = 2; + aws_mutex_unlock(&state_test_data->lock); + + /* Make as second publish */ + uint16_t packet_id_2 = aws_mqtt_client_connection_publish( + state_test_data->mqtt_connection, + &pub_topic, + AWS_MQTT_QOS_AT_LEAST_ONCE, + false, + &payload_1, + s_on_op_complete, + state_test_data); + ASSERT_TRUE(packet_id_2 > 0); + s_wait_for_ops_completed(state_test_data); + /* Publish packet written at 2 seconds (relative to PING that was scheduled above) */ + + aws_thread_current_sleep(4250000000); /* Wait 4.25 (the extra 0.25 is to account for jitter) seconds */ + /** + * Note: The extra 2 seconds are to account for the time it takes to publish on the socket. Despite best efforts, + * I cannot get it to trigger right away in the test suite... + * If you read the logs though, the scheduled PINGs should be 4 seconds, 3 seconds, 2 seconds, 4 seconds + */ + + /* Ensure the server has gotten only 1 PING packet */ + ASSERT_INT_EQUALS(1, mqtt_mock_server_get_ping_count(state_test_data->mock_server)); + /** + * At this point a new PING task is scheduled for 4 seconds, but we do not care anymore for the + * purposes of this test. + */ + + /* Disconnect and finish! */ + ASSERT_SUCCESS( + aws_mqtt_client_connection_disconnect(state_test_data->mqtt_connection, s_on_disconnect_fn, state_test_data)); + s_wait_for_disconnect_to_complete(state_test_data); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE_FIXTURE( + mqtt_connection_ping_double_scenario, + s_setup_mqtt_server_fn, + s_test_mqtt_connection_ping_double_scenario_fn, + s_clean_up_mqtt_server_fn, + &test_data) diff --git a/tests/v3/mqtt_mock_server_handler.c b/tests/v3/mqtt_mock_server_handler.c index 313dafe3..13f7171f 100644 --- a/tests/v3/mqtt_mock_server_handler.c +++ b/tests/v3/mqtt_mock_server_handler.c @@ -28,6 +28,7 @@ struct mqtt_mock_server_handler { size_t ping_resp_avail; size_t pubacks_received; + size_t ping_received; size_t connacks_avail; bool auto_ack; @@ -111,6 +112,7 @@ static int s_mqtt_mock_server_handler_process_packet( size_t ping_resp_available = 0; aws_mutex_lock(&server->synced.lock); ping_resp_available = server->synced.ping_resp_avail > 0 ? server->synced.ping_resp_avail-- : 0; + server->synced.ping_received += 1; aws_mutex_unlock(&server->synced.lock); if (ping_resp_available) { @@ -725,3 +727,11 @@ int mqtt_mock_server_decode_packets(struct aws_channel_handler *handler) { aws_mutex_unlock(&server->synced.lock); return AWS_OP_SUCCESS; } + +size_t mqtt_mock_server_get_ping_count(struct aws_channel_handler *handler) { + struct mqtt_mock_server_handler *server = handler->impl; + aws_mutex_lock(&server->synced.lock); + size_t count = server->synced.ping_received; + aws_mutex_unlock(&server->synced.lock); + return count; +} diff --git a/tests/v3/mqtt_mock_server_handler.h b/tests/v3/mqtt_mock_server_handler.h index c668c088..6693e422 100644 --- a/tests/v3/mqtt_mock_server_handler.h +++ b/tests/v3/mqtt_mock_server_handler.h @@ -126,4 +126,9 @@ struct mqtt_decoded_packet *mqtt_mock_server_find_decoded_packet_by_type( */ int mqtt_mock_server_decode_packets(struct aws_channel_handler *handler); +/** + * Returns the number of PINGs the server has gotten + */ +size_t mqtt_mock_server_get_ping_count(struct aws_channel_handler *handler); + #endif /* MQTT_MOCK_SERVER_HANDLER_H */