Skip to content

Commit

Permalink
feat(snot): working timeline execution
Browse files Browse the repository at this point in the history
Signed-off-by: Zander Franks <[email protected]>
  • Loading branch information
voximity committed Mar 28, 2024
1 parent e72c2d3 commit 53e1588
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 48 deletions.
8 changes: 5 additions & 3 deletions crates/snot/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use futures_util::future::join_all;
use indexmap::{map::Entry, IndexMap};
use serde::Deserialize;
use snot_common::state::{AgentId, AgentPeer, AgentState, NodeKey};
use tokio::{sync::Mutex, task::JoinHandle};
use tracing::{info, warn};

use self::timeline::reconcile_agents;
use self::timeline::{reconcile_agents, ExecutionError};
use crate::{
cannon::{sink::TxSink, source::TxSource, CannonInstance},
schema::{
Expand Down Expand Up @@ -47,6 +48,7 @@ pub struct Environment {
pub cannons: HashMap<usize, CannonInstance>,

pub timeline: Vec<TimelineEvent>,
pub timeline_handle: Mutex<Option<JoinHandle<Result<(), ExecutionError>>>>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -222,6 +224,7 @@ impl Environment {
})
},
timeline,
timeline_handle: Default::default(),
};

let env_id = ENVS_COUNTER.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -407,7 +410,6 @@ pub async fn initial_reconcile(env_id: usize, state: &GlobalState) -> anyhow::Re
}
}

reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await;

reconcile_agents(pending_reconciliations.into_iter(), &state.pool).await?;
Ok(())
}
122 changes: 84 additions & 38 deletions crates/snot/src/env/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use anyhow::bail;
use futures_util::future::join_all;
use snot_common::state::AgentState;
use thiserror::Error;
use tokio::{select, sync::RwLock, task::JoinHandle};
use tokio::{select, sync::RwLock, task::JoinError};
use tracing::info;

