Skip to content

Commit

Permalink
Handle connection close corner case (#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Mar 21, 2023
1 parent 291b06c commit 0600662
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 31 deletions.
54 changes: 54 additions & 0 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,33 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
}
}

if (error_code != AWS_ERROR_SUCCESS) {
if (stream->base.client_data && stream->is_incoming_message_done) {
/* As a request that finished receiving the response, we ignore error and
* consider it finished successfully */
AWS_LOGF_DEBUG(
AWS_LS_HTTP_STREAM,
"id=%p: Ignoring error code %d (%s). The response has been fully received,"
"so the stream will complete successfully.",
(void *)&stream->base,
error_code,
aws_error_name(error_code));
error_code = AWS_ERROR_SUCCESS;
}
if (stream->base.server_data && stream->is_outgoing_message_done) {
/* As a server finished sending the response, but still failed with the request was not finished receiving.
* We ignore error and consider it finished successfully */
AWS_LOGF_DEBUG(
AWS_LS_HTTP_STREAM,
"id=%p: Ignoring error code %d (%s). The response has been fully sent,"
" so the stream will complete successfully",
(void *)&stream->base,
error_code,
aws_error_name(error_code));
error_code = AWS_ERROR_SUCCESS;
}
}

/* Remove stream from list. */
aws_linked_list_remove(&stream->node);

Expand Down Expand Up @@ -1066,6 +1093,33 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
aws_h1_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */

if (connection->base.client_data) {
/**
* RFC-9112 section 9.6.
* A client that receives a "close" connection option MUST cease sending
* requests on that connection and close the connection after reading the
* response message containing the "close" connection option.
*
* Mark the stream's outgoing message as complete,
* so that we stop sending, and stop waiting for it to finish sending.
**/
if (!incoming_stream->is_outgoing_message_done) {
AWS_LOGF_DEBUG(
AWS_LS_HTTP_STREAM,
"id=%p: Received 'Connection: close' header, no more request data will be sent.",
(void *)&incoming_stream->base);
incoming_stream->is_outgoing_message_done = true;
}
/* Stop writing right now.
* Shutdown will be scheduled after we finishing parsing the response */
s_stop(
connection,
false /*stop_reading*/,
true /*stop_writing*/,
false /*schedule_shutdown*/,
AWS_ERROR_SUCCESS);
}
}
}

Expand Down
36 changes: 22 additions & 14 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ add_test_case(h1_client_midchannel_requires_switching_protocols)
add_test_case(h1_client_switching_protocols_fails_pending_requests)
add_test_case(h1_client_switching_protocols_fails_subsequent_requests)
add_test_case(h1_client_switching_protocols_requires_downstream_handler)
add_test_case(h1_client_connection_close_before_request_finishes)
add_test_case(h1_client_response_close_connection_before_request_finishes)

add_test_case(strutil_trim_http_whitespace)
add_test_case(strutil_is_http_token)
Expand Down Expand Up @@ -199,6 +201,7 @@ add_test_case(websocket_handler_delayed_write_completion)
add_test_case(websocket_handler_send_halts_if_payload_fn_returns_false)
add_test_case(websocket_handler_shutdown_automatically_sends_close_frame)
add_test_case(websocket_handler_shutdown_handles_queued_close_frame)

# add_test_case(websocket_handler_shutdown_immediately_in_emergency) disabled until channel API exposes immediate shutdown
add_test_case(websocket_handler_shutdown_handles_unexpected_write_error)
add_test_case(websocket_handler_close_on_thread)
Expand Down Expand Up @@ -268,7 +271,7 @@ add_test_case(hpack_dynamic_table_empty_value)
add_test_case(hpack_dynamic_table_with_empty_header)
add_test_case(hpack_dynamic_table_size_update_from_setting)

if (ENABLE_LOCALHOST_INTEGRATION_TESTS)
if(ENABLE_LOCALHOST_INTEGRATION_TESTS)
# Tests should be named with localhost_integ_*
add_net_test_case(localhost_integ_hpack_stress)
add_net_test_case(localhost_integ_hpack_compression_stress)
Expand Down Expand Up @@ -401,7 +404,8 @@ add_test_case(h2_client_unactivated_stream_cleans_up)
add_test_case(h2_client_connection_preface_sent)
add_test_case(h2_client_auto_ping_ack)
add_test_case(h2_client_auto_ping_ack_higher_priority)
#TODO add_test_case(h2_client_auto_ping_ack_higher_priority_not_break_encoding_frame)

# TODO add_test_case(h2_client_auto_ping_ack_higher_priority_not_break_encoding_frame)
add_test_case(h2_client_auto_settings_ack)
add_test_case(h2_client_stream_complete)
add_test_case(h2_client_close)
Expand Down Expand Up @@ -444,7 +448,8 @@ add_test_case(h2_client_push_promise_automatically_rejected)
add_test_case(h2_client_conn_receive_goaway)
add_test_case(h2_client_conn_receive_goaway_debug_data)
add_test_case(h2_client_conn_err_invalid_last_stream_id_goaway)
#TODO add_test_case(h2_client_send_goaway_with_push_promises) id of 1st should be in GOAWAY 2nd should be ignored

