Skip to content

Commit

Permalink
Add frame read rate support (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Apr 23, 2024
1 parent 7e22358 commit dd175b1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 44 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [0.5.3] - 2024-04-23

* Add frame read rate support

* Limit "max headers size" to 48kb

* Do not decode partial headers frame
Expand Down
24 changes: 19 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub(crate) struct ConfigInner {
// pub extended_connect_protocol_enabled: bool,
/// Connection timeouts
pub(crate) handshake_timeout: Cell<Seconds>,
pub(crate) client_timeout: Cell<Seconds>,
pub(crate) ping_timeout: Cell<Seconds>,
pub(crate) dispatcher_config: DispatcherConfig,

Expand Down Expand Up @@ -78,7 +77,8 @@ impl Config {
let dispatcher_config = DispatcherConfig::default();
dispatcher_config
.set_keepalive_timeout(Seconds(0))
.set_disconnect_timeout(Seconds(3));
.set_disconnect_timeout(Seconds(3))
.set_frame_read_rate(Seconds(1), Seconds::ZERO, 256);

Config(Rc::new(ConfigInner {
flags,
Expand All @@ -91,7 +91,6 @@ impl Config {
reset_max: Cell::new(consts::DEFAULT_RESET_STREAM_MAX),
reset_duration: Cell::new(consts::DEFAULT_RESET_STREAM_SECS.into()),
remote_max_concurrent_streams: Cell::new(None),
client_timeout: Cell::new(Seconds(0)),
handshake_timeout: Cell::new(Seconds(5)),
ping_timeout: Cell::new(Seconds(10)),
pool: pool::new(),
Expand Down Expand Up @@ -276,6 +275,8 @@ impl Config {
self
}

#[deprecated()]
#[doc(hidden)]
/// Set server client timeout for first request.
///
/// Defines a timeout for reading client request header. If a client does not transmit
Expand All @@ -285,8 +286,21 @@ impl Config {
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 3 seconds.
pub fn client_timeout(&self, timeout: Seconds) -> &Self {
self.0.client_timeout.set(timeout);
pub fn client_timeout(&self, _: Seconds) -> &Self {
self
}

/// Set read rate parameters for single frame.
///
/// Set read timeout, max timeout and rate for reading payload. If the client
/// sends `rate` amount of data within `timeout` period of time, extend timeout by `timeout` seconds.
/// But no more than `max_timeout` timeout.
///
/// By default frame read rate is 256 bytes every seconds with no max timeout.
pub fn frame_read_rate(self, timeout: Seconds, max_timeout: Seconds, rate: u16) -> Self {
self.0
.dispatcher_config
.set_frame_read_rate(timeout, max_timeout, rate);
self
}

Expand Down
9 changes: 0 additions & 9 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ bitflags::bitflags! {
pub(crate) struct ConnectionFlags: u8 {
const SETTINGS_PROCESSED = 0b0000_0001;
const DELAY_DROP_TASK_STARTED = 0b0000_0010;
const SLOW_REQUEST_TIMEOUT = 0b0000_0100;
const DISCONNECT_WHEN_READY = 0b0000_1000;
const SECURE = 0b0001_0000;
const STREAM_REFUSED = 0b0010_0000;
Expand Down Expand Up @@ -392,12 +391,6 @@ impl RecvHalfConnection {
self.0.flags.set(flags);
}

fn unset_flags(&self, f: ConnectionFlags) {
let mut flags = self.0.flags.get();
flags.remove(f);
self.0.flags.set(flags);
}

pub(crate) fn encode<T>(&self, item: T)
where
frame::Frame: From<T>,
Expand All @@ -412,8 +405,6 @@ impl RecvHalfConnection {
let id = frm.stream_id();

if self.0.local_config.is_server() {
self.unset_flags(ConnectionFlags::SLOW_REQUEST_TIMEOUT);

if !id.is_client_initiated() {
return Err(Either::Left(ConnectionError::InvalidStreamId(
"Invalid id in received headers frame",
Expand Down
36 changes: 6 additions & 30 deletions src/server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{fmt, rc::Rc};

use ntex_io::{Dispatcher as IoDispatcher, Filter, Io, IoBoxed};
use ntex_service::{Service, ServiceCtx, ServiceFactory};
use ntex_util::time::{sleep, timeout_checked};
use ntex_util::time::timeout_checked;

use crate::connection::{Connection, ConnectionFlags};
use crate::connection::Connection;
use crate::control::{Control, ControlAck};
use crate::{
codec::Codec, config::Config, consts, dispatcher::Dispatcher, frame, message::Message,
Expand Down Expand Up @@ -137,7 +137,8 @@ where
.map_err(|_| ServerError::HandshakeTimeout)??;

// create h2 codec
let (codec, con) = create_connection(&io, &inner.config);
let codec = Codec::default();
let con = Connection::new(io.get_ref(), codec.clone(), inner.config.clone(), true);

// start protocol dispatcher
IoDispatcher::new(
Expand Down Expand Up @@ -194,32 +195,6 @@ where
}
}

fn create_connection(io: &IoBoxed, config: &Config) -> (Codec, Connection) {
// create h2 codec
let codec = Codec::default();
let con = Connection::new(io.get_ref(), codec.clone(), config.clone(), true);

// slow request timeout
let timeout = config.0.client_timeout.get();
if !timeout.is_zero() {
con.set_flags(ConnectionFlags::SLOW_REQUEST_TIMEOUT);

let state = con.clone();
let _ = ntex_rt::spawn(async move {
sleep(timeout).await;

if state
.flags()
.contains(ConnectionFlags::SLOW_REQUEST_TIMEOUT)
{
state.close()
}
});
}

(codec, con)
}

async fn read_preface(io: &IoBoxed) -> Result<(), ServerError<()>> {
loop {
let ready = io.with_read_buf(|buf| {
Expand Down Expand Up @@ -268,7 +243,8 @@ where
.map_err(|_| ServerError::HandshakeTimeout)??;

// create h2 codec
let (codec, con) = create_connection(&io, &config);
let codec = Codec::default();
let con = Connection::new(io.get_ref(), codec.clone(), config.clone(), true);

// start protocol dispatcher
IoDispatcher::new(
Expand Down

0 comments on commit dd175b1

Please sign in to comment.