From 9bd58ca64bbcb6f793ddefd038045d52e0a1ecb4 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 3 Jan 2023 11:43:01 -0800 Subject: [PATCH] Websocket automatically responds to PING with PONG (#417) **Issue:** The Websocket was not automatically responding to a PING with a PONG, which violates [RFC-6455 Section 5.5.2](https://www.rfc-editor.org/rfc/rfc6455#section-5.5.2) **Description of changes:** Now it does Also, remove some things: - Remove concept of `high_priority` frames. - Nothing in the RFC mentions letting control frames cut in front of data frames. Remove this complexity until there's some proven need (spoiler: there will never be a need) - Remove public access to the RSV (reserved) bits. - These are only used by extensions, which we do not currently support. Keeping these accessible in the public API can only bite us later if we ever choose to support extensions. Better to keep these private unless there's a proven need to expose them. --- include/aws/http/websocket.h | 17 +--- source/websocket.c | 131 ++++++++++++++++++++-------- tests/CMakeLists.txt | 3 +- tests/test_websocket_handler.c | 152 ++++++++++++++++++--------------- 4 files changed, 183 insertions(+), 120 deletions(-) diff --git a/include/aws/http/websocket.h b/include/aws/http/websocket.h index 28ce317e0..6f85cafa8 100644 --- a/include/aws/http/websocket.h +++ b/include/aws/http/websocket.h @@ -12,7 +12,7 @@ struct aws_http_message; /* TODO: Document lifetime stuff */ /* TODO: Document CLOSE frame behavior (when auto-sent during close, when auto-closed) */ -/* TODO: Document auto-pong behavior */ +/* TODO: Accept payload as aws_input_stream */ /** * A websocket connection. @@ -82,7 +82,6 @@ struct aws_websocket_incoming_frame { uint64_t payload_length; uint8_t opcode; bool fin; - bool rsv[3]; }; /** @@ -309,7 +308,7 @@ typedef void( /** * Options for sending a websocket frame. * This structure is copied immediately by aws_websocket_send(). - * For descriptions of opcode, fin, rsv, and payload_length see in RFC-6455 Section 5.2. + * For descriptions of opcode, fin, and payload_length see in RFC-6455 Section 5.2. */ struct aws_websocket_send_frame_options { /** @@ -346,18 +345,6 @@ struct aws_websocket_send_frame_options { * Indicates that this is the final fragment in a message. The first fragment MAY also be the final fragment. */ bool fin; - - /** - * If true, frame will be sent before those with normal priority. - * Useful for opcodes like PING and PONG where low latency is important. - * This feature may only be used with "control" opcodes, not "data" opcodes like BINARY and TEXT. - */ - bool high_priority; - - /** - * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. - */ - bool rsv[3]; }; AWS_EXTERN_C_BEGIN diff --git a/source/websocket.c b/source/websocket.c index 8dfcff90a..8024d3988 100644 --- a/source/websocket.c +++ b/source/websocket.c @@ -22,8 +22,6 @@ # pragma warning(disable : 4204) /* non-constant aggregate initializer */ #endif -/* TODO: echo payload of peer CLOSE */ - /* TODO: If something goes wrong during normal shutdown, do I change the error_code? */ struct outgoing_frame { @@ -71,6 +69,10 @@ struct aws_websocket { struct aws_websocket_incoming_frame *current_incoming_frame; struct aws_websocket_incoming_frame incoming_frame_storage; + /* Payload of incoming PING frame. + * The PONG frame we send in response must have an identical payload */ + struct aws_byte_buf incoming_ping_payload; + /* If current incoming frame is CONTINUATION, this is the data type it is a continuation of. */ enum aws_websocket_opcode continuation_of_opcode; @@ -298,6 +300,7 @@ struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handl aws_linked_list_init(&websocket->thread_data.outgoing_frame_list); aws_linked_list_init(&websocket->thread_data.write_completion_frames); + aws_byte_buf_init(&websocket->thread_data.incoming_ping_payload, websocket->alloc, 0); aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket); @@ -343,6 +346,7 @@ static void s_handler_destroy(struct aws_channel_handler *handler) { AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket); + aws_byte_buf_clean_up(&websocket->thread_data.incoming_ping_payload); aws_mutex_clean_up(&websocket->synced_data.lock); aws_mem_release(websocket->alloc, websocket); } @@ -415,22 +419,6 @@ int aws_websocket_convert_to_midchannel_handler(struct aws_websocket *websocket) return AWS_OP_SUCCESS; } -/* Insert frame into list, sorting by priority, then by age (high-priority and older frames towards the front) */ -static void s_enqueue_prioritized_frame(struct aws_linked_list *list, struct outgoing_frame *to_add) { - /* Iterate in reverse so that common case (a bunch of low-priority frames) is O(1) */ - struct aws_linked_list_node *rev_iter = aws_linked_list_rbegin(list); - const struct aws_linked_list_node *rev_end = aws_linked_list_rend(list); - while (rev_iter != rev_end) { - struct outgoing_frame *frame_i = AWS_CONTAINER_OF(rev_iter, struct outgoing_frame, node); - if (to_add->def.high_priority == frame_i->def.high_priority) { - break; - } - rev_iter = aws_linked_list_prev(rev_iter); - } - - aws_linked_list_insert_after(rev_iter, &to_add->node); -} - static int s_send_frame( struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options, @@ -440,10 +428,6 @@ static int s_send_frame( AWS_ASSERT(options); /* Check for bad input. Log about non-obvious errors. */ - if (options->high_priority && aws_websocket_is_data_frame(options->opcode)) { - AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Data frames cannot be sent as high-priority.", (void *)websocket); - return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - } if (options->payload_length > 0 && !options->stream_outgoing_payload) { AWS_LOGF_ERROR( AWS_LS_HTTP_WEBSOCKET, @@ -495,13 +479,12 @@ static int s_send_frame( AWS_LOGF_DEBUG( AWS_LS_HTTP_WEBSOCKET, - "id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s priority=%s", + "id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s", (void *)websocket, options->opcode, aws_websocket_opcode_str(options->opcode), options->payload_length, - options->fin ? "T" : "F", - options->high_priority ? "high" : "normal"); + options->fin ? "T" : "F"); if (should_schedule_task) { AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling synced data task.", (void *)websocket); @@ -536,12 +519,7 @@ static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, voi /* END CRITICAL SECTION */ if (!aws_linked_list_empty(&tmp_list)) { - do { - struct aws_linked_list_node *node = aws_linked_list_pop_front(&tmp_list); - struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node); - s_enqueue_prioritized_frame(&websocket->thread_data.outgoing_frame_list, frame); - } while (!aws_linked_list_empty(&tmp_list)); - + aws_linked_list_move_all_back(&websocket->thread_data.outgoing_frame_list, &tmp_list); s_try_write_outgoing_frames(websocket); } } @@ -1313,9 +1291,6 @@ static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *use websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length; websocket->thread_data.current_incoming_frame->opcode = frame->opcode; websocket->thread_data.current_incoming_frame->fin = frame->fin; - websocket->thread_data.current_incoming_frame->rsv[0] = frame->rsv[0]; - websocket->thread_data.current_incoming_frame->rsv[1] = frame->rsv[1]; - websocket->thread_data.current_incoming_frame->rsv[2] = frame->rsv[2]; /* If CONTINUATION frames are expected, remember which type of data is being continued. * RFC-6455 Section 5.4 Fragmentation */ @@ -1327,6 +1302,15 @@ static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *use websocket->thread_data.continuation_of_opcode = frame->opcode; } } + } else if (frame->opcode == AWS_WEBSOCKET_OPCODE_PING) { + /* Prepare to store payload of PING so we can echo it back in the PONG */ + aws_byte_buf_reset(&websocket->thread_data.incoming_ping_payload, false /*zero_contents*/); + /* Note: we are NOT calling aws_byte_buf_reserve(). + * This works around an attack where a malicious peer CLAIMS they'll send a huge frame, + * which would case OOM if we did the reserve immediately. + * If a malicious peer wants to run us out of memory, they'll need to do + * it the costly way and actually send a billion bytes. + * Or we could impose our own internal limits, but for now this is simpler */ } /* Invoke user cb */ @@ -1351,6 +1335,11 @@ static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) { AWS_ASSERT(websocket->thread_data.current_incoming_frame); AWS_ASSERT(!websocket->thread_data.is_reading_stopped); + /* Store payload of PING so we can echo it back in the PONG */ + if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) { + aws_byte_buf_append_dynamic(&websocket->thread_data.incoming_ping_payload, &data); + } + if (websocket->thread_data.is_midchannel_handler) { return s_decoder_on_midchannel_payload(websocket, data); } @@ -1455,11 +1444,57 @@ static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, stru return AWS_OP_ERR; } +/* When the websocket sends a frame automatically (PONG, CLOSE), + * this holds the payload. */ +struct aws_websocket_autopayload { + struct aws_allocator *alloc; + struct aws_byte_buf buf; + struct aws_byte_cursor advancing_cursor; +}; + +static struct aws_websocket_autopayload *s_autopayload_new( + struct aws_allocator *alloc, + const struct aws_byte_buf *src) { + + struct aws_websocket_autopayload *autopayload = aws_mem_calloc(alloc, 1, sizeof(struct aws_websocket_autopayload)); + autopayload->alloc = alloc; + if (src->len > 0) { + aws_byte_buf_init_copy(&autopayload->buf, alloc, src); + autopayload->advancing_cursor = aws_byte_cursor_from_buf(&autopayload->buf); + } + + return autopayload; +} + +static void s_autopayload_destroy(struct aws_websocket_autopayload *autopayload) { + aws_byte_buf_clean_up(&autopayload->buf); + aws_mem_release(autopayload->alloc, autopayload); +} + +static void s_autopayload_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) { + (void)websocket; + (void)error_code; + + struct aws_websocket_autopayload *autopayload = user_data; + s_autopayload_destroy(autopayload); +} + +static bool s_autopayload_stream_outgoing_payload( + struct aws_websocket *websocket, + struct aws_byte_buf *out_buf, + void *user_data) { + + (void)websocket; + struct aws_websocket_autopayload *autopayload = user_data; + aws_byte_buf_write_to_capacity(out_buf, &autopayload->advancing_cursor); + return true; +} + static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result) { AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel)); AWS_ASSERT(websocket->thread_data.current_incoming_frame); - if (error_code == AWS_OP_SUCCESS) { + if (error_code == 0) { /* If this was a CLOSE frame, don't read any more data. */ if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) { AWS_LOGF_DEBUG( @@ -1469,9 +1504,31 @@ static void s_complete_incoming_frame(struct aws_websocket *websocket, int error websocket->thread_data.is_reading_stopped = true; /* TODO: auto-close if there's a channel-handler to the right */ - } - /* TODO: auto-respond to PING with PONG */ + } else if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) { + /* Automatically respond to a PING with a PONG */ + if (!websocket->thread_data.is_writing_stopped) { + /* Optimization idea: avoid allocations/copies each time we send an auto-PONG. + * Maybe have a small autopayload pool, instead of allocating one each time. + * Maybe encode directly to aws_io_message, instead of copying to a buf, that's copied to a msg later. + * Maybe "std::move()" the aws_byte_bufs around instead of copying them. */ + struct aws_websocket_autopayload *autopong = + s_autopayload_new(websocket->alloc, &websocket->thread_data.incoming_ping_payload); + + struct aws_websocket_send_frame_options pong_frame = { + .opcode = AWS_WEBSOCKET_OPCODE_PONG, + .fin = true, + .payload_length = autopong->buf.len, + .stream_outgoing_payload = s_autopayload_stream_outgoing_payload, + .on_complete = s_autopayload_send_complete, + .user_data = autopong, + }; + + int send_err = s_send_frame(websocket, &pong_frame, false /*from_public_api*/); + /* Failure should be impossible. We already checked that writing is not stopped */ + AWS_FATAL_ASSERT(!send_err && "Unexpected failure sending websocket PONG"); + } + } } /* Invoke user cb */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9f844c7f2..90ce5f970 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -188,7 +188,6 @@ add_test_case(websocket_handler_send_multiple_frames) add_test_case(websocket_handler_send_huge_frame) add_test_case(websocket_handler_send_payload_slowly) add_test_case(websocket_handler_send_payload_with_pauses) -add_test_case(websocket_handler_send_high_priority_frame) add_test_case(websocket_handler_sends_nothing_after_close_frame) add_test_case(websocket_handler_send_frames_always_complete) add_test_case(websocket_handler_send_one_io_msg_at_a_time) @@ -209,6 +208,8 @@ add_test_case(websocket_handler_read_halts_if_payload_fn_returns_false) add_test_case(websocket_handler_read_halts_if_complete_fn_returns_false) add_test_case(websocket_handler_window_manual_increment) add_test_case(websocket_handler_window_manual_increment_off_thread) +add_test_case(websocket_handler_sends_pong_automatically) +add_test_case(websocket_handler_wont_send_pong_after_close_frame) add_test_case(websocket_midchannel_sanity_check) add_test_case(websocket_midchannel_write_message) add_test_case(websocket_midchannel_write_multiple_messages) diff --git a/tests/test_websocket_handler.c b/tests/test_websocket_handler.c index e3361bc53..c1643e81e 100644 --- a/tests/test_websocket_handler.c +++ b/tests/test_websocket_handler.c @@ -377,9 +377,6 @@ static int s_readpush_check(struct tester *tester, size_t frame_i, int expected_ ASSERT_UINT_EQUALS(pushed->def.payload_length, received->def.payload_length); ASSERT_UINT_EQUALS(pushed->def.opcode, received->def.opcode); ASSERT_INT_EQUALS(pushed->def.fin, received->def.fin); - ASSERT_INT_EQUALS(pushed->def.rsv[0], received->def.rsv[0]); - ASSERT_INT_EQUALS(pushed->def.rsv[1], received->def.rsv[1]); - ASSERT_INT_EQUALS(pushed->def.rsv[2], received->def.rsv[2]); if (received->on_complete_error_code == AWS_ERROR_SUCCESS) { ASSERT_UINT_EQUALS(received->def.payload_length, received->payload.len); @@ -621,9 +618,6 @@ static int s_check_written_message(struct send_tester *send, size_t expected_ord ASSERT_UINT_EQUALS(send->def.opcode, written->def.opcode); ASSERT_UINT_EQUALS(send->def.payload_length, written->def.payload_length); ASSERT_INT_EQUALS(send->def.fin, written->def.fin); - for (int i = 0; i < 3; i++) { - ASSERT_INT_EQUALS(send->def.rsv[i], written->def.rsv[i]); - } /* All payloads sent from client should have been masked (assuming client is being tested here) */ ASSERT_TRUE(written->def.masked); @@ -904,67 +898,6 @@ TEST_CASE(websocket_handler_send_payload_with_pauses) { return AWS_OP_SUCCESS; } -TEST_CASE(websocket_handler_send_high_priority_frame) { - (void)ctx; - struct tester tester; - ASSERT_SUCCESS(s_tester_init(&tester, allocator)); - - struct send_tester sending[] = { - { - .payload = aws_byte_cursor_from_c_str("A"), - .def = - { - .opcode = AWS_WEBSOCKET_OPCODE_TEXT, - .fin = false, - }, - }, - { - .def = - { - .opcode = AWS_WEBSOCKET_OPCODE_PING, - .fin = true, - .high_priority = true, - }, - }, - { - .payload = aws_byte_cursor_from_c_str("C"), - .def = - { - .opcode = AWS_WEBSOCKET_OPCODE_CONTINUATION, - .fin = true, - }, - }, - { - .def = - { - .opcode = AWS_WEBSOCKET_OPCODE_PONG, - .fin = true, - .high_priority = true, - }, - }, - }; - - /* Send from user-thread to ensure that everything is queued. - * When queued frames are processed, the high-priority one should end up first. */ - testing_channel_set_is_on_users_thread(&tester.testing_channel, false); - - for (size_t i = 0; i < AWS_ARRAY_SIZE(sending); ++i) { - ASSERT_SUCCESS(s_send_frame(&tester, &sending[i])); - } - - testing_channel_set_is_on_users_thread(&tester.testing_channel, true); - ASSERT_SUCCESS(s_drain_written_messages(&tester)); - - /* High-priority frames (index 1 and 3) should get sent first */ - ASSERT_SUCCESS(s_check_written_message(&sending[1], 0)); - ASSERT_SUCCESS(s_check_written_message(&sending[3], 1)); - ASSERT_SUCCESS(s_check_written_message(&sending[0], 2)); - ASSERT_SUCCESS(s_check_written_message(&sending[2], 3)); - - ASSERT_SUCCESS(s_tester_clean_up(&tester)); - return AWS_OP_SUCCESS; -} - TEST_CASE(websocket_handler_sends_nothing_after_close_frame) { (void)ctx; struct tester tester; @@ -1861,6 +1794,91 @@ TEST_CASE(websocket_midchannel_write_huge_message) { return AWS_OP_SUCCESS; } +TEST_CASE(websocket_handler_sends_pong_automatically) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* Read PING with a payload */ + struct readpush_frame read_ping_with_payload = { + .def = + { + .opcode = AWS_WEBSOCKET_OPCODE_PING, + .fin = true, + }, + .payload = aws_byte_cursor_from_c_str("echo me pls"), + }; + s_set_readpush_frames(&tester, &read_ping_with_payload, 1); + ASSERT_SUCCESS(s_do_readpush_all(&tester)); + + /* Check that PONG is automatically written, with payload echoing the PING */ + s_drain_written_messages(&tester); + const struct written_frame *written_frame = &tester.written_frames[0]; + ASSERT_UINT_EQUALS(AWS_WEBSOCKET_OPCODE_PONG, written_frame->def.opcode); + ASSERT_TRUE(written_frame->is_complete); + ASSERT_BIN_ARRAYS_EQUALS( + read_ping_with_payload.payload.ptr, + read_ping_with_payload.payload.len, + written_frame->payload.buffer, + written_frame->payload.len); + + /* Read PING without empty payload */ + struct readpush_frame read_ping_with_empty_payload = { + .def = + { + .opcode = AWS_WEBSOCKET_OPCODE_PING, + .fin = true, + }, + }; + s_set_readpush_frames(&tester, &read_ping_with_empty_payload, 1); + ASSERT_SUCCESS(s_do_readpush_all(&tester)); + + /* Check that PONG with empty payload is automatically written */ + s_drain_written_messages(&tester); + written_frame = &tester.written_frames[1]; + ASSERT_UINT_EQUALS(AWS_WEBSOCKET_OPCODE_PONG, written_frame->def.opcode); + ASSERT_TRUE(written_frame->is_complete); + ASSERT_UINT_EQUALS(0, written_frame->payload.len); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +TEST_CASE(websocket_handler_wont_send_pong_after_close_frame) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* Send a CLOSE frame */ + struct send_tester send_close = { + .def = + { + .opcode = AWS_WEBSOCKET_OPCODE_CLOSE, + .fin = true, + }, + }; + ASSERT_SUCCESS(s_send_frame(&tester, &send_close)); + + /* Now have the websocket read a PING */ + struct readpush_frame read_ping = { + .def = + { + .opcode = AWS_WEBSOCKET_OPCODE_PING, + .fin = true, + }, + }; + s_set_readpush_frames(&tester, &read_ping, 1); + ASSERT_SUCCESS(s_do_readpush_all(&tester)); + + /* Check that PONG is NOT sent automatically, because a CLOSE was sent before it */ + s_drain_written_messages(&tester); + ASSERT_TRUE(tester.num_written_frames == 1); + ASSERT_INT_EQUALS(AWS_WEBSOCKET_OPCODE_CLOSE, tester.written_frames[0].def.opcode); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + TEST_CASE(websocket_midchannel_read_message) { (void)ctx; struct tester tester;