Skip to content

Commit

Permalink
Tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Apr 12, 2024
1 parent 8567ffb commit 53835c5
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 52 deletions.
3 changes: 3 additions & 0 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1825,6 +1825,7 @@ static void s_aws_mqtt_streaming_operation_storage_clean_up(struct aws_mqtt_stre

static void s_aws_mqtt_request_operation_storage_clean_up(struct aws_mqtt_request_operation_storage *storage) {
aws_array_list_clean_up(&storage->operation_response_paths);
aws_array_list_clean_up(&storage->subscription_topic_filters);
aws_byte_buf_clean_up(&storage->operation_data);
}

Expand Down Expand Up @@ -2018,6 +2019,8 @@ void s_aws_mqtt_request_operation_storage_init_from_options(
aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter);
}

storage->options.subscription_topic_filters = storage->subscription_topic_filters.data;

for (size_t i = 0; i < request_options->response_path_count; ++i) {
struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i];

Expand Down
22 changes: 12 additions & 10 deletions source/request-response/subscription_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -455,16 +455,18 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su
for (size_t i = 0; i < options->topic_filter_count; ++i) {
struct aws_byte_cursor topic_filter = options->topic_filters[i];
struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter);
if (existing_record != NULL) {
if (existing_record->poisoned) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response subscription manager - acquire subscription for ('" PRInSTR
"'), operation %" PRIu64 " failed - existing subscription is poisoned and has not been released",
AWS_BYTE_CURSOR_PRI(topic_filter),
options->operation_id);
return AASRT_FAILURE;
}
if (existing_record == NULL) {
continue;
}

if (existing_record->poisoned) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64
" failed - existing subscription is poisoned and has not been released",
AWS_BYTE_CURSOR_PRI(topic_filter),
options->operation_id);
return AASRT_FAILURE;
}

if (existing_record->type != options->type) {
Expand Down
49 changes: 37 additions & 12 deletions tests/request-response/request_response_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ static struct aws_rr_client_fixture_streaming_record *s_rrc_fixture_add_streamin
struct aws_rr_client_fixture_streaming_record *record =
s_aws_rr_client_fixture_streaming_record_new(fixture->allocator, fixture, key);

aws_hash_table_put(&fixture->streaming_records, &record->record_key, record, NULL);
aws_hash_table_put(&fixture->streaming_records, &record->record_key_cursor, record, NULL);

return record;
}
Expand Down Expand Up @@ -539,7 +539,8 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt5(
}

struct aws_mqtt_request_response_client_options client_options = {
.max_subscriptions = 3,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 2,
.operation_timeout_seconds = 5,
};

