From 4b755f8a1f1af2a5f523ed2289b41ead7ea9bd49 Mon Sep 17 00:00:00 2001 From: hasezoey Date: Sat, 4 May 2024 17:51:02 +0200 Subject: [PATCH 1/2] feat(listener): poll a port multiple times until either "max_poll" or returned "None" fixes #71 --- CHANGELOG.md | 6 ++++ src/listener/port.rs | 15 +++++++++ src/listener/worker.rs | 75 +++++++++++++++++++++++++++--------------- 3 files changed, 69 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c21c61e..406bcbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,12 @@ --- +## unreleased + +Unreleased + +- Add `Port::set_max_poll` to set the amount a `Port` is polled in a single `Port::should_poll` (Default: `1` time) + ## 1.9.2 Released on 04/03/2023 diff --git a/src/listener/port.rs b/src/listener/port.rs index ec22844..5d7addc 100644 --- a/src/listener/port.rs +++ b/src/listener/port.rs @@ -7,6 +7,9 @@ use std::time::{Duration, Instant}; use super::{Event, ListenerResult, Poll}; +/// The Default number of how often a [`Port`] gets polled in a single [`Port::should_poll`] +pub const DEFAULT_MAX_POLL: usize = 1; + /// A port is a wrapper around the poll trait object, which also defines an interval, which defines /// the amount of time between each poll() call. /// Its purpose is to listen for incoming events of a user-defined type @@ -17,6 +20,7 @@ where poll: Box>, interval: Duration, next_poll: Instant, + max_poll: usize, } impl Port @@ -29,9 +33,20 @@ where poll, interval, next_poll: Instant::now(), + max_poll: DEFAULT_MAX_POLL, } } + /// Set how often a port should get polled in a single poll + pub fn set_max_poll(&mut self, max_poll: usize) { + self.max_poll = max_poll; + } + + /// Get how often a port should get polled in a single poll + pub fn max_poll(&self) -> usize { + self.max_poll + } + /// Returns the interval for the current `Port` pub fn interval(&self) -> &Duration { &self.interval diff --git a/src/listener/worker.rs b/src/listener/worker.rs index 165f76c..4297d5b 100644 --- a/src/listener/worker.rs +++ b/src/listener/worker.rs @@ -118,35 +118,32 @@ where /// Returns only the messages, while the None returned by poll are discarded #[allow(clippy::needless_collect)] fn poll(&mut self) -> Result<(), mpsc::SendError>> { - let msg: Vec> = self - .ports - .iter_mut() - .filter_map(|x| { - if x.should_poll() { - let msg = match x.poll() { - Ok(Some(ev)) => Some(ListenerMsg::User(ev)), - Ok(None) => None, - Err(err) => Some(ListenerMsg::Error(err)), - }; - // Update next poll - x.calc_next_poll(); - msg - } else { - None + let port_iter = self.ports.iter_mut().filter(|port| port.should_poll()); + + for port in port_iter { + let mut times_remaining = port.max_poll(); + // poll a port until it has nothing anymore + loop { + let msg = match port.poll() { + Ok(Some(ev)) => ListenerMsg::User(ev), + Ok(None) => break, + Err(err) => ListenerMsg::Error(err), + }; + + self.sender.send(msg)?; + + // do this at the end to at least call it once + times_remaining = times_remaining.saturating_sub(1); + + if times_remaining == 0 { + break; } - }) - .collect(); - // Send messages - match msg - .into_iter() - .map(|x| self.sender.send(x)) - .filter(|x| x.is_err()) - .map(|x| x.err().unwrap()) - .next() - { - None => Ok(()), - Some(e) => Err(e), + } + // Update next poll + port.calc_next_poll(); } + + Ok(()) } /// thread run method @@ -186,6 +183,30 @@ mod test { use crate::mock::{MockEvent, MockPoll}; use crate::Event; + #[test] + fn worker_should_poll_multiple_times() { + let (tx, rx) = mpsc::channel(); + let paused = Arc::new(RwLock::new(false)); + let paused_t = Arc::clone(&paused); + let running = Arc::new(RwLock::new(true)); + let running_t = Arc::clone(&running); + + let mut mock_port = Port::new(Box::new(MockPoll::default()), Duration::from_secs(5)); + mock_port.set_max_poll(10); + + let mut worker = + EventListenerWorker::::new(vec![mock_port], tx, paused_t, running_t, None); + assert!(worker.poll().is_ok()); + assert!(worker.next_event() <= Duration::from_secs(5)); + let mut recieved = Vec::new(); + + while let Ok(msg) = rx.try_recv() { + recieved.push(msg); + } + + assert_eq!(recieved.len(), 10); + } + #[test] fn worker_should_send_poll() { let (tx, rx) = mpsc::channel(); From 09d754a5abfe3bb44a6f2a25cfbf34dc805cf455 Mon Sep 17 00:00:00 2001 From: hasezoey Date: Sun, 5 May 2024 12:24:28 +0200 Subject: [PATCH 2/2] feat(EventListenerCfg): add function "port_1" to add a Port directly to apply custom options to a port --- CHANGELOG.md | 1 + src/listener/builder.rs | 22 ++++++++++++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 406bcbf..8197cf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ Unreleased - Add `Port::set_max_poll` to set the amount a `Port` is polled in a single `Port::should_poll` (Default: `1` time) +- Add `EventListenerCfg::port_1` to add a manually constructed `Port` ## 1.9.2 diff --git a/src/listener/builder.rs b/src/listener/builder.rs index e175421..00eb697 100644 --- a/src/listener/builder.rs +++ b/src/listener/builder.rs @@ -60,8 +60,15 @@ where } /// Add a new Port (Poll, Interval) to the the event listener - pub fn port(mut self, poll: Box>, interval: Duration) -> Self { - self.ports.push(Port::new(poll, interval)); + pub fn port(self, poll: Box>, interval: Duration) -> Self { + self.port_1(Port::new(poll, interval)) + } + + /// Add a new Port to the the event listener + /// + /// The [`Port`] needs to be manually constructed, unlike [`Self::port`] + pub fn port_1(mut self, port: Port) -> Self { + self.ports.push(port); self } @@ -104,4 +111,15 @@ mod test { .poll_timeout(Duration::from_secs(0)) .start(); } + + #[test] + fn should_add_port_via_port_1() { + let builder = EventListenerCfg::::default(); + assert!(builder.ports.is_empty()); + let builder = builder.port_1(Port::new( + Box::new(MockPoll::default()), + Duration::from_millis(1), + )); + assert_eq!(builder.ports.len(), 1); + } }