From 8567ffbfc64b1feb218d3c659bb91e45757402a8 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 11 Apr 2024 19:40:08 -0700 Subject: [PATCH] Checkpoint --- .../request_response_client.h | 2 +- .../request-response/subscription_manager.h | 33 +- .../request_response_client.h | 21 +- .../request_response_client.c | 120 +++++-- .../request-response/subscription_manager.c | 219 +++++++----- .../subscription_manager_tests.c | 338 +++++++++++------- 6 files changed, 446 insertions(+), 287 deletions(-) diff --git a/include/aws/mqtt/private/request-response/request_response_client.h b/include/aws/mqtt/private/request-response/request_response_client.h index d0109f24..b64ae7fa 100644 --- a/include/aws/mqtt/private/request-response/request_response_client.h +++ b/include/aws/mqtt/private/request-response/request_response_client.h @@ -8,7 +8,7 @@ #include -struct aws_mqtt_request_response_client; +#include AWS_EXTERN_C_BEGIN diff --git a/include/aws/mqtt/private/request-response/subscription_manager.h b/include/aws/mqtt/private/request-response/subscription_manager.h index 072c580c..da70b4c9 100644 --- a/include/aws/mqtt/private/request-response/subscription_manager.h +++ b/include/aws/mqtt/private/request-response/subscription_manager.h @@ -90,9 +90,14 @@ typedef void( struct aws_rr_subscription_manager_options { /* - * Maximum number of concurrent subscriptions allowed + * Maximum number of request-response subscriptions allowed. Must be at least two. */ - size_t max_subscriptions; + size_t max_request_response_subscriptions; + + /* + * Maximum number of streaming subscriptions allowed. + */ + size_t max_streaming_subscriptions; /* * Ack timeout to use for all subscribe and unsubscribe operations @@ -104,8 +109,8 @@ struct aws_rr_subscription_manager_options { }; /* - * The subscription manager works from a purely lazy perspective. Unsubscribes (from topic filters that are no longer - * referenced) occur when looking for new subscription space. Unsubscribe failures don't trigger anything special, + * The subscription manager works with the request-response client to handle subscriptions in an eager manner. + * Subscription purges are checked with every client service call. Unsubscribe failures don't trigger anything special, * we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions * that still need them, either in response to a new operation listener or a connection resumption event. * @@ -135,43 +140,47 @@ enum aws_rr_subscription_type { }; struct aws_rr_acquire_subscription_options { - struct aws_byte_cursor topic_filter; + struct aws_byte_cursor *topic_filters; + size_t topic_filter_count; + uint64_t operation_id; enum aws_rr_subscription_type type; }; struct aws_rr_release_subscription_options { - struct aws_byte_cursor topic_filter; + struct aws_byte_cursor *topic_filters; + size_t topic_filter_count; + uint64_t operation_id; }; enum aws_acquire_subscription_result_type { /* - * The requested subscription already exists and is active. The operation can proceed to the next stage. + * All requested subscriptions already exist and are active. The operation can proceed to the next stage. */ AASRT_SUBSCRIBED, /* - * The requested subscription now exists but is not yet active. The operation must wait for the subscribe + * The requested subscriptions now exist but at least one is not yet active. The operation must wait for subscribes * to complete as success or failure. */ AASRT_SUBSCRIBING, /* - * The subscription does not exist and there is no room for it currently. Room may open up in the future, so - * the operation should wait. + * At least one subscription does not exist and there is no room for it currently. Room may open up in the future, + * so the operation should wait. */ AASRT_BLOCKED, /* - * The subscription does not exist, there is no room for it, and unless an event stream subscription gets + * At least one subscription does not exist and there is no room for it. Unless an event stream subscription gets * closed, no room will be available in the future. The operation should be failed. */ AASRT_NO_CAPACITY, /* - * An internal failure occurred while trying to establish the subscription. The operation should be failed. + * An internal failure occurred while trying to establish subscriptions. The operation should be failed. */ AASRT_FAILURE }; diff --git a/include/aws/mqtt/request-response/request_response_client.h b/include/aws/mqtt/request-response/request_response_client.h index 18b01843..1abd108a 100644 --- a/include/aws/mqtt/request-response/request_response_client.h +++ b/include/aws/mqtt/request-response/request_response_client.h @@ -26,7 +26,8 @@ typedef void(aws_mqtt_request_operation_completion_fn)( void *user_data); struct aws_mqtt_request_operation_options { - struct aws_byte_cursor subscription_topic_filter; + struct aws_byte_cursor *subscription_topic_filters; + size_t subscription_topic_filter_count; struct aws_mqtt_request_operation_response_path *response_paths; size_t response_path_count; @@ -39,14 +40,6 @@ struct aws_mqtt_request_operation_options { void *user_data; }; -struct aws_mqtt_request_operation_storage { - struct aws_mqtt_request_operation_options options; - - struct aws_array_list operation_response_paths; - - struct aws_byte_buf operation_data; -}; - /* * Describes a change to the state of a request operation subscription */ @@ -86,17 +79,13 @@ struct aws_mqtt_streaming_operation_options { void *user_data; }; -struct aws_mqtt_streaming_operation_storage { - struct aws_mqtt_streaming_operation_options options; - - struct aws_byte_buf operation_data; -}; - typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data); typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data); struct aws_mqtt_request_response_client_options { - size_t max_subscriptions; + size_t max_request_response_subscriptions; + size_t max_streaming_subscriptions; + uint32_t operation_timeout_seconds; /* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */ diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index dc13b273..c56f3d64 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -20,6 +20,21 @@ #define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50 #define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50 +struct aws_mqtt_request_operation_storage { + struct aws_mqtt_request_operation_options options; + + struct aws_array_list operation_response_paths; + struct aws_array_list subscription_topic_filters; + + struct aws_byte_buf operation_data; +}; + +struct aws_mqtt_streaming_operation_storage { + struct aws_mqtt_streaming_operation_options options; + + struct aws_byte_buf operation_data; +}; + enum aws_mqtt_request_response_operation_type { AWS_MRROT_REQUEST, AWS_MRROT_STREAMING, @@ -1117,7 +1132,8 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie const struct aws_mqtt_request_response_client_options *options, struct aws_event_loop *loop) { struct aws_rr_subscription_manager_options sm_options = { - .max_subscriptions = options->max_subscriptions, + .max_request_response_subscriptions = options->max_request_response_subscriptions, + .max_streaming_subscriptions = options->max_streaming_subscriptions, .operation_timeout_seconds = options->operation_timeout_seconds, }; @@ -1215,7 +1231,8 @@ static void s_aws_rr_client_init_subscription_manager( struct aws_rr_subscription_manager_options subscription_manager_options = { .operation_timeout_seconds = rr_client->config.operation_timeout_seconds, - .max_subscriptions = rr_client->config.max_subscriptions, + .max_request_response_subscriptions = rr_client->config.max_request_response_subscriptions, + .max_streaming_subscriptions = rr_client->config.max_streaming_subscriptions, .subscription_status_callback = s_aws_rr_client_subscription_status_event_callback, .userdata = rr_client, }; @@ -1266,15 +1283,6 @@ static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_ return UINT64_MAX; } -static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_filter( - struct aws_mqtt_rr_client_operation *operation) { - if (operation->type == AWS_MRROT_REQUEST) { - return operation->storage.request_storage.options.subscription_topic_filter; - } else { - return operation->storage.streaming_storage.options.topic_filter; - } -} - /* TODO: add aws-c-common API? */ static bool s_is_operation_in_list(const struct aws_mqtt_rr_client_operation *operation) { return aws_linked_list_node_prev_is_valid(&operation->node) && aws_linked_list_node_next_is_valid(&operation->node); @@ -1284,7 +1292,7 @@ static int s_add_streaming_operation_to_subscription_topic_filter_table( struct aws_mqtt_request_response_client *client, struct aws_mqtt_rr_client_operation *operation) { - struct aws_byte_cursor topic_filter_cursor = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation); + struct aws_byte_cursor topic_filter_cursor = operation->storage.streaming_storage.options.topic_filter; struct aws_hash_element *element = NULL; if (aws_hash_table_find(&client->streaming_operation_subscription_lists, &topic_filter_cursor, &element)) { @@ -1445,6 +1453,24 @@ static bool s_can_operation_dequeue( return token_element == NULL; } +static struct aws_byte_cursor *s_aws_mqtt_rr_operation_get_subscription_topic_filters( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_STREAMING) { + return &operation->storage.streaming_storage.options.topic_filter; + } else { + return operation->storage.request_storage.options.subscription_topic_filters; + } +} + +static size_t s_aws_mqtt_rr_operation_get_subscription_topic_filter_count( + struct aws_mqtt_rr_client_operation *operation) { + if (operation->type == AWS_MRROT_STREAMING) { + return 1; + } else { + return operation->storage.request_storage.options.subscription_topic_filter_count; + } +} + static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) { aws_rr_subscription_manager_purge_unused(&client->subscription_manager); @@ -1458,7 +1484,8 @@ static void s_process_queued_operations(struct aws_mqtt_request_response_client } struct aws_rr_acquire_subscription_options subscribe_options = { - .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation), + .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(head_operation), + .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(head_operation), .operation_id = head_operation->id, .type = s_rr_operation_type_to_subscription_type(head_operation->type), }; @@ -1692,15 +1719,26 @@ static bool s_are_request_operation_options_valid( return false; } - if (!aws_mqtt_is_valid_topic_filter(&request_options->subscription_topic_filter)) { + if (request_options->subscription_topic_filter_count == 0) { AWS_LOGF_ERROR( AWS_LS_MQTT_REQUEST_RESPONSE, - "(%p) rr client request options - " PRInSTR " is not a valid topic filter", - (void *)client, - AWS_BYTE_CURSOR_PRI(request_options->subscription_topic_filter)); + "(%p) rr client request options - no subscription topic filters supplied", + (void *)client); return false; } + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + const struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i]; + if (!aws_mqtt_is_valid_topic_filter(&subscription_topic_filter)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - " PRInSTR " is not a valid subscription topic filter", + (void *)client, + AWS_BYTE_CURSOR_PRI(subscription_topic_filter)); + return false; + } + } + if (request_options->serialized_request.len == 0) { AWS_LOGF_ERROR( AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client); @@ -1860,7 +1898,8 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg, if (client->state != AWS_RRCS_SHUTTING_DOWN) { struct aws_rr_release_subscription_options release_options = { - .topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation), + .topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(operation), + .topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(operation), .operation_id = operation->id, }; aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options); @@ -1933,7 +1972,12 @@ void s_aws_mqtt_request_operation_storage_init_from_options( bytes_needed += request_options->publish_topic.len; bytes_needed += request_options->serialized_request.len; bytes_needed += request_options->correlation_token.len; - bytes_needed += request_options->subscription_topic_filter.len; + + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + const struct aws_byte_cursor *subscription_topic_filter = &request_options->subscription_topic_filters[i]; + + bytes_needed += subscription_topic_filter->len; + } for (size_t i = 0; i < request_options->response_path_count; ++i) { const struct aws_mqtt_request_operation_response_path *response_path = &request_options->response_paths[i]; @@ -1950,6 +1994,11 @@ void s_aws_mqtt_request_operation_storage_init_from_options( allocator, request_options->response_path_count, sizeof(struct aws_mqtt_request_operation_response_path)); + aws_array_list_init_dynamic( + &storage->subscription_topic_filters, + allocator, + request_options->subscription_topic_filter_count, + sizeof(struct aws_byte_cursor)); AWS_FATAL_ASSERT( aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS); @@ -1959,9 +2008,15 @@ void s_aws_mqtt_request_operation_storage_init_from_options( AWS_FATAL_ASSERT( aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT( - aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.subscription_topic_filter) == - AWS_OP_SUCCESS); + + for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) { + struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i]; + + AWS_FATAL_ASSERT( + aws_byte_buf_append_and_update(&storage->operation_data, &subscription_topic_filter) == AWS_OP_SUCCESS); + + aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter); + } for (size_t i = 0; i < request_options->response_path_count; ++i) { struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i]; @@ -1988,14 +2043,19 @@ static void s_log_request_response_operation( struct aws_mqtt_request_operation_options *options = &operation->storage.request_storage.options; - AWS_LOGUF( - log_handle, - AWS_LL_DEBUG, - AWS_LS_MQTT_REQUEST_RESPONSE, - "id=%p: request-response client operation %" PRIu64 " - subscription topic filter: '" PRInSTR "'", - (void *)client, - operation->id, - AWS_BYTE_CURSOR_PRI(options->subscription_topic_filter)); + for (size_t i = 0; i < options->subscription_topic_filter_count; ++i) { + struct aws_byte_cursor subscription_topic_filter = options->subscription_topic_filters[i]; + + AWS_LOGUF( + log_handle, + AWS_LL_DEBUG, + AWS_LS_MQTT_REQUEST_RESPONSE, + "id=%p: request-response client operation %" PRIu64 " - subscription topic filter %zu topic '" PRInSTR "'", + (void *)client, + operation->id, + i, + AWS_BYTE_CURSOR_PRI(subscription_topic_filter)); + } AWS_LOGUF( log_handle, diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 6a0d2d16..0430cb66 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -76,15 +76,6 @@ struct aws_rr_subscription_record { bool poisoned; }; -static void s_aws_rr_subscription_record_log_invariant_violations(const struct aws_rr_subscription_record *record) { - if (record->status == ARRSST_SUBSCRIBED && record->pending_action == ARRSPAT_SUBSCRIBING) { - AWS_LOGF_ERROR( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - subscription ('" PRInSTR "') invalid state", - AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor)); - } -} - static void s_aws_rr_subscription_record_destroy(void *element) { struct aws_rr_subscription_record *record = element; @@ -96,11 +87,12 @@ static void s_aws_rr_subscription_record_destroy(void *element) { static struct aws_rr_subscription_record *s_aws_rr_subscription_new( struct aws_allocator *allocator, - const struct aws_rr_acquire_subscription_options *options) { + struct aws_byte_cursor topic_filter, + enum aws_rr_subscription_type type) { struct aws_rr_subscription_record *record = aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_subscription_record)); record->allocator = allocator; - aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, options->topic_filter); + aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter); record->topic_filter_cursor = aws_byte_cursor_from_buf(&record->topic_filter); aws_hash_table_init( @@ -115,7 +107,7 @@ static struct aws_rr_subscription_record *s_aws_rr_subscription_new( record->status = ARRSST_NOT_SUBSCRIBED; record->pending_action = ARRSPAT_NOTHING; - record->type = options->type; + record->type = type; return record; } @@ -451,114 +443,148 @@ static int s_rr_activate_idle_subscription( enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_subscription( struct aws_rr_subscription_manager *manager, const struct aws_rr_acquire_subscription_options *options) { - struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, options->topic_filter); - // is no subscription present? - if (existing_record == NULL) { - // is the budget used up? - struct aws_subscription_stats stats; - s_get_subscription_stats(manager, &stats); + if (options->topic_filter_count == 0) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } - bool space_for_subscription = - stats.event_stream_subscriptions + stats.request_response_subscriptions < manager->config.max_subscriptions; - if (options->type == ARRST_EVENT_STREAM) { - // event stream subscriptions cannot hog the entire subscription budget - space_for_subscription = - space_for_subscription && (stats.event_stream_subscriptions + 1 < manager->config.max_subscriptions); + /* + * Check for poisoned or mismatched records. This has precedence over the following unsubscribing check, + * and so we put them in separate loops + */ + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + if (existing_record != NULL) { + if (existing_record->poisoned) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR + "'), operation %" PRIu64 " failed - existing subscription is poisoned and has not been released", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_FAILURE; + } + } + + if (existing_record->type != options->type) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 + " failed - conflicts with subscription type of existing subscription", + AWS_BYTE_CURSOR_PRI(topic_filter), + options->operation_id); + return AASRT_FAILURE; } + } - if (!space_for_subscription) { - // could space eventually free up? - if (options->type == ARRST_REQUEST_RESPONSE || stats.request_response_subscriptions > 1 || - stats.unsubscribing_event_stream_subscriptions > 0) { + /* blocked if an existing record is unsubscribing; also compute how many subscriptions are needed */ + size_t subscriptions_needed = 0; + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); + if (existing_record != NULL) { + if (existing_record->pending_action == ARRSPAT_UNSUBSCRIBING) { AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, "request-response subscription manager - acquire subscription for ('" PRInSTR - "'), operation %" PRIu64 " blocked - no room currently", - AWS_BYTE_CURSOR_PRI(options->topic_filter), + "'), operation %" PRIu64 " blocked - existing subscription is unsubscribing", + AWS_BYTE_CURSOR_PRI(topic_filter), options->operation_id); return AASRT_BLOCKED; - } else { + } + } else { + ++subscriptions_needed; + } + } + + /* Check for space and fail or block as appropriate */ + if (subscriptions_needed > 0) { + /* how much of the budget are we using? */ + struct aws_subscription_stats stats; + s_get_subscription_stats(manager, &stats); + + if (options->type == ARRST_REQUEST_RESPONSE) { + if (subscriptions_needed > + manager->config.max_request_response_subscriptions - stats.request_response_subscriptions) { AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR - "'), operation %" PRIu64 " failed - no room", - AWS_BYTE_CURSOR_PRI(options->topic_filter), + "request-response subscription manager - acquire subscription for request operation %" PRIu64 + " blocked - no room currently", options->operation_id); - return AASRT_NO_CAPACITY; + return AASRT_BLOCKED; + } + } else { + /* + * Streaming subscriptions have more complicated space-checking logic. Under certain conditions, we may + * block rather than failing + */ + if (subscriptions_needed + stats.event_stream_subscriptions > manager->config.max_streaming_subscriptions) { + if (subscriptions_needed + stats.event_stream_subscriptions <= + manager->config.max_streaming_subscriptions + stats.unsubscribing_event_stream_subscriptions) { + /* If enough subscriptions are in the process of going away then wait in the blocked state */ + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for streaming operation %" PRIu64 + " blocked - no room currently", + options->operation_id); + return AASRT_BLOCKED; + } else { + /* Otherwise, there's no hope, fail */ + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " failed - no room", + options->operation_id); + return AASRT_NO_CAPACITY; + } } } - - // create-and-add subscription - existing_record = s_aws_rr_subscription_new(manager->allocator, options); - aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL); - } - - AWS_FATAL_ASSERT(existing_record != NULL); - if (existing_record->type != options->type) { - AWS_LOGF_ERROR( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " failed - conflicts with subscription type of existing subscription", - AWS_BYTE_CURSOR_PRI(options->topic_filter), - options->operation_id); - return AASRT_FAILURE; } - if (existing_record->poisoned) { - AWS_LOGF_ERROR( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " failed - existing subscription is poisoned and has not been released", - AWS_BYTE_CURSOR_PRI(options->topic_filter), - options->operation_id); - return AASRT_FAILURE; - } + bool is_fully_subscribed = true; + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); - s_aws_rr_subscription_record_log_invariant_violations(existing_record); + if (existing_record == NULL) { + existing_record = s_aws_rr_subscription_new(manager->allocator, topic_filter, options->type); + aws_hash_table_put(&manager->subscriptions, &existing_record->topic_filter_cursor, existing_record, NULL); + } - // for simplicity, we require unsubscribes to complete before re-subscribing - if (existing_record->pending_action == ARRSPAT_UNSUBSCRIBING) { - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " blocked - existing subscription is unsubscribing", - AWS_BYTE_CURSOR_PRI(options->topic_filter), - options->operation_id); - return AASRT_BLOCKED; + s_add_listener_to_subscription_record(existing_record, options->operation_id); + if (existing_record->status != ARRSST_SUBSCRIBED) { + is_fully_subscribed = false; + } } - // register the operation as a listener - s_add_listener_to_subscription_record(existing_record, options->operation_id); - if (existing_record->status == ARRSST_SUBSCRIBED) { + if (is_fully_subscribed) { AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " subscribed - existing subscription is active", - AWS_BYTE_CURSOR_PRI(options->topic_filter), + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " fully subscribed - all required subscriptions are active", options->operation_id); return AASRT_SUBSCRIBED; } - // do we need to send a subscribe? - if (s_rr_activate_idle_subscription(manager, existing_record)) { - // error code was already logged at the point of failure - AWS_LOGF_ERROR( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " failed - synchronous subscribe failure", - AWS_BYTE_CURSOR_PRI(options->topic_filter), - options->operation_id); - return AASRT_FAILURE; - } + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + struct aws_rr_subscription_record *existing_record = s_get_subscription_record(manager, topic_filter); - s_aws_rr_subscription_record_log_invariant_violations(existing_record); + if (s_rr_activate_idle_subscription(manager, existing_record)) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " failed - synchronous subscribe failure", + options->operation_id); + return AASRT_FAILURE; + } + } AWS_LOGF_DEBUG( AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 - " subscribing - waiting on existing subscription", - AWS_BYTE_CURSOR_PRI(options->topic_filter), + "request-response subscription manager - acquire subscription for operation %" PRIu64 + " subscribing - waiting on one or more subscribes to complete", options->operation_id); return AASRT_SUBSCRIBING; @@ -567,7 +593,10 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su void aws_rr_subscription_manager_release_subscription( struct aws_rr_subscription_manager *manager, const struct aws_rr_release_subscription_options *options) { - s_remove_listener_from_subscription_record(manager, options->topic_filter, options->operation_id); + for (size_t i = 0; i < options->topic_filter_count; ++i) { + struct aws_byte_cursor topic_filter = options->topic_filters[i]; + s_remove_listener_from_subscription_record(manager, topic_filter, options->operation_id); + } } static void s_handle_protocol_adapter_request_subscription_event( @@ -663,8 +692,6 @@ void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( } else { s_handle_protocol_adapter_streaming_subscription_event(manager, record, event); } - - s_aws_rr_subscription_record_log_invariant_violations(record); } static int s_rr_activate_idle_subscriptions_wrapper(void *context, struct aws_hash_element *elem) { @@ -673,8 +700,6 @@ static int s_rr_activate_idle_subscriptions_wrapper(void *context, struct aws_ha s_rr_activate_idle_subscription(manager, record); - s_aws_rr_subscription_record_log_invariant_violations(record); - return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } @@ -748,7 +773,7 @@ void aws_rr_subscription_manager_on_protocol_adapter_connection_event( } bool aws_rr_subscription_manager_are_options_valid(const struct aws_rr_subscription_manager_options *options) { - if (options == NULL || options->max_subscriptions < 1 || options->operation_timeout_seconds == 0) { + if (options == NULL || options->max_request_response_subscriptions < 2 || options->operation_timeout_seconds == 0) { return false; } @@ -771,7 +796,7 @@ void aws_rr_subscription_manager_init( aws_hash_table_init( &manager->subscriptions, allocator, - options->max_subscriptions, + options->max_request_response_subscriptions + options->max_streaming_subscriptions, aws_hash_byte_cursor_ptr, aws_mqtt_byte_cursor_hash_equality, NULL, diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index 4c23d7c0..099b1d1a 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -262,7 +262,8 @@ static void s_aws_rr_subscription_status_event_test_callback_fn( struct aws_subscription_manager_test_fixture_options { uint32_t operation_timeout_seconds; - size_t max_subscriptions; + size_t max_request_response_subscriptions; + size_t max_streaming_subscriptions; bool start_connected; const struct aws_mqtt_protocol_adapter_vtable *adapter_vtable; }; @@ -276,7 +277,8 @@ static int s_aws_subscription_manager_test_fixture_init( AWS_ZERO_STRUCT(*fixture); struct aws_subscription_manager_test_fixture_options default_options = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, .start_connected = true, }; @@ -293,7 +295,8 @@ static int s_aws_subscription_manager_test_fixture_init( &fixture->subscription_status_records, allocator, 10, sizeof(struct aws_subscription_status_record)); struct aws_rr_subscription_manager_options subscription_manager_options = { - .max_subscriptions = options->max_subscriptions, + .max_request_response_subscriptions = options->max_request_response_subscriptions, + .max_streaming_subscriptions = options->max_streaming_subscriptions, .operation_timeout_seconds = options->operation_timeout_seconds, .subscription_status_callback = s_aws_rr_subscription_status_event_test_callback_fn, .userdata = fixture}; @@ -391,6 +394,24 @@ static bool s_contains_subscription_event_sequential_records( return true; } +static char s_hello_world1[] = "hello/world1"; +static struct aws_byte_cursor s_hello_world1_cursor = { + .ptr = (uint8_t *)s_hello_world1, + .len = AWS_ARRAY_SIZE(s_hello_world1) - 1, +}; + +static char s_hello_world2[] = "hello/world2"; +static struct aws_byte_cursor s_hello_world2_cursor = { + .ptr = (uint8_t *)s_hello_world2, + .len = AWS_ARRAY_SIZE(s_hello_world2) - 1, +}; + +static char s_hello_world3[] = "hello/world3"; +static struct aws_byte_cursor s_hello_world3_cursor = { + .ptr = (uint8_t *)s_hello_world3, + .len = AWS_ARRAY_SIZE(s_hello_world3) - 1, +}; + /* * Verify: Acquiring a new subscription triggers a protocol client subscribe and returns SUBSCRIBING */ @@ -405,17 +426,17 @@ static int s_rrsm_acquire_subscribing_fn(struct aws_allocator *allocator, void * struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter_cursor = s_hello_world2_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world3"), + .topic_filter_cursor = s_hello_world3_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -424,7 +445,8 @@ static int s_rrsm_acquire_subscribing_fn(struct aws_allocator *allocator, void * struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); @@ -432,7 +454,8 @@ static int s_rrsm_acquire_subscribing_fn(struct aws_allocator *allocator, void * struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); @@ -440,7 +463,8 @@ static int s_rrsm_acquire_subscribing_fn(struct aws_allocator *allocator, void * struct aws_rr_acquire_subscription_options acquire3_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world3"), + .topic_filters = &s_hello_world3_cursor, + .topic_filter_count = 1, .operation_id = 3, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire3_options)); @@ -469,12 +493,12 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter_cursor = s_hello_world2_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -483,7 +507,8 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); @@ -491,7 +516,8 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); @@ -499,7 +525,8 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato struct aws_rr_acquire_subscription_options reacquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 3, }; ASSERT_INT_EQUALS( @@ -508,7 +535,8 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato struct aws_rr_acquire_subscription_options reacquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 4, }; ASSERT_INT_EQUALS( @@ -538,12 +566,12 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter_cursor = s_hello_world2_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -552,7 +580,8 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); @@ -560,14 +589,15 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); ASSERT_TRUE(s_api_records_equals(fixture.mock_protocol_adapter, 2, expected_subscribes)); struct aws_protocol_adapter_subscription_event successful_subscription1_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; @@ -576,18 +606,18 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_subscription_status_record expected_subscription_events[] = { { .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }, { .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter_cursor = s_hello_world2_cursor, .operation_id = 2, }}; ASSERT_TRUE(s_contains_subscription_event_sequential_records(&fixture, 1, expected_subscription_events)); struct aws_protocol_adapter_subscription_event successful_subscription2_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter = s_hello_world2_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; @@ -597,7 +627,8 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_rr_acquire_subscription_options reacquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 3, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBED, aws_rr_subscription_manager_acquire_subscription(manager, &reacquire1_options)); @@ -605,7 +636,8 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_rr_acquire_subscription_options reacquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 4, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBED, aws_rr_subscription_manager_acquire_subscription(manager, &reacquire2_options)); @@ -626,7 +658,8 @@ static int s_do_acquire_blocked_test( struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); @@ -634,7 +667,8 @@ static int s_do_acquire_blocked_test( // no room, but it could potentially free up in the future struct aws_rr_acquire_subscription_options acquire2_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); @@ -651,7 +685,8 @@ static int s_rrsm_acquire_blocked_rr_fn(struct aws_allocator *allocator, void *c aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 1, + .max_request_response_subscriptions = 1, + .max_streaming_subscriptions = 1, .operation_timeout_seconds = 30, }; struct aws_subscription_manager_test_fixture fixture; @@ -676,21 +711,13 @@ static int s_rrsm_acquire_blocked_eventstream_fn(struct aws_allocator *allocator aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 2, + .max_request_response_subscriptions = 1, + .max_streaming_subscriptions = 1, .operation_timeout_seconds = 30, }; struct aws_subscription_manager_test_fixture fixture; ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config)); - struct aws_rr_acquire_subscription_options acquire1_options = { - .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world3"), - .operation_id = 3, - }; - ASSERT_INT_EQUALS( - AASRT_SUBSCRIBING, - aws_rr_subscription_manager_acquire_subscription(&fixture.subscription_manager, &acquire1_options)); - ASSERT_SUCCESS(s_do_acquire_blocked_test(&fixture, ARRST_EVENT_STREAM)); s_aws_subscription_manager_test_fixture_clean_up(&fixture); @@ -706,7 +733,8 @@ static int s_do_acquire_no_capacity_test(struct aws_subscription_manager_test_fi struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_NO_CAPACITY, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -723,7 +751,8 @@ static int s_rrsm_acquire_no_capacity_max1_fn(struct aws_allocator *allocator, v aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 1, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 0, .operation_timeout_seconds = 30, }; struct aws_subscription_manager_test_fixture fixture; @@ -749,20 +778,22 @@ static int s_rrsm_acquire_no_capacity_too_many_event_stream_fn(struct aws_alloca aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 5, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 4, .operation_timeout_seconds = 30, }; struct aws_subscription_manager_test_fixture fixture; ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config)); - // N - 1 event stream subscriptions for (size_t i = 0; i < 4; ++i) { char topic_filter_buffer[256]; snprintf(topic_filter_buffer, AWS_ARRAY_SIZE(topic_filter_buffer), "hello/world/%d", (int)(i + 1)); + struct aws_byte_cursor topic_filter_cursor = aws_byte_cursor_from_c_str(topic_filter_buffer); struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str(topic_filter_buffer), + .topic_filters = &topic_filter_cursor, + .topic_filter_count = 1, .operation_id = i + 2, }; ASSERT_INT_EQUALS( @@ -794,7 +825,8 @@ static int s_rrsm_acquire_failure_mixed_subscription_types_fn(struct aws_allocat struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS( @@ -803,7 +835,8 @@ static int s_rrsm_acquire_failure_mixed_subscription_types_fn(struct aws_allocat struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS( @@ -819,8 +852,7 @@ static int s_rrsm_acquire_failure_mixed_subscription_types_fn(struct aws_allocat AWS_TEST_CASE(rrsm_acquire_failure_mixed_subscription_types, s_rrsm_acquire_failure_mixed_subscription_types_fn) /* - * Verify: Acquiring an existing, completed request subscription does not trigger a protocol client subscribe and - * returns SUBSCRIBED. Verify request and streaming subscription events are emitted. + * Verify: Acquiring a poisoned streaming subscription results in failure. */ static int s_rrsm_acquire_failure_poisoned_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -834,13 +866,14 @@ static int s_rrsm_acquire_failure_poisoned_fn(struct aws_allocator *allocator, v struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); struct aws_protocol_adapter_subscription_event unretryable_failure_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, .retryable = false, @@ -849,14 +882,15 @@ static int s_rrsm_acquire_failure_poisoned_fn(struct aws_allocator *allocator, v struct aws_subscription_status_record expected_subscription_events[] = {{ .type = ARRSET_STREAMING_SUBSCRIPTION_HALTED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }}; ASSERT_TRUE(s_contains_subscription_event_sequential_records(&fixture, 1, expected_subscription_events)); struct aws_rr_acquire_subscription_options reacquire1_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 3, }; ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &reacquire1_options)); @@ -885,20 +919,22 @@ static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocato struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); struct aws_protocol_adapter_subscription_event successful_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; @@ -908,12 +944,12 @@ static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocato struct aws_subscription_status_record expected_subscription_events[] = { { .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }, { .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 2, }}; ASSERT_TRUE(s_contains_subscription_event_records(&fixture, 2, expected_subscription_events)); @@ -921,14 +957,15 @@ static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocato // verify no unsubscribes struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); // release once, verify no unsubscribe struct aws_rr_release_subscription_options release1_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release1_options); @@ -936,7 +973,8 @@ static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocato // release second struct aws_rr_release_subscription_options release2_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; aws_rr_subscription_manager_release_subscription(manager, &release2_options); @@ -971,20 +1009,22 @@ static int s_rrsm_release_unsubscribes_streaming_fn(struct aws_allocator *alloca struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); struct aws_protocol_adapter_subscription_event successful_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; @@ -994,12 +1034,12 @@ static int s_rrsm_release_unsubscribes_streaming_fn(struct aws_allocator *alloca struct aws_subscription_status_record expected_subscription_events[] = { { .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }, { .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 2, }}; ASSERT_TRUE(s_contains_subscription_event_records(&fixture, 2, expected_subscription_events)); @@ -1007,14 +1047,15 @@ static int s_rrsm_release_unsubscribes_streaming_fn(struct aws_allocator *alloca // verify no unsubscribes struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); // release once, verify no unsubscribe struct aws_rr_release_subscription_options release1_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release1_options); @@ -1022,7 +1063,8 @@ static int s_rrsm_release_unsubscribes_streaming_fn(struct aws_allocator *alloca // release second struct aws_rr_release_subscription_options release2_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; aws_rr_subscription_manager_release_subscription(manager, &release2_options); @@ -1045,7 +1087,7 @@ static int s_rrsm_do_unsubscribe_test(struct aws_allocator *allocator, bool shou aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 1, + .max_request_response_subscriptions = 2, .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, .start_connected = true, }; @@ -1057,33 +1099,44 @@ static int s_rrsm_do_unsubscribe_test(struct aws_allocator *allocator, bool shou struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); - // budget of 1, so new acquires should be blocked struct aws_rr_acquire_subscription_options acquire2_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; - ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); + + // budget of 2, so new acquires should be blocked + struct aws_rr_acquire_subscription_options acquire3_options = { + .type = ARRST_REQUEST_RESPONSE, + .topic_filters = &s_hello_world3_cursor, + .topic_filter_count = 1, + .operation_id = 3, + }; + ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire3_options)); // complete the subscribe struct aws_protocol_adapter_subscription_event successful_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &successful_subscription_event); // still blocked - ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); + ASSERT_INT_EQUALS(AASRT_BLOCKED, aws_rr_subscription_manager_acquire_subscription(manager, &acquire3_options)); // release struct aws_rr_release_subscription_options release1_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release1_options); @@ -1092,14 +1145,14 @@ static int s_rrsm_do_unsubscribe_test(struct aws_allocator *allocator, bool shou aws_rr_subscription_manager_purge_unused(manager); struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_TRUE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); // complete the unsubscribe struct aws_protocol_adapter_subscription_event successful_unsubscribe_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_UNSUBSCRIBE, .error_code = should_succeed ? AWS_ERROR_SUCCESS : AWS_ERROR_MQTT5_UNSUBSCRIBE_OPTIONS_VALIDATION, }; @@ -1110,7 +1163,7 @@ static int s_rrsm_do_unsubscribe_test(struct aws_allocator *allocator, bool shou // a successful unsubscribe should clear space, a failed one should not ASSERT_INT_EQUALS( (should_succeed ? AASRT_SUBSCRIBING : AASRT_BLOCKED), - aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); + aws_rr_subscription_manager_acquire_subscription(manager, &acquire3_options)); s_aws_subscription_manager_test_fixture_clean_up(&fixture); aws_mqtt_library_clean_up(); @@ -1167,7 +1220,8 @@ static int s_rrsm_acquire_failure_subscribe_sync_failure_request_fn(struct aws_a failing_vtable.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe_fails_first_time; struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, .start_connected = true, .adapter_vtable = &failing_vtable, @@ -1180,7 +1234,8 @@ static int s_rrsm_acquire_failure_subscribe_sync_failure_request_fn(struct aws_a struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1208,7 +1263,8 @@ static int s_rrsm_acquire_failure_subscribe_sync_failure_streaming_fn(struct aws failing_vtable.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe_fails_first_time; struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, .start_connected = true, .adapter_vtable = &failing_vtable, @@ -1221,14 +1277,16 @@ static int s_rrsm_acquire_failure_subscribe_sync_failure_streaming_fn(struct aws struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); struct aws_rr_acquire_subscription_options acquire_options2 = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options2)); @@ -1258,14 +1316,15 @@ static int s_rrsm_acquire_request_subscribe_failure_event_fn(struct aws_allocato struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); // complete the subscribe with a failure struct aws_protocol_adapter_subscription_event failed_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, }; @@ -1274,7 +1333,7 @@ static int s_rrsm_acquire_request_subscribe_failure_event_fn(struct aws_allocato // verify subscribe failure event emission struct aws_subscription_status_record expected_subscription_event = { .type = ARRSET_REQUEST_SUBSCRIBE_FAILURE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; @@ -1305,14 +1364,15 @@ static int s_rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe_fn( struct aws_rr_acquire_subscription_options acquire1_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); // complete the subscribe with a retryable failure struct aws_protocol_adapter_subscription_event failed_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, .retryable = true, @@ -1322,12 +1382,12 @@ static int s_rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe_fn( struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -1370,7 +1430,8 @@ static int s_do_offline_acquire_online_test( aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, .start_connected = false, }; @@ -1382,7 +1443,8 @@ static int s_do_offline_acquire_online_test( struct aws_rr_acquire_subscription_options acquire_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1400,7 +1462,7 @@ static int s_do_offline_acquire_online_test( struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -1409,14 +1471,14 @@ static int s_do_offline_acquire_online_test( // complete the subscribe, verify a subscription success/failure event struct aws_protocol_adapter_subscription_event subscription_event = { .event_type = AWS_PASET_SUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = success ? AWS_ERROR_SUCCESS : AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { .type = s_compute_expected_subscription_event_offline_acquire_online(subscription_type, success), - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); @@ -1494,7 +1556,8 @@ static int s_do_offline_acquire_release_online_test( // acquire struct aws_rr_acquire_subscription_options acquire_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1504,7 +1567,8 @@ static int s_do_offline_acquire_release_online_test( // release while offline, nothing should happen struct aws_rr_release_subscription_options release_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release_options); @@ -1520,7 +1584,8 @@ static int s_do_offline_acquire_release_online_test( // trigger a different subscription, verify it's the only thing that has reached the protocol adapter struct aws_rr_acquire_subscription_options acquire2_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); @@ -1528,7 +1593,7 @@ static int s_do_offline_acquire_release_online_test( struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filter_cursor = s_hello_world2_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -1578,7 +1643,8 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( // acquire struct aws_rr_acquire_subscription_options acquire_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1586,7 +1652,7 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( // successfully complete subscription struct aws_protocol_adapter_subscription_event subscription_event = { .event_type = AWS_PASET_SUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); @@ -1594,7 +1660,7 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( struct aws_subscription_status_record expected_subscription_event = { .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIBE_SUCCESS : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); @@ -1607,7 +1673,8 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( // release struct aws_rr_release_subscription_options release_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release_options); @@ -1615,7 +1682,8 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( // acquire something different, normally that triggers an unsubscribe, but we're offline struct aws_rr_acquire_subscription_options acquire2_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .topic_filters = &s_hello_world2_cursor, + .topic_filter_count = 1, .operation_id = 2, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); @@ -1623,7 +1691,7 @@ static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( // verify no unsubscribe has been sent struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); @@ -1689,7 +1757,8 @@ static int s_do_rrsm_acquire_clean_up_test( // acquire struct aws_rr_acquire_subscription_options acquire_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1698,7 +1767,7 @@ static int s_do_rrsm_acquire_clean_up_test( if (complete_subscribe) { struct aws_protocol_adapter_subscription_event subscription_event = { .event_type = AWS_PASET_SUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); @@ -1706,7 +1775,7 @@ static int s_do_rrsm_acquire_clean_up_test( struct aws_subscription_status_record expected_subscription_event = { .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIBE_SUCCESS : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); @@ -1726,7 +1795,7 @@ static int s_do_rrsm_acquire_clean_up_test( // verify an unsubscribe was sent even though we are offline struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_TRUE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); @@ -1734,7 +1803,8 @@ static int s_do_rrsm_acquire_clean_up_test( // prevent the fixture cleanup from double freeing the subscription manager that was already cleaned up by // reinitializing the subscription manager struct aws_rr_subscription_manager_options subscription_manager_options = { - .max_subscriptions = 3, + .max_request_response_subscriptions = 2, + .max_streaming_subscriptions = 2, .operation_timeout_seconds = 5, .subscription_status_callback = s_aws_rr_subscription_status_event_test_callback_fn, .userdata = &fixture, @@ -1847,7 +1917,8 @@ static int s_rrsm_do_no_session_subscription_ended_test( // acquire struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1855,14 +1926,14 @@ static int s_rrsm_do_no_session_subscription_ended_test( // successfully complete subscription struct aws_protocol_adapter_subscription_event subscription_event = { .event_type = AWS_PASET_SUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { .type = ARRSET_REQUEST_SUBSCRIBE_SUCCESS, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); @@ -1870,14 +1941,15 @@ static int s_rrsm_do_no_session_subscription_ended_test( // release if appropriate if (offline_while_unsubscribing) { struct aws_rr_release_subscription_options release_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release_options); struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); @@ -1906,7 +1978,7 @@ static int s_rrsm_do_no_session_subscription_ended_test( if (!offline_while_unsubscribing) { struct aws_subscription_status_record expected_subscription_ended_event = { .type = ARRSET_REQUEST_SUBSCRIPTION_ENDED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_ended_event)); @@ -1915,7 +1987,8 @@ static int s_rrsm_do_no_session_subscription_ended_test( // if we were unsubscribing, verify reacquire is blocked and then complete the unsubscribe struct aws_rr_acquire_subscription_options reacquire_options = { .type = ARRST_REQUEST_RESPONSE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 3, }; @@ -1924,7 +1997,7 @@ static int s_rrsm_do_no_session_subscription_ended_test( struct aws_protocol_adapter_subscription_event unsubscribe_event = { .event_type = AWS_PASET_UNSUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &unsubscribe_event); @@ -1986,7 +2059,8 @@ static int s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn(struc // acquire struct aws_rr_acquire_subscription_options acquire_options = { .type = ARRST_EVENT_STREAM, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); @@ -1994,14 +2068,14 @@ static int s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn(struc // successfully complete subscription struct aws_protocol_adapter_subscription_event subscription_event = { .event_type = AWS_PASET_SUBSCRIBE, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .error_code = AWS_ERROR_SUCCESS, }; aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); @@ -2022,7 +2096,7 @@ static int s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn(struc // verify subscription lost on rejoin struct aws_subscription_status_record expected_subscription_ended_event = { .type = ARRSET_STREAMING_SUBSCRIPTION_LOST, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 1, }; ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_ended_event)); @@ -2031,12 +2105,12 @@ static int s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn(struc struct aws_protocol_adapter_api_record expected_subscribes[] = { { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, { .type = PAAT_SUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }, }; @@ -2064,13 +2138,14 @@ static int s_do_purge_test(struct aws_allocator *allocator, enum aws_rr_subscrip struct aws_rr_acquire_subscription_options acquire1_options = { .type = subscription_type, - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); struct aws_protocol_adapter_subscription_event successful_subscription_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_SUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, }; @@ -2079,7 +2154,7 @@ static int s_do_purge_test(struct aws_allocator *allocator, enum aws_rr_subscrip struct aws_subscription_status_record expected_empty_subscription_events[] = { { .type = ARRSET_SUBSCRIPTION_EMPTY, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 0, }, }; @@ -2087,7 +2162,7 @@ static int s_do_purge_test(struct aws_allocator *allocator, enum aws_rr_subscrip struct aws_subscription_status_record expected_unsubscribe_events[] = { { .type = ARRSET_UNSUBSCRIBE_COMPLETE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .operation_id = 0, }, }; @@ -2098,14 +2173,15 @@ static int s_do_purge_test(struct aws_allocator *allocator, enum aws_rr_subscrip // verify no unsubscribes struct aws_protocol_adapter_api_record expected_unsubscribe = { .type = PAAT_UNSUBSCRIBE, - .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter_cursor = s_hello_world1_cursor, .timeout = DEFAULT_SM_TEST_TIMEOUT, }; ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); // release once, verify no unsubscribe struct aws_rr_release_subscription_options release1_options = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filters = &s_hello_world1_cursor, + .topic_filter_count = 1, .operation_id = 1, }; aws_rr_subscription_manager_release_subscription(manager, &release1_options); @@ -2123,7 +2199,7 @@ static int s_do_purge_test(struct aws_allocator *allocator, enum aws_rr_subscrip // complete the unsubscribe struct aws_protocol_adapter_subscription_event successful_unsubscribe_event = { - .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .topic_filter = s_hello_world1_cursor, .event_type = AWS_PASET_UNSUBSCRIBE, .error_code = AWS_ERROR_SUCCESS, };