From 84d9696a538348f1db7b8ce6c6ba2b808041a080 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Sun, 14 Apr 2024 15:26:00 -0700 Subject: [PATCH] Checkpoint --- bin/elastishadow/main.c | 684 +++++++++++++++--- .../private/{shared_constants.h => shared.h} | 5 + source/client.c | 2 +- source/{shared_constants.c => shared.c} | 2 +- source/v5/mqtt5_client.c | 2 +- 5 files changed, 605 insertions(+), 90 deletions(-) rename include/aws/mqtt/private/{shared_constants.h => shared.h} (69%) rename source/{shared_constants.c => shared.c} (95%) diff --git a/bin/elastishadow/main.c b/bin/elastishadow/main.c index 08639bf5..729e4133 100644 --- a/bin/elastishadow/main.c +++ b/bin/elastishadow/main.c @@ -22,8 +22,9 @@ #include #include +#include #include -#include +#include #include #include @@ -57,8 +58,66 @@ struct app_ctx { struct aws_mqtt5_client *client; struct aws_mqtt_request_response_client *rr_client; + + /* + * &aws_shadow_streaming_operation.id -> aws_shadow_streaming_operation * + */ + struct aws_hash_table streaming_operations; + uint64_t next_id; }; +struct aws_shadow_streaming_operation { + struct aws_allocator *allocator; + + struct aws_byte_buf thing; + struct aws_byte_buf shadow; + struct aws_byte_buf topic_filter; + + uint64_t id; + struct aws_mqtt_rr_client_operation *streaming_operation; +}; + +struct aws_shadow_streaming_operation *s_aws_shadow_streaming_operation_new( + struct aws_allocator *allocator, + struct app_ctx *app_ctx, + struct aws_byte_cursor thing, + struct aws_byte_cursor shadow, + struct aws_byte_cursor topic_filter) { + struct aws_shadow_streaming_operation *operation = + aws_mem_calloc(allocator, 1, sizeof(struct aws_shadow_streaming_operation)); + + operation->allocator = allocator; + operation->id = ++app_ctx->next_id; + operation->streaming_operation = NULL; + + aws_byte_buf_init_copy_from_cursor(&operation->thing, allocator, thing); + aws_byte_buf_init_copy_from_cursor(&operation->shadow, allocator, shadow); + aws_byte_buf_init_copy_from_cursor(&operation->topic_filter, allocator, topic_filter); + + return operation; +} + +void s_aws_shadow_streaming_operation_destroy(struct aws_shadow_streaming_operation *operation) { + if (operation == NULL) { + return; + } + + aws_byte_buf_clean_up(&operation->thing); + aws_byte_buf_clean_up(&operation->shadow); + + aws_mem_release(operation->allocator, operation); +} + +static void s_release_streaming_operation(void *value) { + struct aws_shadow_streaming_operation *operation = value; + + if (operation->streaming_operation != NULL) { + aws_mqtt_rr_client_operation_release(operation->streaming_operation); + } else { + s_aws_shadow_streaming_operation_destroy(operation); + } +} + static void s_usage(int exit_code) { fprintf(stderr, "usage: elastishadow [options] endpoint\n"); @@ -165,6 +224,14 @@ static void s_split_command_line(struct aws_byte_cursor cursor, struct aws_array } } +static void s_write_correlation_token_string(struct aws_byte_cursor scratch_space) { + struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(scratch_space.ptr, scratch_space.len); + + struct aws_uuid uuid; + aws_uuid_init(&uuid); + aws_uuid_to_str(&uuid, &correlation_token_buf); +} + static void s_on_get_shadow_complete( const struct aws_byte_cursor *response_topic, const struct aws_byte_cursor *payload, @@ -174,23 +241,28 @@ static void s_on_get_shadow_complete( struct aws_string *correlation_token = user_data; if (payload != NULL) { - printf("GetNamedShadow request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n", correlation_token->bytes, AWS_BYTE_CURSOR_PRI(*response_topic), AWS_BYTE_CURSOR_PRI(*payload)); + printf( + "GetNamedShadow request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n\n", + correlation_token->bytes, + AWS_BYTE_CURSOR_PRI(*response_topic), + AWS_BYTE_CURSOR_PRI(*payload)); } else { - printf("GetNamedShadow request '%s' failed with error code %d(%s)\n", correlation_token->bytes, error_code, aws_error_debug_str(error_code)); + printf( + "GetNamedShadow request '%s' failed with error code %d(%s)\n\n", + correlation_token->bytes, + error_code, + aws_error_debug_str(error_code)); } aws_string_destroy(correlation_token); } -static void s_handle_get( - struct app_ctx *context, - struct aws_allocator *allocator, - struct aws_array_list *arguments) { +static void s_handle_get(struct app_ctx *context, struct aws_allocator *allocator, struct aws_array_list *arguments) { size_t argument_count = aws_array_list_length(arguments) - 1; if (argument_count != 2) { printf("invalid get-named-shadow options:\n"); - printf(" get-named-shadow \n"); + printf(" get-named-shadow \n\n"); return; } @@ -203,13 +275,29 @@ static void s_handle_get( aws_array_list_get_at(arguments, &shadow_name_cursor, 2); char subscription_topic_filter[128]; - snprintf(subscription_topic_filter, AWS_ARRAY_SIZE(subscription_topic_filter), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/+", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + subscription_topic_filter, + AWS_ARRAY_SIZE(subscription_topic_filter), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/+", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + struct aws_byte_cursor subscription_topic_filter_cursor = aws_byte_cursor_from_c_str(subscription_topic_filter); char accepted_path[128]; - snprintf(accepted_path, AWS_ARRAY_SIZE(accepted_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/accepted", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + accepted_path, + AWS_ARRAY_SIZE(accepted_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/accepted", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); char rejected_path[128]; - snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + rejected_path, + AWS_ARRAY_SIZE(rejected_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get/rejected", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); struct aws_mqtt_request_operation_response_path response_paths[] = { @@ -224,20 +312,22 @@ static void s_handle_get( }; char publish_topic[128]; - snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + publish_topic, + AWS_ARRAY_SIZE(publish_topic), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/get", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); char correlation_token[128]; - struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); - - struct aws_uuid uuid; - aws_uuid_init(&uuid); - aws_uuid_to_str(&uuid, &correlation_token_buf); + s_write_correlation_token_string(aws_byte_cursor_from_array(correlation_token, AWS_ARRAY_SIZE(correlation_token))); char request[256]; snprintf(request, AWS_ARRAY_SIZE(request), "{\"clientToken\":\"%s\"}", correlation_token); struct aws_mqtt_request_operation_options get_options = { - .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_topic_filters = &subscription_topic_filter_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = 2, .publish_topic = aws_byte_cursor_from_c_str(publish_topic), @@ -247,26 +337,22 @@ static void s_handle_get( .user_data = aws_string_new_from_c_str(allocator, correlation_token), }; - printf("Submitting GetNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + printf( + "Submitting GetNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR + "' using correlation token %s...\n", + AWS_BYTE_CURSOR_PRI(shadow_name_cursor), + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + correlation_token); if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { int error_code = aws_last_error(); - printf("GetNamedShadow synchronous failure: %d(%s)", error_code, aws_error_debug_str(error_code)); + printf("GetNamedShadow synchronous failure: %d(%s)\n", error_code, aws_error_debug_str(error_code)); } -} -static void s_handle_update_reported( - struct app_ctx *context, - struct aws_allocator *allocator, - struct aws_array_list *arguments, - struct aws_byte_cursor line_cursor) { - (void)context; - (void)allocator; - (void)arguments; - (void)line_cursor; + printf("\n"); } -static void s_on_update_shadow_desired_complete( +static void s_on_update_shadow_complete( const struct aws_byte_cursor *response_topic, const struct aws_byte_cursor *payload, int error_code, @@ -275,27 +361,28 @@ static void s_on_update_shadow_desired_complete( struct aws_string *correlation_token = user_data; if (payload != NULL) { - printf("UpdateNameShadowDesired request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n", correlation_token->bytes, AWS_BYTE_CURSOR_PRI(*response_topic), AWS_BYTE_CURSOR_PRI(*payload)); + printf( + "UpdateNamedShadow request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n\n", + correlation_token->bytes, + AWS_BYTE_CURSOR_PRI(*response_topic), + AWS_BYTE_CURSOR_PRI(*payload)); } else { - printf("UpdateNameShadowDesired request '%s' failed with error code %d(%s)\n", correlation_token->bytes, error_code, aws_error_debug_str(error_code)); + printf( + "UpdateNamedShadow request '%s' failed with error code %d(%s)\n\n", + correlation_token->bytes, + error_code, + aws_error_debug_str(error_code)); } aws_string_destroy(correlation_token); } - -static void s_handle_update_desired( +static void s_handle_update( struct app_ctx *context, struct aws_allocator *allocator, struct aws_array_list *arguments, - struct aws_byte_cursor line_cursor) { - - size_t argument_count = aws_array_list_length(arguments) - 1; - if (argument_count < 3) { - printf("invalid update-named-shadow-desired options:\n"); - printf(" delete-named-shadow \n"); - return; - } + struct aws_byte_cursor desired_state, + struct aws_byte_cursor correlation_token) { struct aws_byte_cursor thing_name_cursor; AWS_ZERO_STRUCT(thing_name_cursor); @@ -305,18 +392,46 @@ static void s_handle_update_desired( AWS_ZERO_STRUCT(shadow_name_cursor); aws_array_list_get_at(arguments, &shadow_name_cursor, 2); - struct aws_byte_cursor desired_state_cursor; - aws_array_list_get_at(arguments, &desired_state_cursor, 3); - desired_state_cursor.len = (size_t)(line_cursor.ptr + line_cursor.len - desired_state_cursor.ptr); - - char subscription_topic_filter[128]; - snprintf(subscription_topic_filter, AWS_ARRAY_SIZE(subscription_topic_filter), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/+", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + char subscription_topic_filter_accepted[128]; + snprintf( + subscription_topic_filter_accepted, + AWS_ARRAY_SIZE(subscription_topic_filter_accepted), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/accepted", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + struct aws_byte_cursor subscription_topic_filter_accepted_cursor = + aws_byte_cursor_from_c_str(subscription_topic_filter_accepted); + + char subscription_topic_filter_rejected[128]; + snprintf( + subscription_topic_filter_rejected, + AWS_ARRAY_SIZE(subscription_topic_filter_rejected), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/rejected", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + struct aws_byte_cursor subscription_topic_filter_rejected_cursor = + aws_byte_cursor_from_c_str(subscription_topic_filter_rejected); + + struct aws_byte_cursor subscription_topic_filters[] = { + subscription_topic_filter_accepted_cursor, + subscription_topic_filter_rejected_cursor, + }; char accepted_path[128]; - snprintf(accepted_path, AWS_ARRAY_SIZE(accepted_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/accepted", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + accepted_path, + AWS_ARRAY_SIZE(accepted_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/accepted", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); char rejected_path[128]; - snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + rejected_path, + AWS_ARRAY_SIZE(rejected_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/rejected", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); struct aws_mqtt_request_operation_response_path response_paths[] = { @@ -331,35 +446,110 @@ static void s_handle_update_desired( }; char publish_topic[128]; - snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); - - char correlation_token[128]; - struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); - - struct aws_uuid uuid; - aws_uuid_init(&uuid); - aws_uuid_to_str(&uuid, &correlation_token_buf); - - char request[256]; - snprintf(request, AWS_ARRAY_SIZE(request), "{\"clientToken\":\"%s\",\"state\":{\"desired\":" PRInSTR "}}", correlation_token, AWS_BYTE_CURSOR_PRI(desired_state_cursor)); + snprintf( + publish_topic, + AWS_ARRAY_SIZE(publish_topic), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); struct aws_mqtt_request_operation_options get_options = { - .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_topic_filters = subscription_topic_filters, + .subscription_topic_filter_count = 2, .response_paths = response_paths, .response_path_count = 2, .publish_topic = aws_byte_cursor_from_c_str(publish_topic), - .serialized_request = aws_byte_cursor_from_c_str(request), - .correlation_token = aws_byte_cursor_from_c_str(correlation_token), - .completion_callback = s_on_update_shadow_desired_complete, - .user_data = aws_string_new_from_c_str(allocator, correlation_token), + .serialized_request = desired_state, + .correlation_token = correlation_token, + .completion_callback = s_on_update_shadow_complete, + .user_data = aws_string_new_from_cursor(allocator, &correlation_token), }; - printf("Submitting UpdateNameShadowDesired request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + printf( + "Submitting UpdateNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR + "' using correlation token '" PRInSTR "'...\n", + AWS_BYTE_CURSOR_PRI(shadow_name_cursor), + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(correlation_token)); if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { int error_code = aws_last_error(); - printf("UpdateNameShadowDesired synchronous failure: %d(%s)", error_code, aws_error_debug_str(error_code)); + printf("UpdateNamedShadow synchronous failure: %d(%s)\n", error_code, aws_error_debug_str(error_code)); } + + printf("\n"); +} + +static void s_handle_update_desired( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments, + struct aws_byte_cursor line_cursor) { + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count < 3) { + printf("invalid update-named-shadow-desired options:\n"); + printf(" update-named-shadow-desired \n\n"); + return; + } + + struct aws_byte_cursor desired_state_cursor; + aws_array_list_get_at(arguments, &desired_state_cursor, 3); + desired_state_cursor.len = (size_t)(line_cursor.ptr + line_cursor.len - desired_state_cursor.ptr); + + char correlation_token[128]; + s_write_correlation_token_string(aws_byte_cursor_from_array(correlation_token, AWS_ARRAY_SIZE(correlation_token))); + + char request[256]; + snprintf( + request, + AWS_ARRAY_SIZE(request), + "{\"clientToken\":\"%s\",\"state\":{\"desired\":" PRInSTR "}}", + correlation_token, + AWS_BYTE_CURSOR_PRI(desired_state_cursor)); + + s_handle_update( + context, + allocator, + arguments, + aws_byte_cursor_from_c_str(request), + aws_byte_cursor_from_c_str(correlation_token)); +} + +static void s_handle_update_reported( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments, + struct aws_byte_cursor line_cursor) { + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count < 3) { + printf("invalid update-named-shadow-reported options:\n"); + printf(" update-named-shadow-reported \n\n"); + return; + } + + struct aws_byte_cursor reported_state_cursor; + aws_array_list_get_at(arguments, &reported_state_cursor, 3); + reported_state_cursor.len = (size_t)(line_cursor.ptr + line_cursor.len - reported_state_cursor.ptr); + + char correlation_token[128]; + s_write_correlation_token_string(aws_byte_cursor_from_array(correlation_token, AWS_ARRAY_SIZE(correlation_token))); + + char request[256]; + snprintf( + request, + AWS_ARRAY_SIZE(request), + "{\"clientToken\":\"%s\",\"state\":{\"reported\":" PRInSTR "}}", + correlation_token, + AWS_BYTE_CURSOR_PRI(reported_state_cursor)); + + s_handle_update( + context, + allocator, + arguments, + aws_byte_cursor_from_c_str(request), + aws_byte_cursor_from_c_str(correlation_token)); } static void s_on_delete_shadow_complete( @@ -371,9 +561,17 @@ static void s_on_delete_shadow_complete( struct aws_string *correlation_token = user_data; if (payload != NULL) { - printf("DeleteNamedShadow request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n", correlation_token->bytes, AWS_BYTE_CURSOR_PRI(*response_topic), AWS_BYTE_CURSOR_PRI(*payload)); + printf( + "DeleteNamedShadow request '%s' response received on topic '" PRInSTR "' with body:\n " PRInSTR "\n\n", + correlation_token->bytes, + AWS_BYTE_CURSOR_PRI(*response_topic), + AWS_BYTE_CURSOR_PRI(*payload)); } else { - printf("DeleteNamedShadow request '%s' failed with error code %d(%s)\n", correlation_token->bytes, error_code, aws_error_debug_str(error_code)); + printf( + "DeleteNamedShadow request '%s' failed with error code %d(%s)\n\n", + correlation_token->bytes, + error_code, + aws_error_debug_str(error_code)); } aws_string_destroy(correlation_token); @@ -387,7 +585,7 @@ static void s_handle_delete( size_t argument_count = aws_array_list_length(arguments) - 1; if (argument_count != 2) { printf("invalid delete-named-shadow options:\n"); - printf(" delete-named-shadow \n"); + printf(" delete-named-shadow \n\n"); return; } @@ -400,13 +598,29 @@ static void s_handle_delete( aws_array_list_get_at(arguments, &shadow_name_cursor, 2); char subscription_topic_filter[128]; - snprintf(subscription_topic_filter, AWS_ARRAY_SIZE(subscription_topic_filter), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/+", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + subscription_topic_filter, + AWS_ARRAY_SIZE(subscription_topic_filter), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/+", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + struct aws_byte_cursor subscription_topic_filter_cursor = aws_byte_cursor_from_c_str(subscription_topic_filter); char accepted_path[128]; - snprintf(accepted_path, AWS_ARRAY_SIZE(accepted_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/accepted", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + accepted_path, + AWS_ARRAY_SIZE(accepted_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/accepted", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); char rejected_path[128]; - snprintf(rejected_path, AWS_ARRAY_SIZE(rejected_path), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/rejected", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + rejected_path, + AWS_ARRAY_SIZE(rejected_path), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete/rejected", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); struct aws_byte_cursor correlation_token_path = aws_byte_cursor_from_c_str("clientToken"); struct aws_mqtt_request_operation_response_path response_paths[] = { @@ -421,10 +635,16 @@ static void s_handle_delete( }; char publish_topic[128]; - snprintf(publish_topic, AWS_ARRAY_SIZE(publish_topic), "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete", AWS_BYTE_CURSOR_PRI(thing_name_cursor), AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + snprintf( + publish_topic, + AWS_ARRAY_SIZE(publish_topic), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/delete", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); char correlation_token[128]; - struct aws_byte_buf correlation_token_buf = aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); + struct aws_byte_buf correlation_token_buf = + aws_byte_buf_from_empty_array(correlation_token, AWS_ARRAY_SIZE(correlation_token)); struct aws_uuid uuid; aws_uuid_init(&uuid); @@ -434,7 +654,8 @@ static void s_handle_delete( snprintf(request, AWS_ARRAY_SIZE(request), "{\"clientToken\":\"%s\"}", correlation_token); struct aws_mqtt_request_operation_options get_options = { - .subscription_topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_topic_filters = &subscription_topic_filter_cursor, + .subscription_topic_filter_count = 1, .response_paths = response_paths, .response_path_count = 2, .publish_topic = aws_byte_cursor_from_c_str(publish_topic), @@ -444,11 +665,270 @@ static void s_handle_delete( .user_data = aws_string_new_from_c_str(allocator, correlation_token), }; - printf("Submitting DeleteNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR "' using correlation token %s...\n", AWS_BYTE_CURSOR_PRI(shadow_name_cursor), AWS_BYTE_CURSOR_PRI(thing_name_cursor), correlation_token); + printf( + "Submitting DeleteNamedShadow request for shadow '" PRInSTR "' of thing '" PRInSTR + "' using correlation token %s...\n", + AWS_BYTE_CURSOR_PRI(shadow_name_cursor), + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + correlation_token); if (aws_mqtt_request_response_client_submit_request(context->rr_client, &get_options) == AWS_OP_ERR) { int error_code = aws_last_error(); - printf("DeleteNamedShadow synchronous failure: %d(%s)", error_code, aws_error_debug_str(error_code)); + printf("DeleteNamedShadow synchronous failure: %d(%s)\n", error_code, aws_error_debug_str(error_code)); + } + + printf("\n"); +} + +static int s_print_streaming_operation(void *context, struct aws_hash_element *elem) { + (void)context; + struct aws_shadow_streaming_operation *operation = elem->value; + + struct aws_byte_cursor thing_cursor = aws_byte_cursor_from_buf(&operation->thing); + struct aws_byte_cursor shadow_cursor = aws_byte_cursor_from_buf(&operation->shadow); + struct aws_byte_cursor topic_filter_cursor = aws_byte_cursor_from_buf(&operation->topic_filter); + + printf( + " %" PRIu64 " Thing:'" PRInSTR "', Shadow: '" PRInSTR "', TopicFilter:'" PRInSTR "'\n", + operation->id, + AWS_BYTE_CURSOR_PRI(thing_cursor), + AWS_BYTE_CURSOR_PRI(shadow_cursor), + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + +static void s_handle_list_streams( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments) { + (void)allocator; + (void)arguments; + + printf("Open Streams:\n"); + aws_hash_table_foreach(&context->streaming_operations, s_print_streaming_operation, NULL); + printf("\n"); +} + +static const char *s_rr_streaming_subscription_event_type_to_c_str( + enum aws_rr_streaming_subscription_event_type status) { + switch (status) { + case ARRSSET_SUBSCRIPTION_ESTABLISHED: + return "SubscriptionEstablished"; + + case ARRSSET_SUBSCRIPTION_LOST: + return "SubscriptionLost"; + + case ARRSSET_SUBSCRIPTION_HALTED: + return "SubscriptionHalted"; + + default: + return "Unknown"; + } +} + +static void s_stream_subscription_status_fn( + enum aws_rr_streaming_subscription_event_type status, + int error_code, + void *user_data) { + + struct aws_shadow_streaming_operation *operation = user_data; + + struct aws_byte_cursor topic_filter_cursor = aws_byte_cursor_from_buf(&operation->topic_filter); + + printf( + "Streaming operation %" PRIu64 " received subscription status event on topic filter '" PRInSTR "':\n", + operation->id, + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); + printf( + " Status: %d(%s), ErrorCode: %d(%s)\n\n", + status, + s_rr_streaming_subscription_event_type_to_c_str(status), + error_code, + aws_error_debug_str(error_code)); +} + +static void s_stream_incoming_publish_fn(struct aws_byte_cursor payload, void *user_data) { + struct aws_shadow_streaming_operation *operation = user_data; + + struct aws_byte_cursor thing_cursor = aws_byte_cursor_from_buf(&operation->thing); + struct aws_byte_cursor shadow_cursor = aws_byte_cursor_from_buf(&operation->shadow); + struct aws_byte_cursor topic_filter_cursor = aws_byte_cursor_from_buf(&operation->topic_filter); + + printf( + "Streaming operation %" PRIu64 " ('" PRInSTR "', ' " PRInSTR "') received publish on topic filter '" PRInSTR + "':\n", + operation->id, + AWS_BYTE_CURSOR_PRI(thing_cursor), + AWS_BYTE_CURSOR_PRI(shadow_cursor), + AWS_BYTE_CURSOR_PRI(topic_filter_cursor)); + printf(" " PRInSTR "\n\n", AWS_BYTE_CURSOR_PRI(payload)); +} + +static void s_stream_terminated_fn(void *user_data) { + struct aws_shadow_streaming_operation *operation = user_data; + + printf("Stream %" PRIu64 "terminated\n\n", operation->id); + + s_aws_shadow_streaming_operation_destroy(operation); +} + +static void s_handle_open_delta_stream( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments) { + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count != 2) { + printf("invalid open-named-shadow-delta-stream options:\n"); + printf(" open-named-shadow-delta-stream \n\n"); + return; + } + + struct aws_byte_cursor thing_name_cursor; + AWS_ZERO_STRUCT(thing_name_cursor); + aws_array_list_get_at(arguments, &thing_name_cursor, 1); + + struct aws_byte_cursor shadow_name_cursor; + AWS_ZERO_STRUCT(shadow_name_cursor); + aws_array_list_get_at(arguments, &shadow_name_cursor, 2); + + char subscription_topic_filter[128]; + snprintf( + subscription_topic_filter, + AWS_ARRAY_SIZE(subscription_topic_filter), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/delta", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + struct aws_shadow_streaming_operation *operation = s_aws_shadow_streaming_operation_new( + allocator, + context, + thing_name_cursor, + shadow_name_cursor, + aws_byte_cursor_from_c_str(subscription_topic_filter)); + aws_hash_table_put(&context->streaming_operations, &operation->id, operation, NULL); + + struct aws_mqtt_streaming_operation_options open_stream_options = { + .topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_status_callback = s_stream_subscription_status_fn, + .incoming_publish_callback = s_stream_incoming_publish_fn, + .terminated_callback = s_stream_terminated_fn, + .user_data = operation, + }; + + printf( + "Opening NamedShadow delta stream with id %" PRIu64 " for shadow '" PRInSTR "' of thing '" PRInSTR "'...\n", + operation->id, + AWS_BYTE_CURSOR_PRI(shadow_name_cursor), + AWS_BYTE_CURSOR_PRI(thing_name_cursor)); + + operation->streaming_operation = + aws_mqtt_request_response_client_create_streaming_operation(context->rr_client, &open_stream_options); + if (operation->streaming_operation == NULL) { + int error_code = aws_last_error(); + printf( + "NamedShadow delta stream synchronous open failure: %d(%s)\n", error_code, aws_error_debug_str(error_code)); + + aws_hash_table_remove(&context->streaming_operations, &operation->id, NULL, NULL); + } + + printf("\n"); +} + +static void s_handle_open_document_stream( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments) { + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count != 2) { + printf("invalid open-named-shadow-document-stream options:\n"); + printf(" open-named-shadow-document-stream \n\n"); + return; + } + + struct aws_byte_cursor thing_name_cursor; + AWS_ZERO_STRUCT(thing_name_cursor); + aws_array_list_get_at(arguments, &thing_name_cursor, 1); + + struct aws_byte_cursor shadow_name_cursor; + AWS_ZERO_STRUCT(shadow_name_cursor); + aws_array_list_get_at(arguments, &shadow_name_cursor, 2); + + char subscription_topic_filter[128]; + snprintf( + subscription_topic_filter, + AWS_ARRAY_SIZE(subscription_topic_filter), + "$aws/things/" PRInSTR "/shadow/name/" PRInSTR "/update/document", + AWS_BYTE_CURSOR_PRI(thing_name_cursor), + AWS_BYTE_CURSOR_PRI(shadow_name_cursor)); + + struct aws_shadow_streaming_operation *operation = s_aws_shadow_streaming_operation_new( + allocator, + context, + thing_name_cursor, + shadow_name_cursor, + aws_byte_cursor_from_c_str(subscription_topic_filter)); + aws_hash_table_put(&context->streaming_operations, &operation->id, operation, NULL); + + struct aws_mqtt_streaming_operation_options open_stream_options = { + .topic_filter = aws_byte_cursor_from_c_str(subscription_topic_filter), + .subscription_status_callback = s_stream_subscription_status_fn, + .incoming_publish_callback = s_stream_incoming_publish_fn, + .terminated_callback = s_stream_terminated_fn, + .user_data = operation, + }; + + printf( + "Opening NamedShadow document stream with id %" PRIu64 " for shadow '" PRInSTR "' of thing '" PRInSTR "'...\n", + operation->id, + AWS_BYTE_CURSOR_PRI(shadow_name_cursor), + AWS_BYTE_CURSOR_PRI(thing_name_cursor)); + + operation->streaming_operation = + aws_mqtt_request_response_client_create_streaming_operation(context->rr_client, &open_stream_options); + if (operation->streaming_operation == NULL) { + int error_code = aws_last_error(); + printf( + "NamedShadow document stream synchronous open failure: %d(%s)\n", + error_code, + aws_error_debug_str(error_code)); + + aws_hash_table_remove(&context->streaming_operations, &operation->id, NULL, NULL); + } + + printf("\n"); +} + +static void s_handle_close_stream( + struct app_ctx *context, + struct aws_allocator *allocator, + struct aws_array_list *arguments) { + (void)allocator; + + size_t argument_count = aws_array_list_length(arguments) - 1; + if (argument_count != 1) { + printf("invalid close-stream options:\n"); + printf(" close-stream \n\n"); + return; + } + + struct aws_byte_cursor id_cursor; + AWS_ZERO_STRUCT(id_cursor); + aws_array_list_get_at(arguments, &id_cursor, 1); + + char id_buffer[32]; + snprintf(id_buffer, AWS_ARRAY_SIZE(id_buffer), PRInSTR, AWS_BYTE_CURSOR_PRI(id_cursor)); + uint64_t id = atoi(id_buffer); + + int was_present = 0; + aws_hash_table_remove(&context->streaming_operations, &id, NULL, &was_present); + + if (was_present == 0) { + printf("Stream %" PRIu64 " does not exist\n\n", id); + } else { + printf("Closing stream %" PRIu64 "\n\n", id); } } @@ -463,6 +943,11 @@ static bool s_handle_input(struct app_ctx *context, struct aws_allocator *alloca struct aws_byte_cursor update_reported_cursor = aws_byte_cursor_from_c_str("update-named-shadow-reported"); struct aws_byte_cursor update_desired_cursor = aws_byte_cursor_from_c_str("update-named-shadow-desired"); struct aws_byte_cursor delete_cursor = aws_byte_cursor_from_c_str("delete-named-shadow"); + struct aws_byte_cursor list_streams_cursor = aws_byte_cursor_from_c_str("list-streams"); + struct aws_byte_cursor open_delta_stream_cursor = aws_byte_cursor_from_c_str("open-named-shadow-delta-stream"); + struct aws_byte_cursor open_document_stream_cursor = + aws_byte_cursor_from_c_str("open-named-shadow-document-stream"); + struct aws_byte_cursor close_stream_cursor = aws_byte_cursor_from_c_str("close-stream"); struct aws_array_list words; aws_array_list_init_dynamic(&words, allocator, 10, sizeof(struct aws_byte_cursor)); @@ -498,8 +983,26 @@ static bool s_handle_input(struct app_ctx *context, struct aws_allocator *alloca s_handle_update_desired(context, allocator, &words, line_cursor); } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &delete_cursor)) { s_handle_delete(context, allocator, &words); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &list_streams_cursor)) { + s_handle_list_streams(context, allocator, &words); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &open_delta_stream_cursor)) { + s_handle_open_delta_stream(context, allocator, &words); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &open_document_stream_cursor)) { + s_handle_open_document_stream(context, allocator, &words); + } else if (aws_byte_cursor_eq_ignore_case(&command_cursor, &close_stream_cursor)) { + s_handle_close_stream(context, allocator, &words); } else { printf("Unknown command: " PRInSTR "\n", AWS_BYTE_CURSOR_PRI(command_cursor)); + printf("\nValid commands:\n"); + printf(" get-named-shadow - gets the full state of a named shadow\n"); + printf(" delete-named-shadow - deletes a named shadow\n"); + printf(" update-named-shadow-reported - updates the reported state of a named shadow\n"); + printf(" update-named-shadow-desired - updates the desired state of a named shadow\n"); + printf("\n"); + printf(" start - starts the mqtt5 client underlying the request-response client\n"); + printf(" stop - starts the mqtt5 client underlying the request-response client\n"); + printf(" quit - quits the program\n"); + printf("\n"); } done: @@ -542,10 +1045,6 @@ static void s_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_e fflush(stdout); } -static void s_release_streaming_operations(struct app_ctx *ctx) { - (void)ctx; -} - AWS_STATIC_STRING_FROM_LITERAL(s_client_id, "HelloWorld"); int main(int argc, char **argv) { @@ -560,6 +1059,15 @@ int main(int argc, char **argv) { aws_mutex_init(&app_ctx.lock); app_ctx.port = 1883; + aws_hash_table_init( + &app_ctx.streaming_operations, + allocator, + 10, + aws_mqtt_hash_uint64_t, + aws_mqtt_compare_uint64_t_eq, + NULL, + s_release_streaming_operation); + s_parse_options(argc, argv, &app_ctx); if (app_ctx.uri.port) { app_ctx.port = app_ctx.uri.port; @@ -676,11 +1184,13 @@ int main(int argc, char **argv) { app_ctx.client = aws_mqtt5_client_new(allocator, &client_options); struct aws_mqtt_request_response_client_options rr_client_options = { - .max_subscriptions = 30, + .max_request_response_subscriptions = 10, + .max_streaming_subscriptions = 5, .operation_timeout_seconds = 60, }; - app_ctx.rr_client = aws_mqtt_request_response_client_new_from_mqtt5_client(allocator, app_ctx.client, &rr_client_options); + app_ctx.rr_client = + aws_mqtt_request_response_client_new_from_mqtt5_client(allocator, app_ctx.client, &rr_client_options); bool done = false; while (!done) { @@ -695,7 +1205,7 @@ int main(int argc, char **argv) { done = s_handle_input(&app_ctx, allocator, line); } - s_release_streaming_operations(&app_ctx); + aws_hash_table_clean_up(&app_ctx.streaming_operations); aws_mqtt_request_response_client_release(app_ctx.rr_client); aws_mqtt5_client_release(app_ctx.client); diff --git a/include/aws/mqtt/private/shared_constants.h b/include/aws/mqtt/private/shared.h similarity index 69% rename from include/aws/mqtt/private/shared_constants.h rename to include/aws/mqtt/private/shared.h index 0a835942..5d36d7f5 100644 --- a/include/aws/mqtt/private/shared_constants.h +++ b/include/aws/mqtt/private/shared.h @@ -8,11 +8,16 @@ #include +#include + AWS_EXTERN_C_BEGIN AWS_MQTT_API extern const struct aws_byte_cursor *g_websocket_handshake_default_path; AWS_MQTT_API extern const struct aws_http_header *g_websocket_handshake_default_protocol_header; +AWS_MQTT_API uint64_t aws_mqtt_hash_uint64_t(const void *item); +AWS_MQTT_API bool aws_mqtt_compare_uint64_t_eq(const void *a, const void *b); + AWS_EXTERN_C_END #endif /* AWS_MQTT_SHARED_CONSTANTS_H */ diff --git a/source/client.c b/source/client.c index 36c6d134..8102ea24 100644 --- a/source/client.c +++ b/source/client.c @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include diff --git a/source/shared_constants.c b/source/shared.c similarity index 95% rename from source/shared_constants.c rename to source/shared.c index 8d260799..e84c4afe 100644 --- a/source/shared_constants.c +++ b/source/shared.c @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ -#include +#include #include diff --git a/source/v5/mqtt5_client.c b/source/v5/mqtt5_client.c index 8f9ca3dc..7474e1bd 100644 --- a/source/v5/mqtt5_client.c +++ b/source/v5/mqtt5_client.c @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include