Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
manelmontilla committed Nov 16, 2023
1 parent 4e8d5ce commit 23e9a6a
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 19 deletions.
15 changes: 5 additions & 10 deletions wruster/src/streams/cancellable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::BaseStream;
use crate::log::debug;
use polling::Event;
use std::{
io,
io::{self, Write},
net::Shutdown,
sync::{
atomic::{self, AtomicBool, Ordering},
Expand All @@ -29,7 +29,6 @@ where
let read_timeout = RwLock::new(None);
let write_timeout = RwLock::new(None);
let done = atomic::AtomicBool::new(false);
poller.add(stream.as_raw(), Event::all(1))?;
Ok(CancellableStream {
stream,
done,
Expand Down Expand Up @@ -116,14 +115,9 @@ where
let timeout = &self.write_timeout.write().unwrap().clone();
let mut bytes_written = 0;
let buf_len = buf.len();
let raw_stream = self.stream.as_raw();

while bytes_written < buf_len {
events.clear();
// self.poller.modify(raw_stream, Event::writable(1))?;
self.poller
.modify(raw_stream, Event::all(1))?;

self.poller.modify(self.stream.as_raw(), Event::writable(1))?;
if self.poller.wait(&mut events, *timeout)? == 0 {
let stop = self.done.load(atomic::Ordering::SeqCst);
if stop {
Expand All @@ -145,20 +139,21 @@ where
match s.write_buf(write_buf) {
Ok(n) => {
bytes_written += n;
self.stream.flush_data().unwrap();
println!("bytes written: {:?} of {:?}\n", bytes_written, buf_len)
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
println!("would block");
break
}
Err(err) => {
// self.stream.set_nonblocking(false)?;
println!("error writing: {:?}\n", err);
self.stream.set_nonblocking(false)?;
return Err(err);
}
};
break;
}

}
Ok(bytes_written)
}
Expand Down
21 changes: 12 additions & 9 deletions wruster/src/streams/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,29 @@ fn cancellable_stream_write_writes_data() {
let handle = thread::spawn(move || {
let (stream, _) = listener.accept().unwrap();
let mut cstream = CancellableStream::new(stream).unwrap();
//cstream.set_write_timeout(Some(Duration::from_secs(30))).unwrap();
let mut data = Vec::new();
server_data.read_to_end(&mut data).unwrap();

cstream.write(&data)
});

let mut client = TcpClient::connect(addr.to_string()).unwrap();
let mut reader = BufReader::new(&mut client);
let mut content = Vec::new();
let mut expected_file = load_test_file("big.png").unwrap();
let mut expected_data = Vec::new();
expected_file.read_to_end(&mut expected_data).unwrap();
let len = test_file_size("big.png").unwrap();
reader
.read_to_end(&mut content)
.expect("expect data to available");
assert_eq!(content, expected_data);
let bytes_sent = handle
.join()
.unwrap()
.expect("expected data to be written correctly");
let len = test_file_size("big.png").unwrap();
assert_eq!(bytes_sent,len.try_into().unwrap());

let mut reader = BufReader::new(&mut client);
let mut content = Vec::new();
reader
.read_until(b' ', &mut content)
.expect("expect data to available");
let content = String::from_utf8(content).expect("expect data to be valid");
assert_eq!(content, "test ".to_string());
}

pub fn load_test_file(name: &str) -> Result<File, io::Error> {
Expand Down

0 comments on commit 23e9a6a

Please sign in to comment.