Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reconcile 2.0, event streams, version enforcement #315

Merged
merged 68 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
79cb859
chore: update snarkos/snarkvm to latest canary
Meshiest Nov 15, 2024
cc1f188
feat(agent): WIP offline node resurrection
Meshiest Nov 15, 2024
b006beb
feat(agent): WIP reconcile 2.0
Meshiest Nov 16, 2024
5db510f
feat(agent): WIP reconcile 2.0 loop, reconciler context and command e…
Meshiest Nov 16, 2024
0f85e91
refactor(agent): cleanup agent reconciler mutability
Meshiest Nov 16, 2024
22eab78
feat(agent): WIP file and binary reconcilers
Meshiest Nov 17, 2024
f2f6138
feat(agent): storage version reconciler, preparing for ledger and gen…
Meshiest Nov 17, 2024
b87750c
feat(agent): Process context and reorganized storage reconcilers
Meshiest Nov 19, 2024
b787da5
feat(agent): genesis reconciler, WIP ledger reconciler
Meshiest Nov 19, 2024
84a0e7a
feat(agent): WIP continued ledger reconciler
Meshiest Nov 23, 2024
4d6173b
feat(agent): ledger reconciler modification task
Meshiest Nov 23, 2024
9f87fe8
feat(agent): WIP process shutdown reconciler
Meshiest Nov 24, 2024
c42add3
feat(agent): graceful shutdown outside of reconciler
Meshiest Nov 24, 2024
a38ab53
refactor(agent): remove dead code
Meshiest Nov 25, 2024
c9f70e6
feat(agent): save reconcile persistence, replace shutdown hook
Meshiest Nov 25, 2024
376b603
chore(aot): update based on canary revert
Meshiest Nov 25, 2024
4ec39e8
feat(db): add a generic DbRecords to support arbitrary dataformat values
Meshiest Nov 25, 2024
7ec0f1f
fix(agent): fix various reconcile errors
Meshiest Nov 25, 2024
96ddc5d
feat(agent,control): reconcile can now update node configuration
Meshiest Nov 26, 2024
67e7739
refactor(controlplane): remove prometheus caching, parallelize httpsd…
Meshiest Nov 26, 2024
79939c0
chore(controlplane): fix prom types
Meshiest Nov 26, 2024
7ee48ad
refactor(agent,controlplane): add address updating, convert old recon…
Meshiest Nov 27, 2024
d51b6ad
chore(agent,controlplane): error, logging, and mild code cleanup
Meshiest Nov 28, 2024
0b5a4ea
feat(snops): emit reconcile status to control plane, move reconcile s…
Meshiest Nov 28, 2024
21cca78
fix(aot): fix proposal cache preventing address flexibility for valid…
Meshiest Nov 28, 2024
266930f
feat(controlplane): event stream, subscribing, and filtering. WIP act…
Meshiest Nov 28, 2024
606b907
fix(agent): fix not broadcasting reconcile status, fix some missing r…
Meshiest Nov 28, 2024
49d7b57
feat(controlplane): event stream tests and quality of life
Meshiest Nov 28, 2024
7a26c11
fix(controlplane): online/offline/reboot actions properly wait for re…
Meshiest Nov 28, 2024
83eced6
perf(snops): address requests use vec instead of hashset for agent id…
Meshiest Nov 29, 2024
da53e34
chore: remove complete todos
Meshiest Nov 29, 2024
c7410d9
refactor(controlplane): reduce repeated handshake code, fixing missin…
Meshiest Nov 29, 2024
63d22cc
feat(snops): enforce agent versions when connecting to controlplane, …
Meshiest Nov 29, 2024
ab60727
chore(controlplane): remove unused ledger file code
Meshiest Nov 30, 2024
af94071
feat(agent): restart node if binary version changed
Meshiest Nov 30, 2024
7e898f6
fix(aot): fix proposal cache not being deleted
Meshiest Dec 1, 2024
745ba8f
feat(snops): support "any" instead of wildcard for nodetargets
Meshiest Dec 1, 2024
aedee1e
feat(controlplane): reboot action does not modify agent online status
Meshiest Dec 1, 2024
6ad74a7
fix(snops): ensure agent wipes non-persisted ledgers
Meshiest Dec 1, 2024
641fa78
fix(controlplane): allow controlplane to be higher patch version than…
Meshiest Dec 1, 2024
12e7aab
fix(agent): use private key in env instead of cli args
Meshiest Dec 1, 2024
a74ce99
fix(agent): fix compute agent not downloading binary, fix compute age…
Meshiest Dec 1, 2024
8ddc6ae
refactor(events): namespace agent events
Meshiest Dec 1, 2024
eb98876
refactor(cannon): use Arc<String> for transaction ids
Meshiest Dec 1, 2024
9cf6e58
feat(cannon): WIP migrate cannon events to the new event streams
Meshiest Dec 1, 2024
32af512
refactor(controlplane): replace old transaction status tracking with …
Meshiest Dec 1, 2024
8974e79
fix(aot,controlplane): use correct cost calculation for isonets, use …
Meshiest Dec 2, 2024
373f289
feat(controlplane): events string parsing
Meshiest Dec 2, 2024
e3d55d9
fix(events): node-target-is filter now supports vec node targets
Meshiest Dec 2, 2024
8c860dd
refactor(events): move event filters and models to common
Meshiest Dec 2, 2024
a800c5a
feat(controlplane): events websocket API
Meshiest Dec 3, 2024
b7ba176
feat(cli): event listening
Meshiest Dec 3, 2024
fee3dca
feat(events): has-* event filters
Meshiest Dec 3, 2024
9aab506
fix(cli): fix filters not being url encoded
Meshiest Dec 3, 2024
a195ce0
fix(events): fix auth and transactions serialization collision on unt…
Meshiest Dec 3, 2024
e9aca23
refactor(rpc): convert rpc codec from bincode to json, cleanup some e…
Meshiest Dec 3, 2024
3e543a2
feat(agent): cleanup reconcile serialization
Meshiest Dec 3, 2024
e35f9dc
fix(snops): fix reconcile complete not sending, add version to connec…
Meshiest Dec 4, 2024
b88a9c7
feat(cli): actions and prepare now wait for reconcile to complete unl…
Meshiest Dec 4, 2024
58ba2a8
feat(cli): monitor event stream for transactions
Meshiest Dec 4, 2024
88c780c
feat(controlplane): rename prepare to apply and clean to delete
Meshiest Dec 5, 2024
c765afc
refactor(snops): replace DocHeightRequest with HeightRequest because …
Meshiest Dec 5, 2024
0f4cb0b
fix(cli): fix env errors not printing
Meshiest Dec 5, 2024
55f3ee3
feat(agent): ensure node actually starts up for reconcile to succeed
Meshiest Dec 5, 2024
ecab658
docs: update docs from latest changes
Meshiest Dec 6, 2024
d4e254a
deps: remove unused deps
Meshiest Dec 6, 2024
87f1050
refactor(cli): fix clipages not building by using async reqwest
Meshiest Dec 6, 2024
cb59208
docs(cli): update cli docs
Meshiest Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 96 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,11 @@ reqwest = { version = "0.12", default-features = false, features = [
"default-tls",
"http2",
] }
rmp-serde = "1.3.0"
# Can't update this cause snarkos/vm
rocksdb = { version = "0.21", default-features = false }
rustls = { version = "0.23.15", features = ["ring"] }
semver = { version = "1.0", features = ["serde"] }
serde = { version = "1", default-features = false, features = [
"alloc",
"derive",
Expand Down Expand Up @@ -129,9 +132,9 @@ snops-common = { path = "./crates/common" }
# snarkos-node-metrics = { version = "3.0" }
# snarkvm = { version = "1.0", features = ["rocks"] }

snarkos-account = { git = "https://github.com/AleoNet/snarkOS", rev = "c6de459" }
snarkos-node = { git = "https://github.com/AleoNet/snarkOS", rev = "c6de459" }
snarkos-node-metrics = { git = "https://github.com/AleoNet/snarkOS", rev = "c6de459" }
snarkvm = { git = "https://github.com/AleoNet/snarkVM", rev = "4eb83d7", default-features = false, features = [
snarkos-account = { git = "https://github.com/AleoNet/snarkOS", rev = "ba41197" }
snarkos-node = { git = "https://github.com/AleoNet/snarkOS", rev = "ba41197" }
snarkos-node-metrics = { git = "https://github.com/AleoNet/snarkOS", rev = "ba41197" }
snarkvm = { git = "https://github.com/AleoNet/snarkVM", rev = "1de86e7", default-features = false, features = [
"rocks",
] }
8 changes: 5 additions & 3 deletions crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "snops-agent"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
license = "MIT"
description = "A snarkops agent for communicating with snarkos nodes and the control plane"
Expand All @@ -14,17 +14,20 @@ mangen = ["snops-common/mangen"]
[dependencies]
anyhow.workspace = true
axum = { workspace = true, features = ["http2", "json", "tokio", "ws"] }
bincode.workspace = true
bytes.workspace = true
chrono.workspace = true
clap.workspace = true
dashmap.workspace = true
futures.workspace = true
futures-util.workspace = true
http.workspace = true
httpdate.workspace = true
indexmap.workspace = true
lazysort.workspace = true
local-ip-address.workspace = true
nix = { workspace = true, features = ["signal"] }
reqwest = { workspace = true, features = ["json", "stream"] }
rustls.workspace = true
serde_json.workspace = true
sha2.workspace = true
simple_moving_average.workspace = true
Expand All @@ -43,4 +46,3 @@ tracing-appender.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
url.workspace = true
rustls = { version = "0.23.15", features = ["ring"] }
149 changes: 91 additions & 58 deletions crates/agent/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use reqwest::IntoUrl;
use sha2::{Digest, Sha256};
use snops_common::{
binaries::{BinaryEntry, BinarySource},
state::TransferStatusUpdate,
rpc::error::ReconcileError,
state::{TransferId, TransferStatusUpdate},
util::sha256_file,
};
use tokio::{fs::File, io::AsyncWriteExt};
Expand All @@ -24,6 +25,7 @@ const TRANSFER_UPDATE_RATE: Duration = Duration::from_secs(2);

/// Download a file. Returns a None if 404.
pub async fn download_file(
tx_id: TransferId,
client: &reqwest::Client,
url: impl IntoUrl,
to: impl AsRef<Path>,
Expand All @@ -35,8 +37,7 @@ pub async fn download_file(
return Ok(None);
}

// create a new transfer
let tx_id = transfers::next_id();
// start a new transfer
transfer_tx.send((
tx_id,
TransferStatusUpdate::Start {
Expand Down Expand Up @@ -98,26 +99,6 @@ pub async fn download_file(
Ok(Some((file, sha256, downloaded)))
}

pub async fn check_file(
url: impl IntoUrl,
to: &Path,
transfer_tx: TransferTx,
) -> anyhow::Result<()> {
let client = reqwest::Client::new();

if !should_download_file(&client, url.as_str(), to, None)
.await
.unwrap_or(true)
{
return Ok(());
}

info!("downloading {to:?}");
download_file(&client, url, to, transfer_tx).await?;

Ok(())
}

pub async fn check_binary(
binary: &BinaryEntry,
base_url: &str,
Expand All @@ -136,23 +117,30 @@ pub async fn check_binary(

// this also checks for sha256 differences, along with last modified time
// against the target
if !should_download_file(&client, &source_url, path, Some(binary))
.await
.unwrap_or(true)
{
let file_issues = get_file_issues(
&client,
&source_url,
path,
binary.size,
binary.sha256.as_deref(),
false,
)
.await;

if file_issues.is_ok_and(|issues| issues.is_none()) {
// check permissions and ensure 0o755
let perms = path.metadata()?.permissions();
if perms.mode() != 0o755 {
tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).await?;
}

// TODO: check sha256 and size

return Ok(());
}
info!("downloading binary update to {}: {binary}", path.display());

let Some((file, sha256, size)) = download_file(&client, &source_url, path, transfer_tx).await?
let tx_id = transfers::next_id();
let Some((file, sha256, size)) =
download_file(tx_id, &client, &source_url, path, transfer_tx).await?
else {
bail!("downloading binary returned 404");
};
Expand Down Expand Up @@ -186,47 +174,92 @@ pub async fn check_binary(
Ok(())
}

pub async fn should_download_file(
#[derive(Debug)]
pub enum BadFileReason {
/// File is missing
NotFound,
/// File size mismatch
Size,
/// SHA256 mismatch
Sha256,
/// A new version is available based on modified header
Stale,
}

pub async fn get_file_issues(
client: &reqwest::Client,
loc: &str,
path: &Path,
binary: Option<&BinaryEntry>,
) -> anyhow::Result<bool> {
if !path.exists() {
return Ok(true);
src: &str,
dst: &Path,
size: Option<u64>,
sha256: Option<&str>,
offline: bool,
) -> Result<Option<BadFileReason>, ReconcileError> {
if !dst.try_exists().unwrap_or(false) {
return Ok(Some(BadFileReason::NotFound));
}

let meta = tokio::fs::metadata(&path).await?;
let meta = tokio::fs::metadata(&dst)
.await
.map_err(|e| ReconcileError::FileStatError(dst.to_path_buf(), e.to_string()))?;
let local_content_length = meta.len();

// if the binary entry is provided, check if the file size and sha256 match
if let Some(binary) = binary {
// file size is incorrect
if binary.size.is_some_and(|s| s != local_content_length) {
return Ok(true);
}
// file size is incorrect
if size.is_some_and(|s| s != local_content_length) {
return Ok(Some(BadFileReason::Size));
}

// if sha256 is present, only download if the sha256 is different
if let Some(sha256) = binary.sha256.as_ref() {
return Ok(sha256_file(&path.to_path_buf())? != sha256.to_ascii_lowercase());
}
// if sha256 is present, only download if the sha256 is different
if let Some(sha256) = sha256 {
let bad_sha256 = sha256_file(&dst.to_path_buf())
.map_err(|e| ReconcileError::FileReadError(dst.to_path_buf(), e.to_string()))?
!= sha256.to_ascii_lowercase();
return Ok(bad_sha256.then_some(BadFileReason::Sha256));
}

// if we're offline, don't download
if offline {
return Ok(None);
}

// check last modified
let res = client.head(loc).send().await?;
let res = client
.head(src)
.send()
.await
.map_err(|e| ReconcileError::HttpError {
method: String::from("HEAD"),
url: src.to_owned(),
error: e.to_string(),
})?;

let Some(last_modified_header) = res.headers().get(http::header::LAST_MODIFIED) else {
return Ok(true);
let Some(last_modified_header) = res
.headers()
.get(http::header::LAST_MODIFIED)
// parse as a string
.and_then(|e| e.to_str().ok())
else {
return Ok(Some(BadFileReason::Stale));
};

let Some(content_length_header) = res.headers().get(http::header::CONTENT_LENGTH) else {
return Ok(true);
let Some(remote_content_length) = res
.headers()
.get(http::header::CONTENT_LENGTH)
// parse the header as a u64
.and_then(|e| e.to_str().ok().and_then(|s| s.parse::<u64>().ok()))
else {
return Ok(Some(BadFileReason::Size));
};

let remote_last_modified = httpdate::parse_http_date(last_modified_header.to_str()?)?;
let local_last_modified = meta.modified()?;

let remote_content_length = content_length_header.to_str()?.parse::<u64>()?;

Ok(remote_last_modified > local_last_modified || remote_content_length != local_content_length)
let remote_last_modified = httpdate::parse_http_date(last_modified_header);
let local_last_modified = meta
.modified()
.map_err(|e| ReconcileError::FileStatError(dst.to_path_buf(), e.to_string()))?;

let is_stale = remote_last_modified
.map(|res| res > local_last_modified)
.unwrap_or(true);
Ok(is_stale
.then_some(BadFileReason::Stale)
.or_else(|| (remote_content_length != local_content_length).then_some(BadFileReason::Size)))
}
38 changes: 35 additions & 3 deletions crates/agent/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ use std::{
use clap::CommandFactory;
use clap::Parser;
use http::Uri;
use snops_common::state::{AgentId, AgentModeOptions, PortConfig};
use snops_common::state::{AgentId, AgentModeOptions, NetworkId, PortConfig, StorageId};
use tracing::{info, warn};

use crate::net;

pub const ENV_ENDPOINT: &str = "SNOPS_ENDPOINT";
pub const ENV_ENDPOINT_DEFAULT: &str = "127.0.0.1:1234";

Expand Down Expand Up @@ -119,6 +121,9 @@ impl Cli {

let mut query = format!("/agent?mode={}", u8::from(self.modes));

// Add agent version
query.push_str(&format!("&version={}", env!("CARGO_PKG_VERSION")));

// add &id=
query.push_str(&format!("&id={}", self.id));

Expand All @@ -127,13 +132,13 @@ impl Cli {
if fs::metadata(file).is_ok() {
query.push_str("&local_pk=true");
} else {
warn!("private-key-file flag ignored as the file was not found: {file:?}")
warn!("Private-key-file flag ignored as the file was not found: {file:?}")
}
}

// add &labels= if id is present
if let Some(labels) = &self.labels {
info!("using labels: {:?}", labels);
info!("Using labels: {:?}", labels);
query.push_str(&format!(
"&labels={}",
labels
Expand Down Expand Up @@ -167,4 +172,31 @@ impl Cli {
ws_uri,
)
}

pub fn addrs(&self) -> (Vec<IpAddr>, Option<IpAddr>) {
let internal_addrs = match (self.internal, self.external) {
// use specified internal address
(Some(internal), _) => vec![internal],
// use no internal address if the external address is loopback
(None, Some(external)) if external.is_loopback() => vec![],
// otherwise, get the local network interfaces available to this node
(None, _) => net::get_internal_addrs().expect("failed to get network interfaces"),
};

let external_addr = self.external;
if let Some(addr) = external_addr {
info!("Using external addr: {}", addr);
} else {
info!("Skipping external addr");
}

(internal_addrs, external_addr)
}

pub fn storage_path(&self, network: NetworkId, storage_id: StorageId) -> PathBuf {
let mut path = self.path.join("storage");
path.push(network.to_string());
path.push(storage_id.to_string());
path
}
}
Loading
Loading