# TODO add_test_case(h2_client_send_goaway_with_push_promises) id of 1st should be in GOAWAY 2nd should be ignored
add_test_case(h2_client_change_settings_succeed)
add_test_case(h2_client_change_settings_failed_no_ack_received)
add_test_case(h2_client_manual_window_management_disabled_auto_window_update)
Expand All @@ -454,10 +459,11 @@ add_test_case(h2_client_manual_window_management_user_send_stream_window_update_
add_test_case(h2_client_manual_window_management_user_send_conn_window_update)
add_test_case(h2_client_manual_window_management_user_send_conn_window_update_with_padding)
add_test_case(h2_client_manual_window_management_user_send_connection_window_update_overflow)

# Build these when we address window_update() differences in H1 vs H2
#TODO add_test_case(h2_client_manual_updated_window_ignored_when_automatical_on)
#TODO add_test_case(h2_client_manual_stream_updated_window_ignored_invalid_state)
#TODO add_test_case(h2_client_manual_window_management_window_overflow) #we cannot ensure the increment_size is safe or not, let our peer detect the maximum exceed or not. But we can test the obviously overflows here.
# TODO add_test_case(h2_client_manual_updated_window_ignored_when_automatical_on)
# TODO add_test_case(h2_client_manual_stream_updated_window_ignored_invalid_state)
# TODO add_test_case(h2_client_manual_window_management_window_overflow) #we cannot ensure the increment_size is safe or not, let our peer detect the maximum exceed or not. But we can test the obviously overflows here.
add_test_case(h2_client_send_ping_successfully_receive_ack)
add_test_case(h2_client_send_ping_no_ack_received)
add_test_case(h2_client_conn_err_extraneous_ping_ack_received)
Expand Down Expand Up @@ -496,10 +502,11 @@ add_test_case(connection_h2_prior_knowledge)
add_test_case(connection_h2_prior_knowledge_not_work_with_tls)
add_test_case(connection_customized_alpn)
add_test_case(connection_customized_alpn_error_with_unknown_return_string)

# These server tests occasionally fail. Resurrect if/when we get back to work on HTTP server.
#add_test_case(connection_destroy_server_with_connection_existing)
#add_test_case(connection_destroy_server_with_multiple_connections_existing)
#add_test_case(connection_server_shutting_down_new_connection_setup_fail)
# add_test_case(connection_destroy_server_with_connection_existing)
# add_test_case(connection_destroy_server_with_multiple_connections_existing)
# add_test_case(connection_server_shutting_down_new_connection_setup_fail)

# connection manager tests
# unit tests where connections are mocked
Expand All @@ -526,7 +533,7 @@ add_net_test_case(test_connection_manager_acquire_release_mix)

# Integration test that requires proxy envrionment in us-east-1 region.
# TODO: test the server name validation properly
if (ENABLE_PROXY_INTEGRATION_TESTS)
if(ENABLE_PROXY_INTEGRATION_TESTS)
add_net_test_case(connection_manager_proxy_integration_forwarding_proxy_no_auth)
add_net_test_case(connection_manager_proxy_integration_forwarding_proxy_no_auth_env)
add_net_test_case(connection_manager_proxy_integration_legacy_http_no_auth)
Expand Down Expand Up @@ -646,8 +653,9 @@ add_net_test_case(h2_sm_acquire_stream)
add_net_test_case(h2_sm_acquire_stream_multiple_connections)
add_net_test_case(h2_sm_closing_before_connection_acquired)
add_net_test_case(h2_sm_close_connection_on_server_error)

# Tests against local server
if (ENABLE_LOCALHOST_INTEGRATION_TESTS)
if(ENABLE_LOCALHOST_INTEGRATION_TESTS)
# Tests should be named with localhost_integ_*
add_net_test_case(localhost_integ_h2_sm_prior_knowledge)
add_net_test_case(localhost_integ_h2_sm_acquire_stream_stress)
Expand All @@ -669,7 +677,7 @@ generate_test_driver(${TEST_BINARY_NAME})
file(GLOB FUZZ_TESTS "fuzz/*.c")
aws_add_fuzz_tests("${FUZZ_TESTS}" "" "")

#SSL certificates to use for testing.
# SSL certificates to use for testing.
add_custom_command(TARGET ${TEST_BINARY_NAME} PRE_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
${CMAKE_CURRENT_SOURCE_DIR}/resources $<TARGET_FILE_DIR:${TEST_BINARY_NAME}>)
COMMAND ${CMAKE_COMMAND} -E copy_directory
${CMAKE_CURRENT_SOURCE_DIR}/resources $<TARGET_FILE_DIR:${TEST_BINARY_NAME}>)
161 changes: 144 additions & 17 deletions tests/test_h1_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -2260,30 +2260,33 @@ static struct aws_input_stream_vtable s_slow_stream_vtable = {
.get_length = s_slow_stream_get_length,
};