use super::Environment;
use crate::{
Expand All @@ -19,13 +20,26 @@ use crate::{
pub enum ExecutionError {
#[error("an agent is offline, so the test cannot complete")]
AgentOffline,
#[error("reconcilation failed: {0}")]
Reconcile(#[from] BatchReconcileError),
#[error("join error: {0}")]
Join(#[from] JoinError),
}

/// The tuple to pass into `reconcile_agents`.
pub type PendingAgentReconcile = (AgentId, AgentClient, AgentState);

#[derive(Debug, Error)]
#[error("batch reconciliation failed with {failures} failed reconciliations")]
pub struct BatchReconcileError {
pub failures: usize,
}

/// Reconcile a bunch of agents at once.
pub async fn reconcile_agents<I>(iter: I, pool_mtx: &RwLock<HashMap<AgentId, Agent>>)
pub async fn reconcile_agents<I>(
iter: I,
pool_mtx: &RwLock<HashMap<AgentId, Agent>>,
) -> Result<(), BatchReconcileError>
where
I: Iterator<Item = PendingAgentReconcile>,
{
Expand Down Expand Up @@ -73,6 +87,14 @@ where
"reconciliation result: {success}/{} nodes reconciled",
num_reconciliations
);

if success == num_reconciliations {
Ok(())
} else {
Err(BatchReconcileError {
failures: num_reconciliations - success,
})
}
}

impl Environment {
Expand All @@ -82,23 +104,44 @@ impl Environment {
None => bail!("no env with id {id}"),
});

// TODO: put this handle somewhere so we can terminate timeline execution
let _handle: JoinHandle<Result<(), ExecutionError>> = tokio::spawn(async move {
let handle_lock_env = Arc::clone(&env);
let mut handle_lock = handle_lock_env.timeline_handle.lock().await;

// abort if timeline is already being executed
match &*handle_lock {
Some(handle) if !handle.is_finished() => {
bail!("environment timeline is already being executed")
}
_ => (),
}

*handle_lock = Some(tokio::spawn(async move {
for event in env.timeline.iter() {
let pool = state.pool.read().await;

// task handles that must be awaited for this timeline event
let mut awaiting_handles = vec![];

// add a duration sleep if a duration was specified
if let Some(duration) = &event.duration {
match duration {
EventDuration::Time(duration) => {
awaiting_handles.push(tokio::spawn(tokio::time::sleep(*duration)));
&EventDuration::Time(duration) => {
awaiting_handles.push(tokio::spawn(async move {
tokio::time::sleep(duration).await;
Ok(())
}));
}

// TODO
_ => unimplemented!(),
}
}

// whether or not to reconcile asynchronously (if any of the reconcile actions
// are awaited)
let mut reconcile_async = false;

// the pending reconciliations
let mut pending_reconciliations: HashMap<usize, PendingAgentReconcile> =
HashMap::new();

Expand Down Expand Up @@ -128,71 +171,74 @@ impl Environment {
}

for ActionInstance { action, awaited } in &event.actions.0 {
let handle = match action {
match action {
// toggle online state
Action::Online(targets) | Action::Offline(targets) => {
if *awaited {
reconcile_async = true;
}

let online = matches!(action, Action::Online(_));

for agent in env.matching_agents(targets, &*pool) {
set_node_field!(agent, online = online);
}

// get target agents
// let agents = env
// .matching_agents(targets, &*pool)
// .map(|agent| {
// agent.map_to_node_state_reconcile(|mut n|
// {
// n.online = online;
// n
// })
// })
// .collect::<Option<Vec<_>>>()
// .ok_or(ExecutionError::AgentOffline)?;

// // reconcile each client agent
// let task_state = Arc::clone(&state);
// tokio::spawn(async move {
// reconcile_agents(agents.into_iter(),
// &task_state.pool).await;
// })
}

Action::Cannon(_) => unimplemented!(),
Action::Height(_) => unimplemented!(),
};

if *awaited {
// awaiting_handles.push(handle);
}
}

drop(pool);

// TODO: error handling
let task_state = Arc::clone(&state);
let reconcile_handle = tokio::spawn(async move {
reconcile_agents(
pending_reconciliations.into_iter().map(|(_, v)| v),
&task_state.pool,
)
.await
});

// await the reconciliation if any of the actions were `.await`
if reconcile_async {
awaiting_handles.push(reconcile_handle);
}

let handles_fut = join_all(awaiting_handles.into_iter());

// wait for the awaiting futures to complete
match &event.timeout {
let handles_result = match &event.timeout {
// apply a timeout to `handles_fut`
Some(timeout) => match timeout {
EventDuration::Time(duration) => select! {
_ = tokio::time::sleep(*duration) => (),
_ = handles_fut => (),
_ = tokio::time::sleep(*duration) => continue,
res = handles_fut => res,
},

_ => unimplemented!(),
},

// no timeout, regularly await the handles
None => {
handles_fut.await;
None => handles_fut.await,
};

for result in handles_result.into_iter() {
match result {
Ok(Ok(())) => (),
Ok(Err(e)) => return Err(ExecutionError::Reconcile(e)),
Err(e) => return Err(ExecutionError::Join(e)),
}
}
}

info!("------------------------------------------");
info!("playback of environment timeline completed");
info!("------------------------------------------");

Ok(())
});
}));

Ok(())
}
Expand Down
16 changes: 9 additions & 7 deletions crates/snot/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ async fn post_env_prepare(state: State<AppState>, body: String) -> Response {
}
}

async fn post_env_timeline(
Path(env_id): Path<usize>,
State(state): State<AppState>,
) -> impl IntoResponse {
// ...

StatusCode::OK
async fn post_env_timeline(Path(env_id): Path<usize>, State(state): State<AppState>) -> Response {
match Environment::execute(state, env_id).await {
Ok(()) => StatusCode::OK.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": format!("{e}") })),
)
.into_response(),
}
}

async fn delete_env_timeline(
Expand Down
28 changes: 28 additions & 0 deletions specs/test-timeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
---
version: storage.snarkos.testing.monadic.us/v1

id: base
name: base-ledger

generate:
path: ./tests/base

---
version: nodes.snarkos.testing.monadic.us/v1

name: 1-validator

nodes:
validator/0:
key: committee.0
height: 0
validators: [validator/*]
peers: []
---
version: timeline.snarkos.testing.monadic.us/v1

timeline:
- duration: 5s
- offline: '*/*'
duration: 5s
- online: '*/*'

0 comments on commit 53e1588

Please sign in to comment.