Skip to content

Commit

Permalink
skip snapshot build if deno feature is not enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
darmie committed May 6, 2024
1 parent 3b68cee commit 586da08
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 28 deletions.
2 changes: 1 addition & 1 deletion crates/zflow_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ beady = "0.6.0"


[build-dependencies]
deno_runtime = {version = "0.150.0"}
deno_runtime = {version = "0.150.0", optional=true}



Expand Down
1 change: 1 addition & 0 deletions crates/zflow_runtime/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ use std::{env, path::PathBuf};
fn main(){
let o = PathBuf::from(env::var_os("OUT_DIR").unwrap());
let runtime_snapshot_path = o.join("DENO_RUNTIME_SNAPSHOT.bin");
#[cfg(feature = "deno_embed")]
deno_runtime::snapshot::create_runtime_snapshot(runtime_snapshot_path, deno_runtime::ops::bootstrap::SnapshotOptions::default());
}
3 changes: 2 additions & 1 deletion crates/zflow_runtime/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,6 @@ impl BaseNetwork for Network {
if self.get_processes().contains_key(&node.id) {
return Ok(self.get_processes().get(&node.id.clone()).unwrap().clone());
}
// }
let mut process = NetworkProcess {
id: node.id.clone(),
..NetworkProcess::default()
Expand Down Expand Up @@ -1714,6 +1713,8 @@ impl BaseNetwork for Network {
}
}



fn connect_port(
_socket: Arc<Mutex<InternalSocket>>,
process: NetworkProcess,
Expand Down
72 changes: 46 additions & 26 deletions crates/zflow_runtime/websocket/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
borrow::BorrowMut,
collections::HashMap,
sync::{Arc, Mutex, RwLock},
sync::{mpsc, Arc, Mutex, RwLock},
time::Duration,
};

Expand Down Expand Up @@ -128,7 +129,7 @@ impl Provider for RemoteProvider {
let manifest_cloned = manifest.clone();

self.worker
.call_ack("manifest", json!({}), move |data, _| match &data {
.call_ack("manifest", JsonValue::Null, move |data, _| match &data {
Payload::Binary(b) => {
let data = String::from_utf8_lossy(b).to_string();
if let Ok(man) = serde_json::de::from_str::<Manifest>(&data) {
Expand Down Expand Up @@ -168,6 +169,7 @@ impl RemoteProvider {
let worker = SocketWorker {
client: None,
publisher: Arc::new(Mutex::new(Publisher::new())),
status: SocketWorkerStatus::STOPPED,
};

Self {
Expand Down Expand Up @@ -248,7 +250,8 @@ impl RemoteProvider {

let output = this.output.clone();
let _process_name = process_name.clone();
let err: RwLock<ProcessError> = RwLock::new(ProcessError::default());
let (err_sender, err_reciever) = mpsc::channel::<Option<ProcessError>>();

if let Ok(_pub) = publisher.try_lock().as_mut() {
_pub.subscribe_fn(move |subscription| {
let event = &subscription.0;
Expand All @@ -261,46 +264,53 @@ impl RemoteProvider {
let send_buffer = format!("/{}/send_buffer", &id);

if event.as_str() == send.as_str() {
output
.send(&subscription.1.clone())
.map_err(|er| {
let mut _err = err.write().unwrap();
*_err = er;
})
.unwrap();
if let Err(err) = output.send(&subscription.1.clone()) {
err_sender.send(Some(err)).unwrap();
} else {
err_sender.send(None).unwrap();
}
}
if event.as_str() == send_done.as_str() {
output
.send_done(&subscription.1.clone())
.map_err(|er| {
let mut _err = err.write().unwrap();
*_err = er;
})
.unwrap();
if let Err(err) = output.send_done(&subscription.1.clone()) {
err_sender.send(Some(err)).unwrap();
} else {
err_sender.send(None).unwrap();
}
}
if event.as_str() == send_buffer.as_str() {
if let Ok(out) = OutputBuffer::deserialize(subscription.1.clone()) {
output
.send_buffer(out.port, out.packet)
.map_err(|er| {
let mut _err = err.write().unwrap();
*_err = er;
})
.unwrap();
if let Err(err) = output.send_buffer(out.port, out.packet) {
err_sender.send(Some(err)).unwrap();
return
}
}
err_sender.send(None).unwrap();
}
});
}

rpc_client
.emit(process_name.clone(), mapped_inputs)
.map_err(|e| ProcessError(e.to_string()))?;

if let Ok(error) = err_reciever.recv() {
if let Some(err) = error {
return Err(err)
}
}

Ok(ProcessResult::default())
});
}
}

pub enum SocketWorkerStatus {
RUNNING,
STOPPED,
}

pub struct SocketWorker {
status: SocketWorkerStatus,
client: Option<rust_socketio::client::Client>,
pub publisher: Arc<Mutex<Publisher<(String, JsonValue)>>>,
}
Expand All @@ -325,11 +335,21 @@ impl SocketWorker {
})
.connect()?,
);
self.status = SocketWorkerStatus::RUNNING;
Ok(())
}

pub(crate) fn stop(&self) -> Result<(), anyhow::Error> {
self.client.as_ref().unwrap().disconnect()?;
pub(crate) fn stop(&mut self) -> Result<(), anyhow::Error> {
let client = self.client.clone();
let (sender, receiver) = mpsc::channel::<SocketWorkerStatus>();

self.call_ack("shutdown", JsonValue::Null, move |_, __| {
client.as_ref().unwrap().disconnect().unwrap();
sender.send(SocketWorkerStatus::STOPPED).unwrap();
})?;
if let Ok(status) = receiver.recv() {
self.status = status;
}
Ok(())
}

Expand Down

0 comments on commit 586da08

Please sign in to comment.