Skip to content

Commit

Permalink
Merge branch 'MultiSub' into Elastishadow
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose committed Apr 14, 2024
2 parents 468e73e + 9ab9fd6 commit dffcf96
Show file tree
Hide file tree
Showing 8 changed files with 1,403 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <aws/mqtt/mqtt.h>

struct aws_mqtt_request_response_client;
#include <aws/mqtt/request-response/request_response_client.h>

AWS_EXTERN_C_BEGIN

Expand Down
33 changes: 21 additions & 12 deletions include/aws/mqtt/private/request-response/subscription_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,14 @@ typedef void(
struct aws_rr_subscription_manager_options {

/*
* Maximum number of concurrent subscriptions allowed
* Maximum number of request-response subscriptions allowed. Must be at least two.
*/
size_t max_subscriptions;
size_t max_request_response_subscriptions;

/*
* Maximum number of streaming subscriptions allowed.
*/
size_t max_streaming_subscriptions;

/*
* Ack timeout to use for all subscribe and unsubscribe operations
Expand All @@ -104,8 +109,8 @@ struct aws_rr_subscription_manager_options {
};

/*
* The subscription manager works from a purely lazy perspective. Unsubscribes (from topic filters that are no longer
* referenced) occur when looking for new subscription space. Unsubscribe failures don't trigger anything special,
* The subscription manager works with the request-response client to handle subscriptions in an eager manner.
* Subscription purges are checked with every client service call. Unsubscribe failures don't trigger anything special,
* we'll just try again next time we look for subscription space. Subscribes are attempted on idle subscriptions
* that still need them, either in response to a new operation listener or a connection resumption event.
*
Expand Down Expand Up @@ -135,43 +140,47 @@ enum aws_rr_subscription_type {
};

struct aws_rr_acquire_subscription_options {
struct aws_byte_cursor topic_filter;
struct aws_byte_cursor *topic_filters;
size_t topic_filter_count;

uint64_t operation_id;
enum aws_rr_subscription_type type;
};

struct aws_rr_release_subscription_options {
struct aws_byte_cursor topic_filter;
struct aws_byte_cursor *topic_filters;
size_t topic_filter_count;

uint64_t operation_id;
};

enum aws_acquire_subscription_result_type {

/*
* The requested subscription already exists and is active. The operation can proceed to the next stage.
* All requested subscriptions already exist and are active. The operation can proceed to the next stage.
*/
AASRT_SUBSCRIBED,

/*
* The requested subscription now exists but is not yet active. The operation must wait for the subscribe
* The requested subscriptions now exist but at least one is not yet active. The operation must wait for subscribes
* to complete as success or failure.
*/
AASRT_SUBSCRIBING,

/*
* The subscription does not exist and there is no room for it currently. Room may open up in the future, so
* the operation should wait.
* At least one subscription does not exist and there is no room for it currently. Room may open up in the future,
* so the operation should wait.
*/
AASRT_BLOCKED,

/*
* The subscription does not exist, there is no room for it, and unless an event stream subscription gets
* At least one subscription does not exist and there is no room for it. Unless an event stream subscription gets
* closed, no room will be available in the future. The operation should be failed.
*/
AASRT_NO_CAPACITY,

/*
* An internal failure occurred while trying to establish the subscription. The operation should be failed.
* An internal failure occurred while trying to establish subscriptions. The operation should be failed.
*/
AASRT_FAILURE
};
Expand Down
21 changes: 5 additions & 16 deletions include/aws/mqtt/request-response/request_response_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ typedef void(aws_mqtt_request_operation_completion_fn)(
void *user_data);

struct aws_mqtt_request_operation_options {
struct aws_byte_cursor subscription_topic_filter;
struct aws_byte_cursor *subscription_topic_filters;
size_t subscription_topic_filter_count;

struct aws_mqtt_request_operation_response_path *response_paths;
size_t response_path_count;
Expand All @@ -39,14 +40,6 @@ struct aws_mqtt_request_operation_options {
void *user_data;
};

struct aws_mqtt_request_operation_storage {
struct aws_mqtt_request_operation_options options;

struct aws_array_list operation_response_paths;

struct aws_byte_buf operation_data;
};

/*
* Describes a change to the state of a request operation subscription
*/
Expand Down Expand Up @@ -86,17 +79,13 @@ struct aws_mqtt_streaming_operation_options {
void *user_data;
};

struct aws_mqtt_streaming_operation_storage {
struct aws_mqtt_streaming_operation_options options;

struct aws_byte_buf operation_data;
};

typedef void(aws_mqtt_request_response_client_initialized_callback_fn)(void *user_data);
typedef void(aws_mqtt_request_response_client_terminated_callback_fn)(void *user_data);

struct aws_mqtt_request_response_client_options {
size_t max_subscriptions;
size_t max_request_response_subscriptions;
size_t max_streaming_subscriptions;

uint32_t operation_timeout_seconds;

/* Do not bind the initialized callback; it exists mostly for tests and should not be exposed */
Expand Down
Loading

0 comments on commit dffcf96

Please sign in to comment.