Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Apr 12, 2024
1 parent 37f4b57 commit 8567ffb
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <aws/mqtt/mqtt.h>

struct aws_mqtt_request_response_client;
#include <aws/mqtt/request-response/request_response_client.h>

AWS_EXTERN_C_BEGIN

Expand Down
33 changes: 21 additions & 12 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ typedef void(
struct aws_rr_subscription_manager_options {

/*
* Maximum number of concurrent subscriptions allowed
* Maximum number of request-response subscriptions allowed. Must be at least two.
*/
size_t max_subscriptions;
size_t max_request_response_subscriptions;

/*
* Maximum number of streaming subscriptions allowed.
*/
size_t max_streaming_subscriptions;

/*
* Ack timeout to use for all subscribe and unsubscribe operations
Expand All @@ -104,8 +109,8 @@ struct aws_rr_subscription_manager_options {
};

/*
* The subscription manager works from a purely lazy perspective. Unsubscribes (from topic filters that are no longer
* referenced) occur when looking for new subscription space. Unsubscribe failures don't trigger anything special,
* The subscription manager works with the request-response client to handle subscriptions in an eager manner.
* Subscription purges are checked with every client service call. Unsubscribe failures don't trigger anything special,
* we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions
* that still need them, either in response to a new operation listener or a connection resumption event.
*
Expand Down Expand Up @@ -135,43 +140,47 @@ enum aws_rr_subscription_type {
};

struct aws_rr_acquire_subscription_options {
struct aws_byte_cursor topic_filter;
struct aws_byte_cursor *topic_filters;
size_t topic_filter_count;

uint64_t operation_id;
enum aws_rr_subscription_type type;
};

struct aws_rr_release_subscription_options {
struct aws_byte_cursor topic_filter;
struct aws_byte_cursor *topic_filters;
size_t topic_filter_count;

uint64_t operation_id;
};

enum aws_acquire_subscription_result_type {

/*
* The requested subscription already exists and is active. The operation can proceed to the next stage.
* All requested subscriptions already exist and are active. The operation can proceed to the next stage.
*/
AASRT_SUBSCRIBED,

/*
* The requested subscription now exists but is not yet active. The operation must wait for the subscribe
* The requested subscriptions now exist but at least one is not yet active. The operation must wait for subscribes
* to complete as success or failure.
*/
AASRT_SUBSCRIBING,

/*
* The subscription does not exist and there is no room for it currently. Room may open up in the future, so
* the operation should wait.
* At least one subscription does not exist and there is no room for it currently. Room may open up in the future,
* so the operation should wait.
*/
AASRT_BLOCKED,

/*
* The subscription does not exist, there is no room for it, and unless an event stream subscription gets
* At least one subscription does not exist and there is no room for it. Unless an event stream subscription gets
* closed, no room will be available in the future. The operation should be failed.
*/
AASRT_NO_CAPACITY,

/*
* An internal failure occurred while trying to establish the subscription. The operation should be failed.
* An internal failure occurred while trying to establish subscriptions. The operation should be failed.
*/
AASRT_FAILURE
};
Expand Down
21 changes: 5 additions & 16 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ typedef void(aws_mqtt_request_operation_completion_fn)(
void *user_data);

struct aws_mqtt_request_operation_options {
struct aws_byte_cursor subscription_topic_filter;
struct aws_byte_cursor *subscription_topic_filters;
size_t subscription_topic_filter_count;

struct aws_mqtt_request_operation_response_path *response_paths;
size_t response_path_count;
Expand All @@ -39,14 +40,6 @@ struct aws_mqtt_request_operation_options {
void *user_data;
};

struct aws_mqtt_request_operation_storage {
struct aws_mqtt_request_operation_options options;

struct aws_array_list operation_response_paths;

struct aws_byte_buf operation_data;
};

/*
* Describes a change to the state of a request operation subscription
*/
Expand Down Expand Up @@ -86,17 +79,13 @@ struct aws_mqtt_streaming_operation_options {
void *user_data;
};

struct aws_mqtt_streaming_operation_storage {
struct aws_mqtt_streaming_operation_options options;

struct aws_byte_buf operation_data;
};

typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);

struct aws_mqtt_request_response_client_options {
size_t max_subscriptions;
size_t max_request_response_subscriptions;
size_t max_streaming_subscriptions;

uint32_t operation_timeout_seconds;

/* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */
Expand Down
120 changes: 90 additions & 30 deletions source/request-response/request_response_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@
#define MQTT_RR_CLIENT_OPERATION_TABLE_DEFAULT_SIZE 50
#define MQTT_RR_CLIENT_RESPONSE_TABLE_DEFAULT_SIZE 50

struct aws_mqtt_request_operation_storage {
struct aws_mqtt_request_operation_options options;

struct aws_array_list operation_response_paths;
struct aws_array_list subscription_topic_filters;

struct aws_byte_buf operation_data;
};

struct aws_mqtt_streaming_operation_storage {
struct aws_mqtt_streaming_operation_options options;

struct aws_byte_buf operation_data;
};

enum aws_mqtt_request_response_operation_type {
AWS_MRROT_REQUEST,
AWS_MRROT_STREAMING,
Expand Down Expand Up @@ -1117,7 +1132,8 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie
const struct aws_mqtt_request_response_client_options *options,
struct aws_event_loop *loop) {
struct aws_rr_subscription_manager_options sm_options = {
.max_subscriptions = options->max_subscriptions,
.max_request_response_subscriptions = options->max_request_response_subscriptions,
.max_streaming_subscriptions = options->max_streaming_subscriptions,
.operation_timeout_seconds = options->operation_timeout_seconds,
};

Expand Down Expand Up @@ -1215,7 +1231,8 @@ static void s_aws_rr_client_init_subscription_manager(

struct aws_rr_subscription_manager_options subscription_manager_options = {
.operation_timeout_seconds = rr_client->config.operation_timeout_seconds,
.max_subscriptions = rr_client->config.max_subscriptions,
.max_request_response_subscriptions = rr_client->config.max_request_response_subscriptions,
.max_streaming_subscriptions = rr_client->config.max_streaming_subscriptions,
.subscription_status_callback = s_aws_rr_client_subscription_status_event_callback,
.userdata = rr_client,
};
Expand Down Expand Up @@ -1266,15 +1283,6 @@ static uint64_t s_mqtt_request_response_client_get_next_service_time(struct aws_
return UINT64_MAX;
}

static struct aws_byte_cursor s_aws_mqtt_rr_operation_get_subscription_topic_filter(
struct aws_mqtt_rr_client_operation *operation) {
if (operation->type == AWS_MRROT_REQUEST) {
return operation->storage.request_storage.options.subscription_topic_filter;
} else {
return operation->storage.streaming_storage.options.topic_filter;
}
}

/* TODO: add aws-c-common API? */
static bool s_is_operation_in_list(const struct aws_mqtt_rr_client_operation *operation) {
return aws_linked_list_node_prev_is_valid(&operation->node) && aws_linked_list_node_next_is_valid(&operation->node);
Expand All @@ -1284,7 +1292,7 @@ static int s_add_streaming_operation_to_subscription_topic_filter_table(
struct aws_mqtt_request_response_client *client,
struct aws_mqtt_rr_client_operation *operation) {

struct aws_byte_cursor topic_filter_cursor = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation);
struct aws_byte_cursor topic_filter_cursor = operation->storage.streaming_storage.options.topic_filter;

struct aws_hash_element *element = NULL;
if (aws_hash_table_find(&client->streaming_operation_subscription_lists, &topic_filter_cursor, &element)) {
Expand Down Expand Up @@ -1445,6 +1453,24 @@ static bool s_can_operation_dequeue(
return token_element == NULL;
}

static struct aws_byte_cursor *s_aws_mqtt_rr_operation_get_subscription_topic_filters(
struct aws_mqtt_rr_client_operation *operation) {
if (operation->type == AWS_MRROT_STREAMING) {
return &operation->storage.streaming_storage.options.topic_filter;
} else {
return operation->storage.request_storage.options.subscription_topic_filters;
}
}

static size_t s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(
struct aws_mqtt_rr_client_operation *operation) {
if (operation->type == AWS_MRROT_STREAMING) {
return 1;
} else {
return operation->storage.request_storage.options.subscription_topic_filter_count;
}
}

static void s_process_queued_operations(struct aws_mqtt_request_response_client *client) {
aws_rr_subscription_manager_purge_unused(&client->subscription_manager);

Expand All @@ -1458,7 +1484,8 @@ static void s_process_queued_operations(struct aws_mqtt_request_response_client
}

struct aws_rr_acquire_subscription_options subscribe_options = {
.topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(head_operation),
.topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(head_operation),
.topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(head_operation),
.operation_id = head_operation->id,
.type = s_rr_operation_type_to_subscription_type(head_operation->type),
};
Expand Down Expand Up @@ -1692,15 +1719,26 @@ static bool s_are_request_operation_options_valid(
return false;
}

if (!aws_mqtt_is_valid_topic_filter(&request_options->subscription_topic_filter)) {
if (request_options->subscription_topic_filter_count == 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"(%p) rr client request options - " PRInSTR " is not a valid topic filter",
(void *)client,
AWS_BYTE_CURSOR_PRI(request_options->subscription_topic_filter));
"(%p) rr client request options - no subscription topic filters supplied",
(void *)client);
return false;
}

for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
const struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i];
if (!aws_mqtt_is_valid_topic_filter(&subscription_topic_filter)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE,
"(%p) rr client request options - " PRInSTR " is not a valid subscription topic filter",
(void *)client,
AWS_BYTE_CURSOR_PRI(subscription_topic_filter));
return false;
}
}

if (request_options->serialized_request.len == 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_REQUEST_RESPONSE, "(%p) rr client request options - empty request payload", (void *)client);
Expand Down Expand Up @@ -1860,7 +1898,8 @@ static void s_mqtt_rr_client_destroy_operation(struct aws_task *task, void *arg,

if (client->state != AWS_RRCS_SHUTTING_DOWN) {
struct aws_rr_release_subscription_options release_options = {
.topic_filter = s_aws_mqtt_rr_operation_get_subscription_topic_filter(operation),
.topic_filters = s_aws_mqtt_rr_operation_get_subscription_topic_filters(operation),
.topic_filter_count = s_aws_mqtt_rr_operation_get_subscription_topic_filter_count(operation),
.operation_id = operation->id,
};
aws_rr_subscription_manager_release_subscription(&client->subscription_manager, &release_options);
Expand Down Expand Up @@ -1933,7 +1972,12 @@ void s_aws_mqtt_request_operation_storage_init_from_options(
bytes_needed += request_options->publish_topic.len;
bytes_needed += request_options->serialized_request.len;
bytes_needed += request_options->correlation_token.len;
bytes_needed += request_options->subscription_topic_filter.len;

for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
const struct aws_byte_cursor *subscription_topic_filter = &request_options->subscription_topic_filters[i];

bytes_needed += subscription_topic_filter->len;
}

for (size_t i = 0; i < request_options->response_path_count; ++i) {
const struct aws_mqtt_request_operation_response_path *response_path = &request_options->response_paths[i];
Expand All @@ -1950,6 +1994,11 @@ void s_aws_mqtt_request_operation_storage_init_from_options(
allocator,
request_options->response_path_count,
sizeof(struct aws_mqtt_request_operation_response_path));
aws_array_list_init_dynamic(
&storage->subscription_topic_filters,
allocator,
request_options->subscription_topic_filter_count,
sizeof(struct aws_byte_cursor));

AWS_FATAL_ASSERT(
aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.publish_topic) == AWS_OP_SUCCESS);
Expand All @@ -1959,9 +2008,15 @@ void s_aws_mqtt_request_operation_storage_init_from_options(
AWS_FATAL_ASSERT(
aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.correlation_token) ==
AWS_OP_SUCCESS);
AWS_FATAL_ASSERT(
aws_byte_buf_append_and_update(&storage->operation_data, &storage->options.subscription_topic_filter) ==
AWS_OP_SUCCESS);

for (size_t i = 0; i < request_options->subscription_topic_filter_count; ++i) {
struct aws_byte_cursor subscription_topic_filter = request_options->subscription_topic_filters[i];

AWS_FATAL_ASSERT(
aws_byte_buf_append_and_update(&storage->operation_data, &subscription_topic_filter) == AWS_OP_SUCCESS);

aws_array_list_push_back(&storage->subscription_topic_filters, &subscription_topic_filter);
}

for (size_t i = 0; i < request_options->response_path_count; ++i) {
struct aws_mqtt_request_operation_response_path response_path = request_options->response_paths[i];
Expand All @@ -1988,14 +2043,19 @@ static void s_log_request_response_operation(

struct aws_mqtt_request_operation_options *options = &operation->storage.request_storage.options;

AWS_LOGUF(
log_handle,
AWS_LL_DEBUG,
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client operation %" PRIu64 " - subscription topic filter: '" PRInSTR "'",
(void *)client,
operation->id,
AWS_BYTE_CURSOR_PRI(options->subscription_topic_filter));
for (size_t i = 0; i < options->subscription_topic_filter_count; ++i) {
struct aws_byte_cursor subscription_topic_filter = options->subscription_topic_filters[i];

AWS_LOGUF(
log_handle,
AWS_LL_DEBUG,
AWS_LS_MQTT_REQUEST_RESPONSE,
"id=%p: request-response client operation %" PRIu64 " - subscription topic filter %zu topic '" PRInSTR "'",
(void *)client,
operation->id,
i,
AWS_BYTE_CURSOR_PRI(subscription_topic_filter));
}

AWS_LOGUF(
log_handle,
Expand Down
Loading

0 comments on commit 8567ffb

Please sign in to comment.