Skip to content

Commit

Permalink
refactor(agent): cleanup agent reconciler mutability
Browse files Browse the repository at this point in the history
  • Loading branch information
Meshiest committed Nov 16, 2024
1 parent 5db510f commit 0f85e91
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
28 changes: 16 additions & 12 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,15 @@ async fn main() {
let state3 = Arc::clone(&state);
let reconcile_loop = Box::pin(async move {
let mut err_backoff = 0;
let mut reconcile_ctx = Default::default();

// Root reconciler that walks through configuring the agent.
// The context is mutated while reconciling to keep track of things
// like downloads, ledger manipulations, node command, and more.
let mut root = AgentStateReconciler {
agent_state: Arc::clone(state3.agent_state.read().await.deref()),
state: Arc::clone(&state3),
context: Default::default(),
};

// The first reconcile is scheduled for 5 seconds after startup.
// Connecting to the controlplane will likely trigger a reconcile sooner.
Expand All @@ -170,19 +178,15 @@ async fn main() {
// schedule the next reconcile for 5 minutes from now
next_reconcile_at = Instant::now() + Duration::from_secs(5 * 60);

// update the reconciler with the latest agent state
// this prevents the agent state from changing during reconciliation
root.agent_state = state3.agent_state.read().await.deref().clone();

trace!("reconciling agent state...");
match (AgentStateReconciler {
agent_state: Arc::clone(state3.agent_state.read().await.deref()),
state: Arc::clone(&state3),
context: std::mem::take(&mut reconcile_ctx),
})
.reconcile()
.await
{
Ok(mut status) => {
if let Some(context) = status.inner.take() {
match root.reconcile().await {
Ok(status) => {
if status.inner.is_some() {
trace!("reconcile completed");
reconcile_ctx = context;
}
if !status.conditions.is_empty() {
trace!("reconcile conditions: {:?}", status.conditions);
Expand Down
25 changes: 11 additions & 14 deletions crates/agent/src/reconcile/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,24 @@ pub struct AgentStateReconcilerContext {
command: Option<NodeCommand>,
// TODO: store active transfers here for monitoring
// TODO: update api::download_file to receive a transfer id
// TODO: allow transfers to be interrupted. potentially allow them to be resumed by using the
// file range feature.
}

impl Reconcile<AgentStateReconcilerContext, ReconcileError2> for AgentStateReconciler {
async fn reconcile(
self,
) -> Result<ReconcileStatus<AgentStateReconcilerContext>, ReconcileError2> {
impl Reconcile<(), ReconcileError2> for AgentStateReconciler {
async fn reconcile(&mut self) -> Result<ReconcileStatus<()>, ReconcileError2> {
match self.agent_state.as_ref() {
AgentState::Inventory => {
// TODO: cleanup child process
// TODO: cleanup other things

// return a default context because the node, in inventory, has no state
return Ok(ReconcileStatus::default().add_scope("agent_state/inventory"));
}
AgentState::Node(env_id, node) => {
// node is offline, no need to reconcile
if !node.online {
// TODO: tear down the node if it is running
return Ok(
ReconcileStatus::with(self.context).add_scope("agent_state/node/offline")
);
return Ok(ReconcileStatus::default().add_scope("agent_state/node/offline"));
}

// TODO: download binaries
Expand Down Expand Up @@ -227,18 +224,18 @@ impl NodeCommand {
}

impl Reconcile<NodeCommand, ReconcileError2> for NodeCommandReconciler {
async fn reconcile(self) -> Result<ReconcileStatus<NodeCommand>, ReconcileError2> {
async fn reconcile(&mut self) -> Result<ReconcileStatus<NodeCommand>, ReconcileError2> {
let NodeCommandReconciler {
node,
state,
env_id,
} = self;
let info = state.get_env_info(env_id).await?;
let info = state.get_env_info(*env_id).await?;

// Resolve the addresses of the peers and validators
let res = AddressResolveReconciler {
node: Arc::clone(&node),
state: Arc::clone(&state),
node: Arc::clone(node),
state: Arc::clone(state),
}
.reconcile()
.await?;
Expand All @@ -265,7 +262,7 @@ impl Reconcile<NodeCommand, ReconcileError2> for NodeCommandReconciler {
let run = NodeCommand {
command_path: state.cli.path.join(SNARKOS_FILE),
quiet: state.cli.quiet,
env_id,
env_id: *env_id,
node_key: node.node_key.clone(),
loki: state.loki.lock().ok().and_then(|l| l.deref().clone()),
ledger_path,
Expand Down Expand Up @@ -311,7 +308,7 @@ struct AddressResolveReconciler {
}

impl Reconcile<(), ReconcileError2> for AddressResolveReconciler {
async fn reconcile(self) -> Result<ReconcileStatus<()>, ReconcileError2> {
async fn reconcile(&mut self) -> Result<ReconcileStatus<()>, ReconcileError2> {
let AddressResolveReconciler { state, node } = self;

// Find agents that do not have cached addresses
Expand Down
10 changes: 6 additions & 4 deletions crates/agent/src/reconcile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ pub mod agent;
mod checkpoint;
mod files;
pub use files::*;
use snops_common::state::TransferId;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ReconcileCondition {
/// A file is being downloaded.
PendingDownload(String),
/// A file is being transferred.
PendingTransfer(String, TransferId),
/// A file is being unpacked.
PendingUnpack(String),
/// A process is being spawned / confirmed
/// A process is being spawned / confirmed. Could be starting the node or
/// manipulating the ledger
PendingProcess(String),
}

pub trait Reconcile<T, E> {
async fn reconcile(self) -> Result<ReconcileStatus<T>, E>;
async fn reconcile(&mut self) -> Result<ReconcileStatus<T>, E>;
}

pub struct ReconcileStatus<T> {
Expand Down

0 comments on commit 0f85e91

Please sign in to comment.