Skip to content

Commit

Permalink
Optimize headers encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 22, 2024
1 parent 1c818c0 commit cb556f0
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 47 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.5.3] - 2024-04-23

* Optimize headers encoding

## [0.5.2] - 2024-03-24

* Use ntex-net
Expand Down
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-h2"
version = "0.5.2"
version = "0.5.3"
license = "MIT OR Apache-2.0"
authors = ["Nikolay Kim <[email protected]>"]
description = "An HTTP/2 client and server"
Expand Down Expand Up @@ -28,12 +28,12 @@ ntex-service = "2.0"
ntex-util = "1.0.0"
ntex-rt = "0.4.11"

bitflags = "2.4"
bitflags = "2"
fxhash = "0.2.1"
log = "0.4"
pin-project-lite = "0.2"
thiserror = "1.0"
nanorand = { version = "0.7.0", default-features = false, features = ["std", "wyrand"] }
thiserror = "1"
nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] }

[dev-dependencies]
# Fuzzing
Expand Down
2 changes: 1 addition & 1 deletion src/client/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Inflight {
fn push(&mut self, item: Message) {
match self.response.take() {
Some(Either::Left(msg)) => {
let mut msgs = VecDeque::new();
let mut msgs = VecDeque::with_capacity(8);
msgs.push_back(msg);
msgs.push_back(item);
self.response = Some(Either::Right(msgs));
Expand Down
3 changes: 0 additions & 3 deletions src/codec/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,6 @@ impl Decoder for LengthDelimitedCodec {
// Update the decode state
self.state.set(DecodeState::Head);

// Make sure the buffer has enough space to read the next head
src.reserve(self.builder.num_head_bytes());

Ok(Some(data))
}
None => Ok(None),
Expand Down
27 changes: 12 additions & 15 deletions src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,29 +290,26 @@ where
C: Service<Control<P::Error>, Response = ControlAck>,
C::Error: fmt::Debug,
{
let fut = ctx.call(&inner.publish, msg);
let mut pinned = std::pin::pin!(fut);
let result = poll_fn(|cx| {
if stream.is_remote() {
let result = if stream.is_remote() {
let fut = ctx.call(&inner.publish, msg);
let mut pinned = std::pin::pin!(fut);
poll_fn(|cx| {
match stream.poll_send_reset(cx) {
Poll::Ready(Ok(())) | Poll::Ready(Err(_)) => {
log::trace!("Stream is closed {:?}", stream.id());
return Poll::Ready(Ok(None));
return Poll::Ready(Ok(()));
}
Poll::Pending => (),
}
}

match pinned.as_mut().poll(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(None)),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
})
.await;
pinned.as_mut().poll(cx)
})
.await
} else {
ctx.call(&inner.publish, msg).await
};

match result {
Ok(v) => Ok(v),
Ok(_) => Ok(None),
Err(e) => control(Control::app_error(e, stream), inner, ctx).await,
}
}
Expand Down
57 changes: 35 additions & 22 deletions src/frame/headers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{fmt, io::Cursor};
use std::{cell::RefCell, cmp, fmt, io::Cursor};

use ntex_bytes::{ByteString, BytesMut};
use ntex_http::{header, uri, HeaderMap, HeaderName, Method, StatusCode, Uri};
Expand Down Expand Up @@ -426,6 +426,10 @@ impl fmt::Debug for HeadersFlag {

// ===== HeaderBlock =====

thread_local! {
static HDRS_BUF: RefCell<BytesMut> = RefCell::new(BytesMut::with_capacity(1024));
}

impl HeaderBlock {
fn load(
&mut self,
Expand Down Expand Up @@ -542,28 +546,37 @@ impl HeaderBlock {
dst: &mut BytesMut,
max_size: usize,
) {
// encode hpack
let mut hpack = BytesMut::new();
let headers = Iter {
pseudo: Some(self.pseudo),
fields: self.fields.into_iter(),
};
encoder.encode(headers, &mut hpack);

let mut head = *head;
loop {
// encode the header payload
if hpack.len() > max_size {
Head::new(head.kind(), head.flag() ^ END_HEADERS, head.stream_id())
.encode(max_size, dst);
dst.extend_from_slice(&hpack.split_to(max_size));
head = Head::new(Kind::Continuation, END_HEADERS, head.stream_id());
} else {
head.encode(hpack.len(), dst);
dst.extend_from_slice(&hpack);
break;
HDRS_BUF.with(|buf| {
let mut b = buf.borrow_mut();
let hpack = &mut b;
hpack.clear();

// encode hpack
let headers = Iter {
pseudo: Some(self.pseudo),
fields: self.fields.into_iter(),
};
encoder.encode(headers, hpack);

let mut head = *head;
let mut start = 0;
loop {
let end = cmp::min(start + max_size, hpack.len());

// encode the header payload
if hpack.len() > end {
Head::new(head.kind(), head.flag() ^ END_HEADERS, head.stream_id())
.encode(max_size, dst);
dst.extend_from_slice(&hpack[start..end]);
head = Head::new(Kind::Continuation, END_HEADERS, head.stream_id());
start = end;
} else {
head.encode(end - start, dst);
dst.extend_from_slice(&hpack[start..end]);
break;
}
}
}
});
}

/// Calculates the size of the currently decoded header list.
Expand Down
2 changes: 0 additions & 2 deletions src/frame/stream_id.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::u32;

/// A stream identifier, as described in [Section 5.1.1] of RFC 7540.
///
/// Streams are identified with an unsigned 31-bit integer. Streams
Expand Down

0 comments on commit cb556f0

Please sign in to comment.