Skip to content

Commit

Permalink
add breakslow module
Browse files Browse the repository at this point in the history
  • Loading branch information
bachue committed Apr 19, 2021
1 parent 1e5a32c commit fee9b7f
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 9 deletions.
128 changes: 128 additions & 0 deletions src/breakslow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::{
io::{Error as IOError, ErrorKind as IOErrorKind, Read, Result as IOResult},
time::{Duration, Instant},
};

pub(super) struct BreakSlow<R: Read> {
source: R,
low_speed_duration: Duration,
low_speed_bytes_per_second: usize,
read_bytes_in_this_second: usize,
this_seconds_starts_at: Option<Instant>,
slow_duration: Duration,
}

impl<R: Read> BreakSlow<R> {
pub(super) fn new(
source: R,
low_speed_duration: Duration,
low_speed_bytes_per_second: usize,
) -> Self {
Self {
source,
low_speed_bytes_per_second,
low_speed_duration,
read_bytes_in_this_second: Default::default(),
this_seconds_starts_at: Default::default(),
slow_duration: Default::default(),
}
}
}

impl<R: Read> Read for BreakSlow<R> {
fn read(&mut self, buf: &mut [u8]) -> IOResult<usize> {
let have_read = self.source.read(buf)?;
if self.low_speed_bytes_per_second > 0 {
if let Some(this_seconds_starts_at) = self.this_seconds_starts_at {
self.read_bytes_in_this_second += have_read;
if this_seconds_starts_at.elapsed() >= Duration::from_secs(1) {
if self.read_bytes_in_this_second < self.low_speed_bytes_per_second {
self.slow_duration += Duration::from_secs(1);
if self.slow_duration >= self.low_speed_duration {
return Err(IOError::new(IOErrorKind::TimedOut, "Slow Response"));
}
} else {
self.slow_duration = Default::default();
}
self.this_seconds_starts_at = Some(Instant::now());
self.read_bytes_in_this_second = 0;
}
} else {
self.this_seconds_starts_at = Some(Instant::now());
}
}
Ok(have_read)
}
}

#[cfg(test)]
mod tests {
use super::*;
use rand::{thread_rng, RngCore};
use std::{
error::Error,
io::{copy as io_copy, sink},
thread::sleep,
};

struct SlowCopyIO<R: Read> {
source: R,
copied: usize,
sleep_when_copied: usize,
sleep_duration: Duration,
}

impl<R: Read> SlowCopyIO<R> {
fn new(source: R, sleep_when_copied: usize, sleep_duration: Duration) -> Self {
Self {
source,
sleep_duration,
sleep_when_copied,
copied: Default::default(),
}
}
}

impl<R: Read> Read for SlowCopyIO<R> {
fn read(&mut self, buf: &mut [u8]) -> IOResult<usize> {
let have_read = self.source.read(buf)?;
self.copied += have_read;
if self.copied > self.sleep_when_copied {
self.copied = 0;
sleep(self.sleep_duration);
}
Ok(have_read)
}
}

#[test]
fn test_break_slow_io() -> Result<(), Box<dyn Error>> {
let rand_source = Box::new(thread_rng()) as Box<dyn RngCore>;
let mut r = BreakSlow::new(
SlowCopyIO::new(
rand_source.take(5 * (1 << 20)),
100_000,
Duration::from_millis(200),
),
Duration::from_secs(5),
1 << 20,
);
let err = io_copy(&mut r, &mut sink()).unwrap_err();
assert_eq!(err.kind(), IOErrorKind::TimedOut);
assert_eq!(&err.to_string(), "Slow Response");

let rand_source = Box::new(thread_rng()) as Box<dyn RngCore>;
let mut r = BreakSlow::new(
SlowCopyIO::new(
rand_source.take(5 * (1 << 20)),
100_000,
Duration::from_millis(50),
),
Duration::from_secs(5),
1 << 20,
);
io_copy(&mut r, &mut sink())?;

Ok(())
}
}
26 changes: 17 additions & 9 deletions src/download.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{
base::credential::Credential,
breakslow::BreakSlow,
config::{build_range_reader_builder_from_config, build_range_reader_builder_from_env, Config},
host_selector::HostSelector,
query::HostsQuerier,
Expand Down Expand Up @@ -344,7 +345,7 @@ impl ReadAt for RangeReader {
}
let content_length = parse_content_length(&resp);
let max_size = content_length.min(size);
io_copy(&mut resp.take(max_size), &mut cursor)
io_copy(&mut Self::break_slow(resp.take(max_size)), &mut cursor)
.map_err(|err| IOError::new(IOErrorKind::BrokenPipe, err))
});

Expand Down Expand Up @@ -391,19 +392,20 @@ impl RangeReader {
.send()
.tap_err(|err| self.increase_timeout_power_if_needed(chosen_host, err))
.map_err(|err| IOError::new(IOErrorKind::ConnectionAborted, err))
.and_then(|mut resp| {
.and_then(|resp| {
let mut parts = Vec::with_capacity(ranges.len());
match resp.status() {
StatusCode::OK => {
let body = resp
.bytes()
let mut body = Vec::new();
Self::break_slow(resp)
.read_to_end(&mut body)
.map_err(|err| IOError::new(IOErrorKind::BrokenPipe, err))?;
for &(from, len) in ranges.iter() {
let from = (from as usize).min(body.len());
let len = (len as usize).min(body.len() - from);
if len > 0 {
parts.push(RangePart {
data: body.slice(from..(from + len)).to_vec(),
data: body[from..(from + len)].to_vec(),
range: (from as u64, len as u64),
});
}
Expand Down Expand Up @@ -431,7 +433,8 @@ impl RangeReader {
.to_owned()
};

let mut multipart = Multipart::with_body(&mut resp, boundary);
let mut body = Self::break_slow(resp);
let mut multipart = Multipart::with_body(&mut body, boundary);
loop {
match multipart.read_entry() {
Ok(Some(mut field)) => {
Expand Down Expand Up @@ -494,7 +497,7 @@ impl RangeReader {
})?;
let len = to - from + 1;
let mut data = Vec::with_capacity(len as usize);
resp.copy_to(&mut data).unwrap();
Self::break_slow(resp).read_to_end(&mut data).unwrap();
parts.push(RangePart {
data,
range: (from, len),
Expand Down Expand Up @@ -636,15 +639,15 @@ impl RangeReader {
.send()
.tap_err(|err| self.increase_timeout_power_if_needed(chosen_host, err))
.map_err(|err| IOError::new(IOErrorKind::ConnectionAborted, err))
.and_then(|mut resp| {
.and_then(|resp| {
if resp.status() == StatusCode::RANGE_NOT_SATISFIABLE {
Ok(0)
} else if resp.status() != StatusCode::OK
&& resp.status() != StatusCode::PARTIAL_CONTENT
{
Err(unexpected_status_code(&resp))
} else {
resp.copy_to(writer)
io_copy(&mut Self::break_slow(resp), writer)
.map_err(|err| IOError::new(IOErrorKind::BrokenPipe, err))
}
});
Expand Down Expand Up @@ -719,6 +722,11 @@ impl RangeReader {
)
}

#[inline]
fn break_slow<R: Read>(source: R) -> BreakSlow<R> {
BreakSlow::new(source, Duration::from_secs(5), 1 << 20)
}

fn with_retries<T>(
&self,
method: Method,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! 负责下载完整或部分七牛对象
mod base;
mod breakslow;
mod config;
mod download;
mod host_selector;
Expand Down

0 comments on commit fee9b7f

Please sign in to comment.