Expand Down Expand Up @@ -597,7 +598,8 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt311(
aws_test311_setup_mqtt_server_fn(allocator, &fixture->client_test_fixture.mqtt311_test_fixture);

struct aws_mqtt_request_response_client_options client_options = {
.max_subscriptions = 3,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 2,
.operation_timeout_seconds = 5,
};

Expand Down Expand Up @@ -707,6 +709,12 @@ static int s_rrc_mqtt311_create_destroy_fn(struct aws_allocator *allocator, void

AWS_TEST_CASE(rrc_mqtt311_create_destroy, s_rrc_mqtt311_create_destroy_fn)

static char s_response_filter_wildcard[] = "response/filter/+";
static struct aws_byte_cursor s_response_filter_wildcard_cursor = {
.ptr = (uint8_t *)s_response_filter_wildcard,
.len = AWS_ARRAY_SIZE(s_response_filter_wildcard) - 1,
};

static int s_rrc_do_submit_request_operation_failure_test(
struct aws_allocator *allocator,
void (*request_mutator_fn)(struct aws_mqtt_request_operation_options *)) {
Expand Down Expand Up @@ -735,7 +743,8 @@ static int s_rrc_do_submit_request_operation_failure_test(
},
};
struct aws_mqtt_request_operation_options good_request = {
.subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"),
.subscription_topic_filters = &s_response_filter_wildcard_cursor,
.subscription_topic_filter_count = 1,
.response_paths = response_paths,
.response_path_count = AWS_ARRAY_SIZE(response_paths),
.publish_topic = aws_byte_cursor_from_c_str("get/shadow"),
Expand Down Expand Up @@ -801,8 +810,14 @@ AWS_TEST_CASE(
rrc_submit_request_operation_failure_invalid_publish_topic,
s_rrc_submit_request_operation_failure_invalid_publish_topic_fn)

static char s_bad_filter[] = "a/#/c";
static struct aws_byte_cursor s_bad_filter_cursor = {
.ptr = (uint8_t *)s_bad_filter,
.len = AWS_ARRAY_SIZE(s_bad_filter) - 1,
};

static void s_invalid_subscription_topic_filter_mutator(struct aws_mqtt_request_operation_options *request_options) {
request_options->subscription_topic_filter = aws_byte_cursor_from_c_str("a/#/c");
request_options->subscription_topic_filters = &s_bad_filter_cursor;
}

static int s_rrc_submit_request_operation_failure_invalid_subscription_topic_filter_fn(
Expand Down Expand Up @@ -936,7 +951,8 @@ static int s_rrc_submit_request_operation_failure_by_shutdown_fn(struct aws_allo
};

struct aws_mqtt_request_operation_options request = {
.subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"),
.subscription_topic_filters = &s_response_filter_wildcard_cursor,
.subscription_topic_filter_count = 1,
.response_paths = response_paths,
.response_path_count = AWS_ARRAY_SIZE(response_paths),
.publish_topic = aws_byte_cursor_from_c_str("get/shadow"),
Expand Down Expand Up @@ -1047,7 +1063,8 @@ static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_alloc
};

struct aws_mqtt_request_operation_options request = {
.subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"),
.subscription_topic_filters = &s_response_filter_wildcard_cursor,
.subscription_topic_filter_count = 1,
.response_paths = response_paths,
.response_path_count = AWS_ARRAY_SIZE(response_paths),
.publish_topic = aws_byte_cursor_from_c_str("get/shadow"),
Expand All @@ -1056,7 +1073,8 @@ static int s_rrc_submit_request_operation_failure_by_timeout_fn(struct aws_alloc
};

struct aws_mqtt_request_response_client_options rr_client_options = {
.max_subscriptions = 2,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 1,
.operation_timeout_seconds = 2,
};

Expand Down Expand Up @@ -1143,7 +1161,8 @@ static int s_init_fixture_streaming_operation_success(
};

struct aws_mqtt_request_response_client_options rr_client_options = {
.max_subscriptions = 2,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 1,
.operation_timeout_seconds = 2,
};

Expand Down Expand Up @@ -1981,6 +2000,8 @@ static int s_submit_request_operation_from_prefix(
AWS_BYTE_CURSOR_PRI(prefix));
snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), PRInSTR "/get", AWS_BYTE_CURSOR_PRI(prefix));

struct aws_byte_cursor subscription_topic_filter_cursor = aws_byte_cursor_from_c_str(subscription_topic_filter);

char correlation_token[128];
struct aws_byte_buf correlation_token_buf =
aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token));
Expand All @@ -2004,7 +2025,8 @@ static int s_submit_request_operation_from_prefix(
s_rrc_fixture_add_request_record(fixture, record_key);

struct aws_mqtt_request_operation_options request = {
.subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter),
.subscription_topic_filters = &subscription_topic_filter_cursor,
.subscription_topic_filter_count = 1,
.response_paths = response_paths,
.response_path_count = AWS_ARRAY_SIZE(response_paths),
.publish_topic = aws_byte_cursor_from_c_str(publish_topic),
Expand Down Expand Up @@ -2345,7 +2367,8 @@ static int s_init_fixture_request_operation_success(
};

struct aws_mqtt_request_response_client_options rr_client_options = {
.max_subscriptions = 2,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 2,
.operation_timeout_seconds = 2,
};

Expand Down Expand Up @@ -2391,6 +2414,7 @@ static int s_rrc_test_submit_test_request(

char subscription_buffer[128];
snprintf(subscription_buffer, AWS_ARRAY_SIZE(subscription_buffer), "%s/+", topic_prefix);
struct aws_byte_cursor subscription_buffer_cursor = aws_byte_cursor_from_c_str(subscription_buffer);

char publish_topic_buffer[128];
snprintf(publish_topic_buffer, AWS_ARRAY_SIZE(publish_topic_buffer), "%s/publish", topic_prefix);
Expand Down Expand Up @@ -2419,7 +2443,8 @@ static int s_rrc_test_submit_test_request(
snprintf(request_buffer + used_bytes, AWS_ARRAY_SIZE(request_buffer) - used_bytes, "}");

struct aws_mqtt_request_operation_options request = {
.subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_buffer),
.subscription_topic_filters = &subscription_buffer_cursor,
.subscription_topic_filter_count = 1,
.response_paths = response_paths,
.response_path_count = AWS_ARRAY_SIZE(response_paths),
.publish_topic = aws_byte_cursor_from_c_str(publish_topic_buffer),
Expand Down
111 changes: 81 additions & 30 deletions tests/request-response/subscription_manager_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -651,31 +651,6 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator

AWS_TEST_CASE(rrsm_acquire_existing_subscribed, s_rrsm_acquire_existing_subscribed_fn)

static int s_do_acquire_blocked_test(
struct aws_subscription_manager_test_fixture *fixture,
enum aws_rr_subscription_type subscription_type) {
struct aws_rr_subscription_manager *manager = &fixture->subscription_manager;

struct aws_rr_acquire_subscription_options acquire1_options = {
.type = ARRST_REQUEST_RESPONSE,
.topic_filters = &s_hello_world1_cursor,
.topic_filter_count = 1,
.operation_id = 1,
};
ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options));

// no room, but it could potentially free up in the future
struct aws_rr_acquire_subscription_options acquire2_options = {
.type = subscription_type,
.topic_filters = &s_hello_world2_cursor,
.topic_filter_count = 1,
.operation_id = 2,
};
ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options));

return AWS_OP_SUCCESS;
}