static void s_slow_body_sender_init(struct slow_body_sender *body_sender) {
/* set up request whose body won't send immediately */
struct aws_input_stream empty_stream_base;
AWS_ZERO_STRUCT(empty_stream_base);
body_sender->base = empty_stream_base;
body_sender->status.is_end_of_stream = false;
body_sender->status.is_valid = true;
struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests");
body_sender->cursor = body;
body_sender->delay_ticks = 5;
body_sender->bytes_per_tick = 1;

body_sender->base.vtable = &s_slow_stream_vtable;
aws_ref_count_init(
&body_sender->base.ref_count, &body_sender, (aws_simple_completion_callback *)s_slow_stream_destroy);
}

/* It should be fine to receive a response before the request has finished sending */
H1_CLIENT_TEST_CASE(h1_client_response_arrives_before_request_done_sending_is_ok) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* set up request whose body won't send immediately */
struct aws_input_stream empty_stream_base;
AWS_ZERO_STRUCT(empty_stream_base);
struct slow_body_sender body_sender = {
.base = empty_stream_base,
.status =
{
.is_end_of_stream = false,
.is_valid = true,
},
.cursor = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests"),
.delay_ticks = 5,
.bytes_per_tick = 1,
};
body_sender.base.vtable = &s_slow_stream_vtable;
aws_ref_count_init(
&body_sender.base.ref_count, &body_sender, (aws_simple_completion_callback *)s_slow_stream_destroy);

struct slow_body_sender body_sender;
AWS_ZERO_STRUCT(body_sender);
s_slow_body_sender_init(&body_sender);
struct aws_input_stream *body_stream = &body_sender.base;

struct aws_http_header headers[] = {
Expand Down Expand Up @@ -4156,3 +4159,127 @@ H1_CLIENT_TEST_CASE(h1_client_switching_protocols_requires_downstream_handler) {
ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

H1_CLIENT_TEST_CASE(h1_client_connection_close_before_request_finishes) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* set up request whose body won't send immediately */
struct slow_body_sender body_sender;
AWS_ZERO_STRUCT(body_sender);
s_slow_body_sender_init(&body_sender);
struct aws_input_stream *body_stream = &body_sender.base;

struct aws_http_header headers[] = {
{
.name = aws_byte_cursor_from_c_str("Content-Length"),
.value = aws_byte_cursor_from_c_str("16"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_NOT_NULL(request);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
ASSERT_SUCCESS(aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)));
aws_http_message_set_body_stream(request, body_stream);

struct client_stream_tester stream_tester;
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request));

/* send head of request */
testing_channel_run_currently_queued_tasks(&tester.testing_channel);

/* Ensure the request can be destroyed after request is sent */
aws_http_message_destroy(request);
aws_input_stream_release(body_stream);

/* send close connection response */
ASSERT_SUCCESS(testing_channel_push_read_str(
&tester.testing_channel,
"HTTP/1.1 404 Not Found\r\n"
"Date: Fri, 01 Mar 2019 17:18:55 GMT\r\n"
"\r\n"));

testing_channel_run_currently_queued_tasks(&tester.testing_channel);

aws_channel_shutdown(tester.testing_channel.channel, AWS_ERROR_SUCCESS);
/* Wait for channel to finish shutdown */
testing_channel_drain_queued_tasks(&tester.testing_channel);
/* check result, should not receive any body */
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
"Content-Length: 16\r\n"
"\r\n";
ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected));

ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);

/* clean up */
client_stream_tester_clean_up(&stream_tester);
ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

/* When response has `connection: close` any further request body should not be sent. */
H1_CLIENT_TEST_CASE(h1_client_response_close_connection_before_request_finishes) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* set up request whose body won't send immediately */
struct slow_body_sender body_sender;
AWS_ZERO_STRUCT(body_sender);
s_slow_body_sender_init(&body_sender);
struct aws_input_stream *body_stream = &body_sender.base;

struct aws_http_header headers[] = {
{
.name = aws_byte_cursor_from_c_str("Content-Length"),
.value = aws_byte_cursor_from_c_str("16"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_NOT_NULL(request);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
ASSERT_SUCCESS(aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)));
aws_http_message_set_body_stream(request, body_stream);

struct client_stream_tester stream_tester;
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, &tester, request));

/* send head of request */
testing_channel_run_currently_queued_tasks(&tester.testing_channel);

/* Ensure the request can be destroyed after request is sent */
aws_http_message_destroy(request);
aws_input_stream_release(body_stream);

/* send close connection response */
ASSERT_SUCCESS(testing_channel_push_read_str(
&tester.testing_channel,
"HTTP/1.1 404 Not Found\r\n"
"Date: Fri, 01 Mar 2019 17:18:55 GMT\r\n"
"Connection: close\r\n"
"\r\n"));

testing_channel_drain_queued_tasks(&tester.testing_channel);
/* check result, should not receive any body */
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
"Content-Length: 16\r\n"
"\r\n";
ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected));
/* Check if the testing channel has shut down. */
ASSERT_TRUE(testing_channel_is_shutdown_completed(&tester.testing_channel));

ASSERT_TRUE(stream_tester.complete);
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);

/* clean up */
client_stream_tester_clean_up(&stream_tester);
ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

0 comments on commit 0600662

Please sign in to comment.