Skip to content

Commit

Permalink
Vtable refactor311 (#280)
Browse files Browse the repository at this point in the history
Refactors the mqtt311 implementation into two pieces: a generic wrapper with a vtable and the actual implementation.
  • Loading branch information
bretambrose authored May 8, 2023
1 parent e6ea3cc commit cdff1b7
Show file tree
Hide file tree
Showing 7 changed files with 669 additions and 267 deletions.
45 changes: 25 additions & 20 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <aws/mqtt/client.h>

#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/topic_tree.h>

Expand All @@ -21,16 +22,18 @@
#include <aws/io/socket.h>
#include <aws/io/tls_channel_handler.h>

struct aws_mqtt_client_connection_311_impl;

#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback) \
do { \
if ((client_ptr)->callback) { \
(client_ptr)->callback((client_ptr), (client_ptr)->callback##_ud); \
(client_ptr)->callback((&client_ptr->base), (client_ptr)->callback##_ud); \
} \
} while (false)
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \
do { \
if ((client_ptr)->callback) { \
(client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud); \
(client_ptr)->callback((&client_ptr->base), __VA_ARGS__, (client_ptr)->callback##_ud); \
} \
} while (false)

Expand Down Expand Up @@ -101,7 +104,8 @@ typedef enum aws_mqtt_client_request_state(
/**
* Called when the operation statistics change.
*/
typedef void(aws_mqtt_on_operation_statistics_fn)(struct aws_mqtt_client_connection *connection, void *userdata);
typedef void(
aws_mqtt_on_operation_statistics_fn)(struct aws_mqtt_client_connection_311_impl *connection, void *userdata);

/* Flags that indicate the way in which way an operation is currently affecting the statistics of the connection */
enum aws_mqtt_operation_statistic_state_flags {
Expand All @@ -119,7 +123,7 @@ struct aws_mqtt_request {
struct aws_linked_list_node list_node;

struct aws_allocator *allocator;
struct aws_mqtt_client_connection *connection;
struct aws_mqtt_client_connection_311_impl *connection;

struct aws_channel_task outgoing_task;

Expand All @@ -146,7 +150,7 @@ struct aws_mqtt_reconnect_task {
/* The lifetime of this struct is from subscribe -> suback */
struct subscribe_task_arg {

struct aws_mqtt_client_connection *connection;
struct aws_mqtt_client_connection_311_impl *connection;

/* list of pointer of subscribe_task_topics */
struct aws_array_list topics;
Expand All @@ -166,7 +170,7 @@ struct subscribe_task_arg {

/* The lifetime of this struct is the same as the lifetime of the subscription */
struct subscribe_task_topic {
struct aws_mqtt_client_connection *connection;
struct aws_mqtt_client_connection_311_impl *connection;

struct aws_mqtt_topic_subscription request;
struct aws_string *filter;
Expand All @@ -175,10 +179,11 @@ struct subscribe_task_topic {
struct aws_ref_count ref_count;
};

struct aws_mqtt_client_connection {

struct aws_mqtt_client_connection_311_impl {
struct aws_allocator *allocator;
struct aws_ref_count ref_count;

struct aws_mqtt_client_connection base;

struct aws_mqtt_client *client;

/* Channel handler information */
Expand Down Expand Up @@ -328,15 +333,15 @@ struct aws_channel_handler_vtable *aws_mqtt_get_client_channel_vtable(void);

/* Helper for getting a message object for a packet */
struct aws_io_message *mqtt_get_message_for_packet(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
struct aws_mqtt_fixed_header *header);

void mqtt_connection_lock_synced_data(struct aws_mqtt_client_connection *connection);
void mqtt_connection_unlock_synced_data(struct aws_mqtt_client_connection *connection);
void mqtt_connection_lock_synced_data(struct aws_mqtt_client_connection_311_impl *connection);
void mqtt_connection_unlock_synced_data(struct aws_mqtt_client_connection_311_impl *connection);

/* Note: needs to be called with lock held. */
void mqtt_connection_set_state(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
enum aws_mqtt_client_connection_state state);

/**
Expand All @@ -346,7 +351,7 @@ void mqtt_connection_set_state(
* noRetry is true for the packets will never be retried or offline queued.
*/
AWS_MQTT_API uint16_t mqtt_create_request(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
aws_mqtt_send_request_fn *send_request,
void *send_request_ud,
aws_mqtt_op_complete_fn *on_complete,
Expand All @@ -356,15 +361,15 @@ AWS_MQTT_API uint16_t mqtt_create_request(

/* Call when an ack packet comes back from the server. */
AWS_MQTT_API void mqtt_request_complete(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
int error_code,
uint16_t packet_id);

/* Call to close the connection with an error code */
AWS_MQTT_API void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int error_code);
AWS_MQTT_API void mqtt_disconnect_impl(struct aws_mqtt_client_connection_311_impl *connection, int error_code);

/* Creates the task used to reestablish a broken connection */
AWS_MQTT_API void aws_create_reconnect_task(struct aws_mqtt_client_connection *connection);
AWS_MQTT_API void aws_create_reconnect_task(struct aws_mqtt_client_connection_311_impl *connection);

/**
* Sets the callback to call whenever the operation statistics change.
Expand All @@ -374,7 +379,7 @@ AWS_MQTT_API void aws_create_reconnect_task(struct aws_mqtt_client_connection *c
* \param[in] on_operation_statistics_ud Userdata for on_operation_statistics
*/
AWS_MQTT_API int aws_mqtt_client_connection_set_on_operation_statistics_handler(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
aws_mqtt_on_operation_statistics_fn *on_operation_statistics,
void *on_operation_statistics_ud);

Expand All @@ -388,7 +393,7 @@ AWS_MQTT_API int aws_mqtt_client_connection_set_on_operation_statistics_handler(
* \returns AWS_OP_SUCCESS if the connection is open and the PINGREQ is sent or queued to send,
* otherwise AWS_OP_ERR and aws_last_error() is set.
*/
int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection);
int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection_311_impl *connection);

/**
* Changes the operation statistics for the passed-in aws_mqtt_request. Used for tracking
Expand All @@ -402,7 +407,7 @@ int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connectio
* @param new_state_flags The new state to use
*/
void aws_mqtt_connection_statistics_change_operation_statistic_state(
struct aws_mqtt_client_connection *connection,
struct aws_mqtt_client_connection_311_impl *connection,
struct aws_mqtt_request *request,
enum aws_mqtt_operation_statistic_state_flags new_state_flags);

Expand Down
111 changes: 111 additions & 0 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#ifndef AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H
#define AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H

/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#include <aws/mqtt/client.h>

struct aws_mqtt_client_connection;

struct aws_mqtt_client_connection_vtable {

int (*set_will_fn)(
void *impl,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
bool retain,
const struct aws_byte_cursor *payload);

int (*set_login_fn)(void *impl, const struct aws_byte_cursor *username, const struct aws_byte_cursor *password);

int (*use_websockets_fn)(
void *impl,
aws_mqtt_transform_websocket_handshake_fn *transformer,
void *transformer_ud,
aws_mqtt_validate_websocket_handshake_fn *validator,
void *validator_ud);

int (*set_http_proxy_options_fn)(void *impl, struct aws_http_proxy_options *proxy_options);

int (*set_host_resolution_options_fn)(void *impl, struct aws_host_resolution_config *host_resolution_config);

int (*set_reconnect_timeout_fn)(void *impl, uint64_t min_timeout, uint64_t max_timeout);

int (*set_connection_interruption_handlers_fn)(
void *impl,
aws_mqtt_client_on_connection_interrupted_fn *on_interrupted,
void *on_interrupted_ud,
aws_mqtt_client_on_connection_resumed_fn *on_resumed,
void *on_resumed_ud);

int (*set_connection_closed_handler_fn)(
void *impl,
aws_mqtt_client_on_connection_closed_fn *on_closed,
void *on_closed_ud);

int (*set_on_any_publish_handler_fn)(
void *impl,
aws_mqtt_client_publish_received_fn *on_any_publish,
void *on_any_publish_ud);

int (*connect_fn)(void *impl, const struct aws_mqtt_connection_options *connection_options);

int (*reconnect_fn)(void *impl, aws_mqtt_client_on_connection_complete_fn *on_connection_complete, void *userdata);

int (*disconnect_fn)(void *impl, aws_mqtt_client_on_disconnect_fn *on_disconnect, void *userdata);

uint16_t (*subscribe_multiple_fn)(
void *impl,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud);

uint16_t (*subscribe_fn)(
void *impl,
const struct aws_byte_cursor *topic_filter,
enum aws_mqtt_qos qos,
aws_mqtt_client_publish_received_fn *on_publish,
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud);

uint16_t (*subscribe_local_fn)(
void *impl,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_client_publish_received_fn *on_publish,
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud);

uint16_t (*resubscribe_existing_topics_fn)(void *impl, aws_mqtt_suback_multi_fn *on_suback, void *on_suback_ud);

uint16_t (*unsubscribe_fn)(
void *impl,
const struct aws_byte_cursor *topic_filter,
aws_mqtt_op_complete_fn *on_unsuback,
void *on_unsuback_ud);

uint16_t (*publish_fn)(
void *impl,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata);

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);
};

struct aws_mqtt_client_connection {
struct aws_mqtt_client_connection_vtable *vtable;
void *impl;
struct aws_ref_count ref_count;
};

#endif /* AWS_MQTT_PRIVATE_CLIENT_IMPL_SHARED_H */
2 changes: 1 addition & 1 deletion include/aws/mqtt/private/mqtt_client_test_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

struct aws_allocator;
struct aws_byte_cursor;
struct aws_mqtt_client_connection;
struct aws_mqtt_client_connection_311_impl;
struct aws_string;

AWS_EXTERN_C_BEGIN
Expand Down
Loading

0 comments on commit cdff1b7

Please sign in to comment.