Skip to content

Commit

Permalink
Metric for manager (#370)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored May 16, 2022
1 parent f716114 commit 73af2aa
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 8 deletions.
25 changes: 25 additions & 0 deletions include/aws/http/connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ typedef void(aws_http_connection_manager_on_connection_setup_fn)(

typedef void(aws_http_connection_manager_shutdown_complete_fn)(void *user_data);

/**
* Metrics for logging and debugging purpose.
*/
struct aws_http_manager_metrics {
/**
* The number of additional concurrent requests that can be supported by the HTTP manager without needing to
* establish additional connections to the target server.
*
* For connection manager, it equals to connections that's idle.
* For stream manager, it equals to the number of streams that are possible to be made without creating new
* connection, although the implementation can create new connection without fully filling it.
*/
size_t available_concurrency;
/* The number of requests that are awaiting concurrency to be made available from the HTTP manager. */
size_t pending_concurrency_acquires;
};

/*
* Connection manager configuration struct.
*
Expand Down Expand Up @@ -143,6 +160,14 @@ int aws_http_connection_manager_release_connection(
struct aws_http_connection_manager *manager,
struct aws_http_connection *connection);

/**
* Fetch the current manager metrics from connection manager.
*/
AWS_HTTP_API
void aws_http_connection_manager_fetch_metrics(
const struct aws_http_connection_manager *manager,
struct aws_http_manager_metrics *out_metrics);

AWS_EXTERN_C_END

#endif /* AWS_HTTP_CONNECTION_MANAGER_H */
12 changes: 12 additions & 0 deletions include/aws/http/http2_stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ struct proxy_env_var_settings;
struct aws_http2_setting;
struct aws_http_make_request_options;
struct aws_http_stream;
struct aws_http_manager_metrics;

/**
* Always invoked asynchronously when the stream was created, successfully or not.
Expand Down Expand Up @@ -168,5 +169,16 @@ void aws_http2_stream_manager_acquire_stream(
struct aws_http2_stream_manager *http2_stream_manager,
const struct aws_http2_stream_manager_acquire_stream_options *acquire_stream_option);

/**
* Fetch the current metrics from stream manager.
*
* @param http2_stream_manager
* @param out_metrics The metrics to be fetched
*/
AWS_HTTP_API
void aws_http2_stream_manager_fetch_metrics(
const struct aws_http2_stream_manager *http2_stream_manager,
struct aws_http_manager_metrics *out_metrics);

AWS_EXTERN_C_END
#endif /* AWS_HTTP2_STREAM_MANAGER_H */
13 changes: 10 additions & 3 deletions include/aws/http/private/random_access_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,23 @@ int aws_random_access_set_remove(struct aws_random_access_set *set, const void *
* Get the pointer to a random element from the data structure. Fails when the data structure is empty.
*/
AWS_HTTP_API
int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void **out);
int aws_random_access_set_random_get_ptr(const struct aws_random_access_set *set, void **out);

AWS_HTTP_API
size_t aws_random_access_set_get_size(struct aws_random_access_set *set);
size_t aws_random_access_set_get_size(const struct aws_random_access_set *set);

/**
* Check the element exist in the data structure or not.
*/
AWS_HTTP_API
int aws_random_access_set_exist(struct aws_random_access_set *set, const void *element, bool *exist);
int aws_random_access_set_exist(const struct aws_random_access_set *set, const void *element, bool *exist);

/**
* Get the pointer to an element that currently stored at that index. It may change if operations like remove and add
* happens. Helpful for debugging and iterating through the whole set.
*/
AWS_HTTP_API
int aws_random_access_set_random_get_ptr_index(const struct aws_random_access_set *set, void **out, size_t index);

AWS_EXTERN_C_END
#endif /* AWS_HTTP_RANDOM_ACCESS_SET_H */
12 changes: 12 additions & 0 deletions source/connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1538,3 +1538,15 @@ static void s_cull_task(struct aws_task *task, void *arg, enum aws_task_status s

s_schedule_connection_culling(manager);
}

void aws_http_connection_manager_fetch_metrics(
const struct aws_http_connection_manager *manager,
struct aws_http_manager_metrics *out_metrics) {
AWS_PRECONDITION(manager);
AWS_PRECONDITION(out_metrics);

AWS_FATAL_ASSERT(aws_mutex_lock((struct aws_mutex *)(void *)&manager->lock) == AWS_OP_SUCCESS);
out_metrics->available_concurrency = manager->idle_connection_count;
out_metrics->pending_concurrency_acquires = manager->pending_acquisition_count;
AWS_FATAL_ASSERT(aws_mutex_unlock((struct aws_mutex *)(void *)&manager->lock) == AWS_OP_SUCCESS);
}
35 changes: 33 additions & 2 deletions source/http2_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static struct aws_h2_sm_connection *s_get_best_sm_connection_from_set(struct aws

/* helper function for building the transaction: Try to assign connection for a pending stream acquisition */
/* *_synced should only be called with LOCK HELD or from another synced function */
static void s_sm_try_assign_connection_to_pending_stream_acquisition(
static void s_sm_try_assign_connection_to_pending_stream_acquisition_synced(
struct aws_http2_stream_manager *stream_manager,
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition) {

Expand Down Expand Up @@ -344,7 +344,7 @@ static void s_aws_http2_stream_manager_build_transaction_synced(struct aws_http2
aws_linked_list_pop_front(&stream_manager->synced_data.pending_stream_acquisitions);
struct aws_h2_sm_pending_stream_acquisition *pending_stream_acquisition =
AWS_CONTAINER_OF(node, struct aws_h2_sm_pending_stream_acquisition, node);
s_sm_try_assign_connection_to_pending_stream_acquisition(stream_manager, pending_stream_acquisition);
s_sm_try_assign_connection_to_pending_stream_acquisition_synced(stream_manager, pending_stream_acquisition);
if (pending_stream_acquisition->sm_connection == NULL) {
/* Cannot find any connection, push it back to the front and break the loop */
aws_linked_list_push_front(&stream_manager->synced_data.pending_stream_acquisitions, node);
Expand Down Expand Up @@ -994,3 +994,34 @@ void aws_http2_stream_manager_acquire_stream(
} /* END CRITICAL SECTION */
s_aws_http2_stream_manager_execute_transaction(&work);
}

static size_t s_get_available_streams_num_from_connection_set(const struct aws_random_access_set *set) {
size_t all_available_streams_num = 0;
size_t ideal_connection_num = aws_random_access_set_get_size(set);
for (size_t i = 0; i < ideal_connection_num; i++) {
struct aws_h2_sm_connection *sm_connection = NULL;
AWS_FATAL_ASSERT(aws_random_access_set_random_get_ptr_index(set, (void **)&sm_connection, i) == AWS_OP_SUCCESS);
uint32_t available_streams = sm_connection->max_concurrent_streams - sm_connection->num_streams_assigned;
all_available_streams_num += (size_t)available_streams;
}
return all_available_streams_num;
}

void aws_http2_stream_manager_fetch_metrics(
const struct aws_http2_stream_manager *stream_manager,
struct aws_http_manager_metrics *out_metrics) {
AWS_PRECONDITION(stream_manager);
AWS_PRECONDITION(out_metrics);
{ /* BEGIN CRITICAL SECTION */
s_lock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
size_t all_available_streams_num = 0;
all_available_streams_num +=
s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.ideal_available_set);
all_available_streams_num +=
s_get_available_streams_num_from_connection_set(&stream_manager->synced_data.nonideal_available_set);
out_metrics->pending_concurrency_acquires =
stream_manager->synced_data.internal_refcount_stats[AWS_SMCT_PENDING_ACQUISITION];
out_metrics->available_concurrency = all_available_streams_num;
s_unlock_synced_data((struct aws_http2_stream_manager *)(void *)stream_manager);
} /* END CRITICAL SECTION */
}
12 changes: 9 additions & 3 deletions source/random_access_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ int aws_random_access_set_remove(struct aws_random_access_set *set, const void *
return AWS_OP_SUCCESS;
}

int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void **out) {
int aws_random_access_set_random_get_ptr(const struct aws_random_access_set *set, void **out) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(out != NULL);
size_t length = aws_array_list_length(&set->impl->list);
Expand All @@ -166,11 +166,11 @@ int aws_random_access_set_random_get_ptr(struct aws_random_access_set *set, void
return aws_array_list_get_at(&set->impl->list, (void *)out, index);
}

size_t aws_random_access_set_get_size(struct aws_random_access_set *set) {
size_t aws_random_access_set_get_size(const struct aws_random_access_set *set) {
return aws_array_list_length(&set->impl->list);
}

int aws_random_access_set_exist(struct aws_random_access_set *set, const void *element, bool *exist) {
int aws_random_access_set_exist(const struct aws_random_access_set *set, const void *element, bool *exist) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(element);
AWS_PRECONDITION(exist);
Expand All @@ -179,3 +179,9 @@ int aws_random_access_set_exist(struct aws_random_access_set *set, const void *e
*exist = find != NULL;
return re;
}

int aws_random_access_set_random_get_ptr_index(const struct aws_random_access_set *set, void **out, size_t index) {
AWS_PRECONDITION(set);
AWS_PRECONDITION(out != NULL);
return aws_array_list_get_at(&set->impl->list, (void *)out, index);
}
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ add_net_test_case(h2_sm_mock_multiple_connections)
add_net_test_case(h2_sm_mock_bad_connection_acquired)
add_net_test_case(h2_sm_mock_connections_closed_before_request_made)
add_net_test_case(h2_sm_mock_max_concurrent_streams_remote)
add_net_test_case(h2_sm_mock_fetch_metric)
add_net_test_case(h2_sm_mock_complete_stream)
add_net_test_case(h2_sm_mock_ideal_num_streams)
add_net_test_case(h2_sm_mock_large_ideal_num_streams)
Expand Down
50 changes: 50 additions & 0 deletions tests/test_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,56 @@ TEST_CASE(h2_sm_mock_max_concurrent_streams_remote) {
return s_tester_clean_up();
}

/* Test that the remote max concurrent streams setting hit */
TEST_CASE(h2_sm_mock_fetch_metric) {
(void)ctx;
struct sm_tester_options options = {
.max_connections = 5,
.alloc = allocator,
};
ASSERT_SUCCESS(s_tester_init(&options));
s_override_cm_connect_function(s_aws_http_connection_manager_create_connection_sync_mock);
/* Set the remote max to be 2 */
s_tester.max_con_stream_remote = 2;
/* Acquire a stream to trigger */
ASSERT_SUCCESS(s_sm_stream_acquiring(1));
/* waiting for one fake connection made */
ASSERT_SUCCESS(s_wait_on_fake_connection_count(1));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(1));
struct aws_http_manager_metrics out_metrics;
AWS_ZERO_STRUCT(out_metrics);

aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
/* Acquired 1 stream, and we hold one connection, the max streams per connection is 2. */
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 1);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 0);

ASSERT_SUCCESS(s_sm_stream_acquiring(1));

ASSERT_SUCCESS(s_wait_on_fake_connection_count(1));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(2));
aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 0);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 0);

ASSERT_SUCCESS(s_sm_stream_acquiring(10));
ASSERT_SUCCESS(s_wait_on_fake_connection_count(5));
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_wait_on_streams_acquired_count(10));
aws_http2_stream_manager_fetch_metrics(s_tester.stream_manager, &out_metrics);
ASSERT_UINT_EQUALS(out_metrics.available_concurrency, 0);
ASSERT_UINT_EQUALS(out_metrics.pending_concurrency_acquires, 2);

ASSERT_SUCCESS(s_complete_all_fake_connection_streams());
/* Still have two more streams that have not been completed */
s_drain_all_fake_connection_testing_channel();
ASSERT_SUCCESS(s_complete_all_fake_connection_streams());

return s_tester_clean_up();
}

/* Test that the stream completed will free the connection for more streams */
TEST_CASE(h2_sm_mock_complete_stream) {
(void)ctx;
Expand Down

0 comments on commit 73af2aa

Please sign in to comment.