/*
* Verify: Acquiring a new request-response subscription when there is no room returns BLOCKED
*/
Expand All @@ -685,14 +660,43 @@ static int s_rrsm_acquire_blocked_rr_fn(struct aws_allocator *allocator, void *c
aws_mqtt_library_init(allocator);

struct aws_subscription_manager_test_fixture_options fixture_config = {
.max_request_response_subscriptions = 1,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 1,
.operation_timeout_seconds = 30,
};
struct aws_subscription_manager_test_fixture fixture;
ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config));

ASSERT_SUCCESS(s_do_acquire_blocked_test(&fixture, ARRST_REQUEST_RESPONSE));
struct aws_rr_acquire_subscription_options acquire1_options = {
.type = ARRST_REQUEST_RESPONSE,
.topic_filters = &s_hello_world1_cursor,
.topic_filter_count = 1,
.operation_id = 1,
};
ASSERT_INT_EQUALS(
AASRT_SUBSCRIBING,
aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire1_options));

struct aws_rr_acquire_subscription_options acquire2_options = {
.type = ARRST_REQUEST_RESPONSE,
.topic_filters = &s_hello_world2_cursor,
.topic_filter_count = 1,
.operation_id = 2,
};
ASSERT_INT_EQUALS(
AASRT_SUBSCRIBING,
aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire2_options));

// no room, but it could potentially free up in the future
struct aws_rr_acquire_subscription_options acquire3_options = {
.type = ARRST_REQUEST_RESPONSE,
.topic_filters = &s_hello_world3_cursor,
.topic_filter_count = 1,
.operation_id = 3,
};
ASSERT_INT_EQUALS(
AASRT_BLOCKED,
aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire3_options));

s_aws_subscription_manager_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();
Expand All @@ -711,14 +715,61 @@ static int s_rrsm_acquire_blocked_eventstream_fn(struct aws_allocator *allocator
aws_mqtt_library_init(allocator);

struct aws_subscription_manager_test_fixture_options fixture_config = {
.max_request_response_subscriptions = 1,
.max_request_response_subscriptions = 2,
.max_streaming_subscriptions = 1,
.operation_timeout_seconds = 30,
.operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT,
};
struct aws_subscription_manager_test_fixture fixture;
ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config));

ASSERT_SUCCESS(s_do_acquire_blocked_test(&fixture, ARRST_EVENT_STREAM));
struct aws_protocol_adapter_connection_event connected_event = {
.event_type = AWS_PACET_CONNECTED,
};
aws_rr_subscription_manager_on_protocol_adapter_connection_event(&fixture.subscription_manager, &connected_event);

struct aws_rr_acquire_subscription_options acquire1_options = {
.type = ARRST_EVENT_STREAM,
.topic_filters = &s_hello_world1_cursor,
.topic_filter_count = 1,
.operation_id = 1,
};
ASSERT_INT_EQUALS(
AASRT_SUBSCRIBING,
aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire1_options));

struct aws_protocol_adapter_subscription_event subscribe_success_event = {
.event_type = AWS_PASET_SUBSCRIBE,
.topic_filter = s_hello_world1_cursor,
};
aws_rr_subscription_manager_on_protocol_adapter_subscription_event(
&fixture.subscription_manager, &subscribe_success_event);

// release and trigger an unsubscribe
struct aws_rr_release_subscription_options release1_options = {
.operation_id = 1,
.topic_filters = &s_hello_world1_cursor,
.topic_filter_count = 1,
};
aws_rr_subscription_manager_release_subscription(&fixture.subscription_manager, &release1_options);
aws_rr_subscription_manager_purge_unused(&fixture.subscription_manager);

struct aws_protocol_adapter_api_record expected_unsubscribe = {
.type = PAAT_UNSUBSCRIBE,
.topic_filter_cursor = s_hello_world1_cursor,
.timeout = DEFAULT_SM_TEST_TIMEOUT,
};
ASSERT_TRUE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe));

// acquire while the streaming unsubscribe is in-progress should return blocked
struct aws_rr_acquire_subscription_options acquire2_options = {
.type = ARRST_EVENT_STREAM,
.topic_filters = &s_hello_world2_cursor,
.topic_filter_count = 1,
.operation_id = 2,
};
ASSERT_INT_EQUALS(
AASRT_BLOCKED,
aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire2_options));

s_aws_subscription_manager_test_fixture_clean_up(&fixture);
aws_mqtt_library_clean_up();
Expand Down

0 comments on commit 53835c5

Please sign in to comment.