Skip to content

Commit

Permalink
Merge pull request #91 from monadicus/feat-storage-persist
Browse files Browse the repository at this point in the history
feat(env): node height persist feature
  • Loading branch information
Meshiest authored Apr 2, 2024
2 parents 6c0d3d5 + e9385f1 commit d68a4e9
Show file tree
Hide file tree
Showing 15 changed files with 387 additions and 147 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/snot-agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures-util.workspace = true
http.workspace = true
httpdate = "1.0.3"
local-ip-address = "0.6.1"
nix = { workspace = true, features = ["signal"] }
reqwest = { workspace = true, features = ["stream", "json"] }
snot-common = { path = "../snot-common" }
tarpc.workspace = true
Expand Down
92 changes: 89 additions & 3 deletions crates/snot-agent/src/api.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::path::Path;
use std::{os::unix::fs::PermissionsExt, path::Path};

use futures::StreamExt;
use http::StatusCode;
use reqwest::IntoUrl;
use snot_common::api::StorageInfoResponse;
use tokio::{fs::File, io::AsyncWriteExt};
use tracing::info;

/// Download a file. Returns a None if 404.
pub async fn download_file(url: impl IntoUrl, to: impl AsRef<Path>) -> anyhow::Result<Option<()>> {
let req = reqwest::get(url).await?;
pub async fn download_file(
client: &reqwest::Client,
url: impl IntoUrl,
to: impl AsRef<Path>,
) -> anyhow::Result<Option<()>> {
let req = client.get(url).send().await?;
if req.status() == StatusCode::NOT_FOUND {
return Ok(None);
}
Expand All @@ -21,3 +27,83 @@ pub async fn download_file(url: impl IntoUrl, to: impl AsRef<Path>) -> anyhow::R

Ok(Some(()))
}

pub async fn check_file(url: impl IntoUrl, to: &Path) -> anyhow::Result<()> {
let client = reqwest::Client::new();

if !should_download_file(&client, url.as_str(), to)
.await
.unwrap_or(true)
{
return Ok(());
}

info!("downloading {to:?}");
download_file(&client, url, to).await?;

Ok(())
}

pub async fn get_storage_info(url: impl IntoUrl) -> anyhow::Result<StorageInfoResponse> {
let req = reqwest::get(url).await?;
if !req.status().is_success() {
return Err(anyhow::anyhow!(
"error getting storage info: {}",
req.status()
));
}
let body = req.json().await?;
Ok(body)
}

pub async fn check_binary(base_url: &str, path: &Path) -> anyhow::Result<()> {
let client = reqwest::Client::new();

// check if we already have an up-to-date binary
let loc = format!("{base_url}/content/snarkos");
if !should_download_file(&client, &loc, path)
.await
.unwrap_or(true)
{
return Ok(());
}
info!("binary update is available, downloading...");

// download the binary
let mut file = tokio::fs::File::create(path).await?;
let mut stream = client.get(&loc).send().await?.bytes_stream();

while let Some(chunk) = stream.next().await {
file.write_all(&chunk?).await?;
}

// ensure the permissions are set
tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).await?;

Ok(())
}

pub async fn should_download_file(
client: &reqwest::Client,
loc: &str,
path: &Path,
) -> anyhow::Result<bool> {
Ok(match tokio::fs::metadata(&path).await {
Ok(meta) => {
// check last modified
let res = client.head(loc).send().await?;

let Some(last_modified_header) = res.headers().get(http::header::LAST_MODIFIED) else {
return Ok(true);
};

let remote_last_modified = httpdate::parse_http_date(last_modified_header.to_str()?)?;
let local_last_modified = meta.modified()?;

remote_last_modified > local_last_modified
}

// no existing file, unconditionally download binary
Err(_) => true,
})
}
71 changes: 8 additions & 63 deletions crates/snot-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ mod rpc;
mod state;

use std::{
os::unix::fs::PermissionsExt,
path::Path,
sync::{Arc, Mutex},
time::Duration,
};
Expand All @@ -20,7 +18,6 @@ use http::HeaderValue;
use snot_common::rpc::{agent::AgentService, control::ControlServiceClient, RpcTransport};
use tarpc::server::Channel;
use tokio::{
io::AsyncWriteExt,
select,
signal::unix::{signal, Signal, SignalKind},
};
Expand Down Expand Up @@ -101,7 +98,7 @@ async fn main() {
.ok();

// download the snarkOS binary
check_binary(&format!("http://{endpoint}"), &args.path.join(SNARKOS_FILE)) // TODO: http(s)?
api::check_binary(&format!("http://{endpoint}"), &args.path.join(SNARKOS_FILE)) // TODO: http(s)?
.await
.expect("failed to acquire snarkOS binary");

Expand All @@ -121,6 +118,7 @@ async fn main() {
cli: args,
endpoint,
jwt: Mutex::new(jwt),
env_to_storage: Default::default(),
agent_state: Default::default(),
reconcilation_handle: Default::default(),
child: Default::default(),
Expand Down Expand Up @@ -212,9 +210,12 @@ async fn main() {
// handle incoming messages
msg = ws_stream.next() => match msg {
Some(Ok(tungstenite::Message::Binary(bin))) => {
let Ok(msg) = bincode::deserialize(&bin) else {
warn!("failed to deserialize a message from the control plane");
continue;
let msg = match bincode::deserialize(&bin) {
Ok(msg) => msg,
Err(e) => {
error!("failed to deserialize a message from the control plane: {e}");
continue;
}
};

match msg {
Expand Down Expand Up @@ -275,59 +276,3 @@ impl Signals {
futs.next().await;
}
}

async fn check_binary(base_url: &str, path: &Path) -> anyhow::Result<()> {
let client = reqwest::Client::new();

// check if we already have an up-to-date binary
let loc = format!("{base_url}/content/snarkos");
if !should_download_binary(&client, &loc, path)
.await
.unwrap_or(true)
{
return Ok(());
}

// download the binary
let mut file = tokio::fs::File::create(path).await?;
let mut stream = client.get(&loc).send().await?.bytes_stream();

while let Some(chunk) = stream.next().await {
file.write_all(&chunk?).await?;
}

// ensure the permissions are set
tokio::fs::set_permissions(path, std::fs::Permissions::from_mode(0o755)).await?;

Ok(())
}

async fn should_download_binary(
client: &reqwest::Client,
loc: &str,
path: &Path,
) -> anyhow::Result<bool> {
Ok(match tokio::fs::metadata(&path).await {
Ok(meta) => {
// check last modified
let res = client.head(loc).send().await?;

let Some(last_modified_header) = res.headers().get(http::header::LAST_MODIFIED) else {
return Ok(true);
};

let remote_last_modified = httpdate::parse_http_date(last_modified_header.to_str()?)?;
let local_last_modified = meta.modified()?;

if remote_last_modified > local_last_modified {
info!("binary update is available, downloading...");
true
} else {
false
}
}

// no existing file, unconditionally download binary
Err(_) => true,
})
}
Loading

0 comments on commit d68a4e9

Please sign in to comment.