diff --git a/crates/zflow_runtime/Cargo.toml b/crates/zflow_runtime/Cargo.toml index 053ff7b..c84f741 100644 --- a/crates/zflow_runtime/Cargo.toml +++ b/crates/zflow_runtime/Cargo.toml @@ -135,7 +135,7 @@ beady = "0.6.0" [build-dependencies] -deno_runtime = {version = "0.150.0"} +deno_runtime = {version = "0.150.0", optional=true} diff --git a/crates/zflow_runtime/build.rs b/crates/zflow_runtime/build.rs index 26cf88c..90d67cc 100644 --- a/crates/zflow_runtime/build.rs +++ b/crates/zflow_runtime/build.rs @@ -3,5 +3,6 @@ use std::{env, path::PathBuf}; fn main(){ let o = PathBuf::from(env::var_os("OUT_DIR").unwrap()); let runtime_snapshot_path = o.join("DENO_RUNTIME_SNAPSHOT.bin"); + #[cfg(feature = "deno_embed")] deno_runtime::snapshot::create_runtime_snapshot(runtime_snapshot_path, deno_runtime::ops::bootstrap::SnapshotOptions::default()); } \ No newline at end of file diff --git a/crates/zflow_runtime/network.rs b/crates/zflow_runtime/network.rs index 06f6185..0801949 100644 --- a/crates/zflow_runtime/network.rs +++ b/crates/zflow_runtime/network.rs @@ -973,7 +973,6 @@ impl BaseNetwork for Network { if self.get_processes().contains_key(&node.id) { return Ok(self.get_processes().get(&node.id.clone()).unwrap().clone()); } - // } let mut process = NetworkProcess { id: node.id.clone(), ..NetworkProcess::default() @@ -1714,6 +1713,8 @@ impl BaseNetwork for Network { } } + + fn connect_port( _socket: Arc>, process: NetworkProcess, diff --git a/crates/zflow_runtime/websocket/mod.rs b/crates/zflow_runtime/websocket/mod.rs index 8d3558e..d53a12f 100644 --- a/crates/zflow_runtime/websocket/mod.rs +++ b/crates/zflow_runtime/websocket/mod.rs @@ -1,6 +1,7 @@ use std::{ + borrow::BorrowMut, collections::HashMap, - sync::{Arc, Mutex, RwLock}, + sync::{mpsc, Arc, Mutex, RwLock}, time::Duration, }; @@ -128,7 +129,7 @@ impl Provider for RemoteProvider { let manifest_cloned = manifest.clone(); self.worker - .call_ack("manifest", json!({}), move |data, _| match &data { + .call_ack("manifest", JsonValue::Null, move |data, _| match &data { Payload::Binary(b) => { let data = String::from_utf8_lossy(b).to_string(); if let Ok(man) = serde_json::de::from_str::(&data) { @@ -168,6 +169,7 @@ impl RemoteProvider { let worker = SocketWorker { client: None, publisher: Arc::new(Mutex::new(Publisher::new())), + status: SocketWorkerStatus::STOPPED, }; Self { @@ -248,7 +250,8 @@ impl RemoteProvider { let output = this.output.clone(); let _process_name = process_name.clone(); - let err: RwLock = RwLock::new(ProcessError::default()); + let (err_sender, err_reciever) = mpsc::channel::>(); + if let Ok(_pub) = publisher.try_lock().as_mut() { _pub.subscribe_fn(move |subscription| { let event = &subscription.0; @@ -261,33 +264,27 @@ impl RemoteProvider { let send_buffer = format!("/{}/send_buffer", &id); if event.as_str() == send.as_str() { - output - .send(&subscription.1.clone()) - .map_err(|er| { - let mut _err = err.write().unwrap(); - *_err = er; - }) - .unwrap(); + if let Err(err) = output.send(&subscription.1.clone()) { + err_sender.send(Some(err)).unwrap(); + } else { + err_sender.send(None).unwrap(); + } } if event.as_str() == send_done.as_str() { - output - .send_done(&subscription.1.clone()) - .map_err(|er| { - let mut _err = err.write().unwrap(); - *_err = er; - }) - .unwrap(); + if let Err(err) = output.send_done(&subscription.1.clone()) { + err_sender.send(Some(err)).unwrap(); + } else { + err_sender.send(None).unwrap(); + } } if event.as_str() == send_buffer.as_str() { if let Ok(out) = OutputBuffer::deserialize(subscription.1.clone()) { - output - .send_buffer(out.port, out.packet) - .map_err(|er| { - let mut _err = err.write().unwrap(); - *_err = er; - }) - .unwrap(); + if let Err(err) = output.send_buffer(out.port, out.packet) { + err_sender.send(Some(err)).unwrap(); + return + } } + err_sender.send(None).unwrap(); } }); } @@ -295,12 +292,25 @@ impl RemoteProvider { rpc_client .emit(process_name.clone(), mapped_inputs) .map_err(|e| ProcessError(e.to_string()))?; + + if let Ok(error) = err_reciever.recv() { + if let Some(err) = error { + return Err(err) + } + } + Ok(ProcessResult::default()) }); } } +pub enum SocketWorkerStatus { + RUNNING, + STOPPED, +} + pub struct SocketWorker { + status: SocketWorkerStatus, client: Option, pub publisher: Arc>>, } @@ -325,11 +335,21 @@ impl SocketWorker { }) .connect()?, ); + self.status = SocketWorkerStatus::RUNNING; Ok(()) } - pub(crate) fn stop(&self) -> Result<(), anyhow::Error> { - self.client.as_ref().unwrap().disconnect()?; + pub(crate) fn stop(&mut self) -> Result<(), anyhow::Error> { + let client = self.client.clone(); + let (sender, receiver) = mpsc::channel::(); + + self.call_ack("shutdown", JsonValue::Null, move |_, __| { + client.as_ref().unwrap().disconnect().unwrap(); + sender.send(SocketWorkerStatus::STOPPED).unwrap(); + })?; + if let Ok(status) = receiver.recv() { + self.status = status; + } Ok(()) }