From 53835c52ba6a086506bd42c441f2489dbde01a82 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Fri, 12 Apr 2024 14:23:20 -0700 Subject: [PATCH] Tests passing --- .../request_response_client.c | 3 + .../request-response/subscription_manager.c | 22 ++-- .../request_response_client_tests.c | 49 ++++++-- .../subscription_manager_tests.c | 111 +++++++++++++----- 4 files changed, 133 insertions(+), 52 deletions(-) diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index c56f3d64..b4361a5e 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -1825,6 +1825,7 @@ static void s_aws_mqtt_streaming_operation_storage_clean_up(struct aws_mqtt_stre static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_request_operation_storage *storage) { aws_array_list_clean_up(&storage->operation_response_paths); + aws_array_list_clean_up(&storage->subscription_topic_filters); aws_byte_buf_clean_up(&storage->operation_data); } @@ -2018,6 +2019,8 @@ void s_aws_mqtt_request_operation_storage_init_from_options( aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter); } + storage->options.subscription_topic_filters = storage->subscription_topic_filters.data; + for (size_t i = 0; i < request_options->response_path_count; ++i) { struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i]; diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 0430cb66..522f9738 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -455,16 +455,18 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su for (size_t i = 0; i < options->topic_filter_count; ++i) { struct aws_byte_cursor topic_filter = options->topic_filters[i]; struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); - if (existing_record != NULL) { - if (existing_record->poisoned) { - AWS_LOGF_ERROR( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR - "'), operation %" PRIu64 " failed - existing subscription is poisoned and has not been released", - AWS_BYTE_CURSOR_PRI(topic_filter), - options->operation_id); - return AASRT_FAILURE; - } + if (existing_record == NULL) { + continue; + } + + if (existing_record->poisoned) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 + " failed - existing subscription is poisoned and has not been released", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_FAILURE; } if (existing_record->type != options->type) { diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index c1fe00e8..c7c43b72 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -309,7 +309,7 @@ static struct aws_rr_client_fixture_streaming_record *s_rrc_fixture_add_streamin struct aws_rr_client_fixture_streaming_record *record = s_aws_rr_client_fixture_streaming_record_new(fixture->allocator, fixture, key); - aws_hash_table_put(&fixture->streaming_records, &record->record_key, record, NULL); + aws_hash_table_put(&fixture->streaming_records, &record->record_key_cursor, record, NULL); return record; } @@ -539,7 +539,8 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt5( } struct aws_mqtt_request_response_client_options client_options = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = 5, }; @@ -597,7 +598,8 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt311( aws_test311_setup_mqtt_server_fn(allocator, &fixture->client_test_fixture.mqtt311_test_fixture); struct aws_mqtt_request_response_client_options client_options = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = 5, }; @@ -707,6 +709,12 @@ static int s_rrc_mqtt311_create_destroy_fn(struct aws_allocator *allocator, void AWS_TEST_CASE(rrc_mqtt311_create_destroy, s_rrc_mqtt311_create_destroy_fn) +static char s_response_filter_wildcard[] = "response/filter/+"; +static struct aws_byte_cursor s_response_filter_wildcard_cursor = { + .ptr = (uint8_t *)s_response_filter_wildcard, + .len = AWS_ARRAY_SIZE(s_response_filter_wildcard) - 1, +}; + static int s_rrc_do_submit_request_operation_failure_test( struct aws_allocator *allocator, void (*request_mutator_fn)(struct aws_mqtt_request_operation_options *)) { @@ -735,7 +743,8 @@ static int s_rrc_do_submit_request_operation_failure_test( }, }; struct aws_mqtt_request_operation_options good_request = { - .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .subscription_topic_filters = &s_response_filter_wildcard_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = AWS_ARRAY_SIZE(response_paths), .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), @@ -801,8 +810,14 @@ AWS_TEST_CASE( rrc_submit_request_operation_failure_invalid_publish_topic, s_rrc_submit_request_operation_failure_invalid_publish_topic_fn) +static char s_bad_filter[] = "a/#/c"; +static struct aws_byte_cursor s_bad_filter_cursor = { + .ptr = (uint8_t *)s_bad_filter, + .len = AWS_ARRAY_SIZE(s_bad_filter) - 1, +}; + static void s_invalid_subscription_topic_filter_mutator(struct aws_mqtt_request_operation_options *request_options) { - request_options->subscription_topic_filter = aws_byte_cursor_from_c_str("a/#/c"); + request_options->subscription_topic_filters = &s_bad_filter_cursor; } static int s_rrc_submit_request_operation_failure_invalid_subscription_topic_filter_fn( @@ -936,7 +951,8 @@ static int s_rrc_submit_request_operation_failure_by_shutdown_fn(struct aws_allo }; struct aws_mqtt_request_operation_options request = { - .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .subscription_topic_filters = &s_response_filter_wildcard_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = AWS_ARRAY_SIZE(response_paths), .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), @@ -1047,7 +1063,8 @@ static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_alloc }; struct aws_mqtt_request_operation_options request = { - .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .subscription_topic_filters = &s_response_filter_wildcard_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = AWS_ARRAY_SIZE(response_paths), .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), @@ -1056,7 +1073,8 @@ static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_alloc }; struct aws_mqtt_request_response_client_options rr_client_options = { - .max_subscriptions = 2, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 1, .operation_timeout_seconds = 2, }; @@ -1143,7 +1161,8 @@ static int s_init_fixture_streaming_operation_success( }; struct aws_mqtt_request_response_client_options rr_client_options = { - .max_subscriptions = 2, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 1, .operation_timeout_seconds = 2, }; @@ -1981,6 +2000,8 @@ static int s_submit_request_operation_from_prefix( AWS_BYTE_CURSOR_PRI(prefix)); snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), PRInSTR "/get", AWS_BYTE_CURSOR_PRI(prefix)); + struct aws_byte_cursor subscription_topic_filter_cursor = aws_byte_cursor_from_c_str(subscription_topic_filter); + char correlation_token[128]; struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); @@ -2004,7 +2025,8 @@ static int s_submit_request_operation_from_prefix( s_rrc_fixture_add_request_record(fixture, record_key); struct aws_mqtt_request_operation_options request = { - .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_topic_filters = &subscription_topic_filter_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = AWS_ARRAY_SIZE(response_paths), .publish_topic = aws_byte_cursor_from_c_str(publish_topic), @@ -2345,7 +2367,8 @@ static int s_init_fixture_request_operation_success( }; struct aws_mqtt_request_response_client_options rr_client_options = { - .max_subscriptions = 2, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = 2, }; @@ -2391,6 +2414,7 @@ static int s_rrc_test_submit_test_request( char subscription_buffer[128]; snprintf(subscription_buffer, AWS_ARRAY_SIZE(subscription_buffer), "%s/+", topic_prefix); + struct aws_byte_cursor subscription_buffer_cursor = aws_byte_cursor_from_c_str(subscription_buffer); char publish_topic_buffer[128]; snprintf(publish_topic_buffer, AWS_ARRAY_SIZE(publish_topic_buffer), "%s/publish", topic_prefix); @@ -2419,7 +2443,8 @@ static int s_rrc_test_submit_test_request( snprintf(request_buffer + used_bytes, AWS_ARRAY_SIZE(request_buffer) - used_bytes, "}"); struct aws_mqtt_request_operation_options request = { - .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_buffer), + .subscription_topic_filters = &subscription_buffer_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = AWS_ARRAY_SIZE(response_paths), .publish_topic = aws_byte_cursor_from_c_str(publish_topic_buffer), diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index 099b1d1a..939d0d5e 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -651,31 +651,6 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator AWS_TEST_CASE(rrsm_acquire_existing_subscribed, s_rrsm_acquire_existing_subscribed_fn) -static int s_do_acquire_blocked_test( - struct aws_subscription_manager_test_fixture *fixture, - enum aws_rr_subscription_type subscription_type) { - struct aws_rr_subscription_manager *manager = &fixture->subscription_manager; - - struct aws_rr_acquire_subscription_options acquire1_options = { - .type = ARRST_REQUEST_RESPONSE, - .topic_filters = &s_hello_world1_cursor, - .topic_filter_count = 1, - .operation_id = 1, - }; - ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); - - // no room, but it could potentially free up in the future - struct aws_rr_acquire_subscription_options acquire2_options = { - .type = subscription_type, - .topic_filters = &s_hello_world2_cursor, - .topic_filter_count = 1, - .operation_id = 2, - }; - ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); - - return AWS_OP_SUCCESS; -} - /* * Verify: Acquiring a new request-response subscription when there is no room returns BLOCKED */ @@ -685,14 +660,43 @@ static int s_rrsm_acquire_blocked_rr_fn(struct aws_allocator *allocator, void *c aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_request_response_subscriptions = 1, + .max_request_response_subscriptions = 2, .max_streaming_subscriptions = 1, .operation_timeout_seconds = 30, }; struct aws_subscription_manager_test_fixture fixture; ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config)); - ASSERT_SUCCESS(s_do_acquire_blocked_test(&fixture, ARRST_REQUEST_RESPONSE)); + struct aws_rr_acquire_subscription_options acquire1_options = { + .type = ARRST_REQUEST_RESPONSE, + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, + .operation_id = 1, + }; + ASSERT_INT_EQUALS( + AASRT_SUBSCRIBING, + aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire1_options)); + + struct aws_rr_acquire_subscription_options acquire2_options = { + .type = ARRST_REQUEST_RESPONSE, + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, + .operation_id = 2, + }; + ASSERT_INT_EQUALS( + AASRT_SUBSCRIBING, + aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire2_options)); + + // no room, but it could potentially free up in the future + struct aws_rr_acquire_subscription_options acquire3_options = { + .type = ARRST_REQUEST_RESPONSE, + .topic_filters = &s_hello_world3_cursor, + .topic_filter_count = 1, + .operation_id = 3, + }; + ASSERT_INT_EQUALS( + AASRT_BLOCKED, + aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire3_options)); s_aws_subscription_manager_test_fixture_clean_up(&fixture); aws_mqtt_library_clean_up(); @@ -711,14 +715,61 @@ static int s_rrsm_acquire_blocked_eventstream_fn(struct aws_allocator *allocator aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_request_response_subscriptions = 1, + .max_request_response_subscriptions = 2, .max_streaming_subscriptions = 1, - .operation_timeout_seconds = 30, + .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, }; struct aws_subscription_manager_test_fixture fixture; ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config)); - ASSERT_SUCCESS(s_do_acquire_blocked_test(&fixture, ARRST_EVENT_STREAM)); + struct aws_protocol_adapter_connection_event connected_event = { + .event_type = AWS_PACET_CONNECTED, + }; + aws_rr_subscription_manager_on_protocol_adapter_connection_event(&fixture.subscription_manager, &connected_event); + + struct aws_rr_acquire_subscription_options acquire1_options = { + .type = ARRST_EVENT_STREAM, + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, + .operation_id = 1, + }; + ASSERT_INT_EQUALS( + AASRT_SUBSCRIBING, + aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire1_options)); + + struct aws_protocol_adapter_subscription_event subscribe_success_event = { + .event_type = AWS_PASET_SUBSCRIBE, + .topic_filter = s_hello_world1_cursor, + }; + aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + &fixture.subscription_manager, &subscribe_success_event); + + // release and trigger an unsubscribe + struct aws_rr_release_subscription_options release1_options = { + .operation_id = 1, + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, + }; + aws_rr_subscription_manager_release_subscription(&fixture.subscription_manager, &release1_options); + aws_rr_subscription_manager_purge_unused(&fixture.subscription_manager); + + struct aws_protocol_adapter_api_record expected_unsubscribe = { + .type = PAAT_UNSUBSCRIBE, + .topic_filter_cursor = s_hello_world1_cursor, + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }; + ASSERT_TRUE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); + + // acquire while the streaming unsubscribe is in-progress should return blocked + struct aws_rr_acquire_subscription_options acquire2_options = { + .type = ARRST_EVENT_STREAM, + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, + .operation_id = 2, + }; + ASSERT_INT_EQUALS( + AASRT_BLOCKED, + aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire2_options)); s_aws_subscription_manager_test_fixture_clean_up(&fixture); aws_mqtt_library_clean_up();