diff --git a/rust/ares/Cargo.lock b/rust/ares/Cargo.lock
index 7822f58f..826336d6 100644
--- a/rust/ares/Cargo.lock
+++ b/rust/ares/Cargo.lock
@@ -73,7 +73,8 @@ dependencies = [
  "json",
  "lazy_static",
  "libc",
- "lmdb-rkv",
+ "lmdb",
+ "lmdb-sys",
  "memmap",
  "murmur3",
  "num-derive",
@@ -213,12 +214,6 @@ version = "3.14.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
 
-[[package]]
-name = "byteorder"
-version = "1.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
-
 [[package]]
 name = "cast"
 version = "0.3.0"
@@ -683,22 +678,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
 
 [[package]]
-name = "lmdb-rkv"
-version = "0.14.0"
+name = "lmdb"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "447a296f7aca299cfbb50f4e4f3d49451549af655fb7215d7f8c0c3d64bad42b"
+checksum = "5b0908efb5d6496aa977d96f91413da2635a902e5e31dbef0bfb88986c248539"
 dependencies = [
  "bitflags 1.3.2",
- "byteorder",
  "libc",
- "lmdb-rkv-sys",
+ "lmdb-sys",
 ]
 
 [[package]]
-name = "lmdb-rkv-sys"
-version = "0.11.2"
+name = "lmdb-sys"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "61b9ce6b3be08acefa3003c57b7565377432a89ec24476bbe72e11d101f852fe"
+checksum = "d5b392838cfe8858e86fac37cf97a0e8c55cc60ba0a18365cadc33092f128ce9"
 dependencies = [
  "cc",
  "libc",
@@ -981,18 +975,18 @@ checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
 
 [[package]]
 name = "serde"
-version = "1.0.196"
+version = "1.0.203"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
+checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.196"
+version = "1.0.203"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
+checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/rust/ares/Cargo.toml b/rust/ares/Cargo.toml
index 4797426f..de0aad59 100644
--- a/rust/ares/Cargo.toml
+++ b/rust/ares/Cargo.toml
@@ -34,7 +34,8 @@ num-derive = "0.3"
 num-traits = "0.2"
 signal-hook = "0.3"
 static_assertions = "1.1.0"
-lmdb-rkv = "0.14.0"
+lmdb = "0.8.0"
+lmdb-sys = "0.8.0"
 
 [build-dependencies]
 autotools = "0.2"
diff --git a/rust/ares/src/disk.rs b/rust/ares/src/disk.rs
index b9dd2daf..db0a6ed0 100644
--- a/rust/ares/src/disk.rs
+++ b/rust/ares/src/disk.rs
@@ -1,75 +1,131 @@
 /** Disk storage for events. */
-use lmdb::{Environment, Error as LmdbError};
+use crate::jets::list::util::lent;
+use crate::lmdb::{lmdb_gulf, lmdb_read_meta};
+use crate::mem::NockStack;
+use crate::mug::mug_u32;
+use crate::noun::{IndirectAtom, Noun, D, T};
+use crate::serf::Context;
+use crate::serialization::cue;
+use lmdb::{Cursor, Environment, Error as LmdbError, Transaction};
+use lmdb_sys as ffi;
+use std::convert::TryInto;
 use std::fs;
 use std::path::PathBuf;
 use std::result::Result as StdResult;
 
 #[derive(Debug, PartialEq)]
 pub enum Error {
-  InvalidPath,
-  EpochNotFound,
-  Lmdb(LmdbError),
+    InvalidPath,
+    EpochNotFound,
+    Lmdb(LmdbError),
 }
 
 pub type Result<T> = StdResult<T, Error>;
 
-struct Disk {
-  pub log_dir: PathBuf,
-  pub epoch: u64,
-  pub env: Environment,
+pub struct Disk {
+    /// Full path to the pier's log directory.
+    pub dir: PathBuf,
+
+    /// Current epoch number.
+    pub epoch: u64,
+
+    /// Current epoch's LMDB environment.
+    pub env: Environment,
+
+    /// Last event number in the log.
+    pub done: u64,
 }
 
 impl Disk {
-  pub fn init(&self, log_dir: PathBuf) -> Result<Disk> {
-    let epoch = self.last_epoch(&log_dir)?;
-    let epoch_dir = log_dir.join(format!("0i{}", epoch));
-    let env_builder = Environment::new();
-    let env_res = env_builder.open(epoch_dir.as_path());
-    match env_res {
-      Ok(env) => {
-        Ok(
-          Disk {
-            log_dir: log_dir,
+    /// Open the newest epoch under `log_dir`; panics if no epoch is found or LMDB fails.
+    pub fn new(log_dir: PathBuf) -> Self {
+        let epoch = epoch_last(&log_dir).expect("Failed to get last epoch");
+        let epoch_dir = log_dir.join(format!("0i{}", epoch));
+        let env = Environment::new()
+            .set_map_size(0x10000000000) // 2^40 bytes (1 TiB) of map space
+            .set_max_dbs(2)
+            .open(epoch_dir.as_path())
+            .expect("Failed to open LMDB environment");
+        let (_, high) = lmdb_gulf(&env);
+        Disk {
+            dir: log_dir,
             epoch: epoch,
             env: env,
-          }
-        )
-      }
-      Err(err) => Err(Error::Lmdb(err)),
+            done: high,
+        }
     }
-  }
+}
 
-  /// Get the number of the latest epoch in the given directory, or return
-  /// an error if there are no epochs or the path specified isn't a directory.
-  pub fn last_epoch(&self, log_dir: &PathBuf) -> Result<u64> {
+/// Get the number of the latest epoch in the given directory, or return
+/// an error if there are no epochs or the path specified isn't a directory.
+pub fn epoch_last(log_dir: &PathBuf) -> Result<u64> {
     if !log_dir.is_dir() {
-      return Err(Error::InvalidPath);
+        return Err(Error::InvalidPath);
     }
 
     let mut some = false;
     let mut last = 0;
 
-    if let Ok(entries) = fs::read_dir(log_dir) {
-      for entry in entries {
-        if let Ok(entry) = entry {
-          if let Some(name) = entry.file_name().to_str() {
-            if let Some(epoch) = name.strip_prefix("0i") {
-              if let Ok(n) = epoch.parse::<u64>() {
-                some = true;
-                if n > last {
-                  last = n;
+    if let Ok(entries) = fs::read_dir(log_dir) {
+        for entry in entries {
+            if let Ok(entry) = entry {
+                if let Some(name) = entry.file_name().to_str() {
+                    if let Some(epoch) = name.strip_prefix("0i") {
+                        if let Ok(n) = epoch.parse::<u64>() {
+                            some = true;
+                            if n > last {
+                                last = n;
+                            }
+                        }
+                    }
                 }
-              }
             }
-          }
         }
-      }
     }
 
     if some {
-      return Ok(last);
+        return Ok(last);
     }
 
     Err(Error::EpochNotFound)
-  }
-}
\ No newline at end of file
+}
+
+/// Open the specified epoch's LMDB environment.
+fn _epoch_load(log: Disk, epoch: u64) -> Result<()> {
+    let epoch_dir = log.dir.join(format!("0i{}", epoch));
+    // NOTE(review): the environment is dropped as soon as it has been opened, so
+    // this currently only verifies that the epoch can be opened.
+    Environment::new()
+        .open(epoch_dir.as_path())
+        .map(|_| ())
+        .map_err(Error::Lmdb)
+}
+
+/// Read a value from the metadata database.
+pub fn disk_read_meta(env: &Environment, key: &str) -> Result<u64> {
+    lmdb_read_meta(env, key).map_err(Error::Lmdb)
+}
+
+/// Read `len` events from the database, starting at `eve`.
+pub fn disk_read_list(ctx: &mut Context, eve: u64, len: u64) -> Option<Noun> {
+    let stack = &mut ctx.nock_context.stack;
+    let txn = ctx.log.env.begin_ro_txn().ok()?;
+    // `open_db` is an `unsafe fn` in the lmdb crate; any failure yields `None`, not a panic.
+    let db = unsafe { txn.open_db(Some("EVENTS")).ok()? };
+    let cursor = txn.open_ro_cursor(db).ok()?;
+    let end = eve.checked_add(len)?; // exclusive upper bound, so exactly `len` events
+    let mut events: Noun = D(0);
+    // Walk the range backwards: each event is consed onto the front of the list,
+    // so the finished list comes out in ascending event order.
+    for i in (eve..end).rev() {
+        let key = u64::to_le_bytes(i);
+        let (_, value) = cursor.get(Some(&key), None, ffi::MDB_SET_KEY).ok()?;
+        // The first four bytes are read as the little-endian mug; the jam starts at offset 4.
+        let _mug = u32::from_le_bytes(value.get(..4)?.try_into().ok()?);
+        // NOTE(review): assumes `new_raw_bytes_ref` copies `value` onto `stack` -- confirm.
+        let jam = unsafe { IndirectAtom::new_raw_bytes_ref(stack, value.get(4..)?) };
+        let e = cue(stack, jam.as_atom());
+        events = T(stack, &[e, events]);
+    }
+    Some(events)
+}
diff --git a/rust/ares/src/lib.rs b/rust/ares/src/lib.rs
index 3d8b6dcb..09c3fcd3 100644
--- a/rust/ares/src/lib.rs
+++ b/rust/ares/src/lib.rs
@@ -9,6 +9,8 @@ pub mod guard;
 pub mod hamt;
 pub mod interpreter;
 pub mod jets;
+pub mod lmdb;
+pub mod mars;
 pub mod mem;
 pub mod mug;
 pub mod newt;
diff --git a/rust/ares/src/lmdb.rs b/rust/ares/src/lmdb.rs
new file mode 100644
index 00000000..1843a3ee
--- /dev/null
+++ b/rust/ares/src/lmdb.rs
@@ -0,0 +1,58 @@
+use lmdb::{Cursor, Environment, Transaction};
+use lmdb_sys as ffi;
+use std::convert::TryInto;
+
+pub type Result<T> = std::result::Result<T, lmdb::Error>;
+
+/// Read the `META` entry stored under `key` and decode it as a `u64`.
+pub fn lmdb_read_meta(env: &Environment, key: &str) -> Result<u64> {
+    let txn = env.begin_ro_txn()?;
+    let db = unsafe { txn.open_db(Some("META"))? };
+    let bytes: &[u8] = txn.get(db, &key.as_bytes())?;
+    if bytes.len() > 8 {
+        panic!("lmdb_read_meta: value too large for u64");
+    }
+    // The bytes are folded most-significant first (big-endian).
+    // NOTE(review): event keys in this file are little-endian -- confirm the META byte order.
+    Ok(bytes
+        .iter()
+        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte)))
+}
+
+/// Return the first and last event numbers, `(low, high)`, in the `EVENTS` database.
+///
+/// Panics if the database cannot be read or holds no events.
+pub fn lmdb_gulf(env: &Environment) -> (u64, u64) {
+    let txn = env.begin_ro_txn().unwrap();
+    // `open_db` is an `unsafe fn` in the lmdb crate.
+    let db = unsafe { txn.open_db(Some("EVENTS")).unwrap() };
+    let cursor = txn.open_ro_cursor(db).unwrap();
+    // Keys are decoded as 8-byte little-endian event numbers.
+    let first = cursor.get(None, None, ffi::MDB_FIRST).unwrap().0;
+    let first = first.expect("Couldn't get first event from the database");
+    let low = u64::from_le_bytes(first.try_into().unwrap());
+    let last = cursor.get(None, None, ffi::MDB_LAST).unwrap().0;
+    let last = last.expect("Couldn't get last event from the database");
+    let high = u64::from_le_bytes(last.try_into().unwrap());
+    (low, high)
+}
+
+/// Read `len` events from the database, starting at `eve`, and calling `read_f` for each
+/// event.
+pub fn lmdb_read<F>(env: &Environment, eve: u64, len: usize, mut read_f: F)
+where
+    F: FnMut(u64, &[u8]),
+{
+    let txn = env.begin_ro_txn().unwrap();
+    let db = unsafe { txn.open_db(Some("EVENTS")).unwrap() };
+    let mut cursor = txn.open_ro_cursor(db).unwrap();
+    let end = eve.saturating_add(len as u64); // exclusive upper bound, so exactly `len` events
+    for (key, value) in cursor.iter_from(&u64::to_le_bytes(eve)) {
+        // Decode before comparing: little-endian byte strings do not sort numerically.
+        let key = u64::from_le_bytes(key.try_into().unwrap());
+        if key >= end {
+            break;
+        }
+        read_f(key, value);
+    }
+}
diff --git a/rust/ares/src/main.rs b/rust/ares/src/main.rs
index 483f08f1..dd250dd5 100644
--- a/rust/ares/src/main.rs
+++ b/rust/ares/src/main.rs
@@ -1,7 +1,10 @@
+use ares::disk::Disk;
 use ares::jets::hot::URBIT_HOT_STATE;
-use ares::serf::serf;
+use ares::mars::{mars_play, Mars};
+use ares::serf::{Context, serf};
 use std::env;
 use std::io;
+use std::path::PathBuf;
 
 fn main() -> io::Result<()> {
     // debug
@@ -13,9 +16,9 @@ fn main() -> io::Result<()> {
         };
     }
 
-    let filename = env::args().nth(1).expect("Must provide input filename");
+    let cmd = env::args().nth(1).expect("Must provide input filename");
 
-    if filename == "see gdb! definition in lib.rs about this" {
+    if cmd == "see gdb! definition in lib.rs about this" {
         ares::interpreter::use_gdb();
         ares::jets::use_gdb();
         ares::jets::bits::use_gdb();
@@ -31,9 +34,42 @@ fn main() -> io::Result<()> {
         ares::serialization::use_gdb();
     }
 
-    if filename == "serf" {
+    if cmd == "serf" {
         return serf(URBIT_HOT_STATE);
+    } else if cmd == "play" {
+        let pier_path = PathBuf::from(
+            env::args()
+                .nth(2)
+                .expect("Must provide path to log directory"),
+        );
+
+        let ctx = Context::load(pier_path.clone(), None, URBIT_HOT_STATE);
+        println!("ctx.event_num: {}", ctx.event_num);
+
+        let sent = ctx.event_num;
+        let done = sent;
+
+        let mars = Mars {
+            ctx,
+            dir: pier_path,
+            sent,
+            done,
+        };
+
+        let eve = env::args()
+            .nth(4)
+            .expect("Must provide event number to play up to")
+            .parse::<u64>()
+            .expect("Failed to parse event number");
+
+        let sap = env::args()
+            .nth(6)
+            .expect("Must provide snapshot interval")
+            .parse::<u64>()
+            .expect("Failed to parse snapshot interval");
+
+        mars_play(mars, eve, sap);
     }
 
-    panic!("Ares can only run as a serf!");
+    Ok(())
 }
diff --git a/rust/ares/src/mars.rs b/rust/ares/src/mars.rs
new file mode 100644
index 00000000..33c13144
--- /dev/null
+++ b/rust/ares/src/mars.rs
@@ -0,0 +1,146 @@
+use std::{cmp::min, path::PathBuf};
+use std::fmt::{Display, Formatter, Result as FmtResult};
+use std::result::Result as StdResult;
+
+use crate::disk::*;
+use crate::jets::list::util::lent;
+use crate::noun::Noun;
+use crate::persist::pma_sync;
+use crate::serf::{Context, play_life};
+
+#[derive(Debug)]
+pub enum Error {
+    PlayOOM,
+    PlayInterrupted,
+    PlayEventLogFailure,
+    PlayMugMismatch,
+    PlayFailure,
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut Formatter) -> FmtResult {
+        match self {
+            Error::PlayOOM => write!(f, "play: out of memory"),
+            Error::PlayInterrupted => write!(f, "play: interrupted"),
+            Error::PlayEventLogFailure => write!(f, "play: event log failure"),
+            Error::PlayMugMismatch => write!(f, "play: mug mismatch"),
+            Error::PlayFailure => write!(f, "play: failure"),
+        }
+    }
+}
+
+pub type Result<T> = StdResult<T, Error>;
+
+pub struct Mars {
+    /// Serf context.
+    pub ctx: Context,
+
+    /// Execution directory (pier).
+    pub dir: PathBuf,
+
+    /// Last event requested.
+    pub sent: u64,
+
+    /// Last event processed.
+    pub done: u64,
+}
+
+/// Do a boot: read `eve` events starting at event 1 and feed them to `play_life`.
+fn mars_boot(mars: &mut Mars, eve: u64) -> Result<()> {
+    let ctx = &mut mars.ctx;
+    let seq = disk_read_list(ctx, 1, eve).unwrap(); // boot sequence
+    println!("--------------- bootstrap starting ----------------");
+    println!("boot: 1-{}", lent(seq).unwrap());
+    play_life(ctx, seq);
+    println!("--------------- bootstrap complete ----------------");
+    Ok(())
+}
+
+/// Replay up to `eve`, snapshot every `sap` events.
+///
+/// Returns `eve - mars.done` as measured before playback begins, or 0 when
+/// nothing needs to be played.
+pub fn mars_play(mut mars: Mars, mut eve: u64, sap: u64) -> u64 {
+    println!("mars_play: mars.sent={} mars.done={} eve={}, sap={}", mars.sent, mars.done, eve, sap);
+
+    let mut played = 0u64;
+
+    if eve == 0 {
+        eve = mars.ctx.log.done;
+    } else if eve <= mars.done {
+        println!("mars: already computed {}", eve);
+        println!("      state={}, log={}", mars.done, mars.ctx.log.done);
+        return played;
+    } else {
+        eve = min(eve, mars.ctx.log.done);
+    }
+
+    if mars.done == mars.ctx.log.done {
+        return played;
+    }
+
+    if mars.done < eve {
+        played = eve - mars.done;
+    }
+
+    if mars.done == 0 {
+        let life = disk_read_meta(&mars.ctx.log.env, "life").unwrap();
+
+        mars_boot(&mut mars, life).unwrap();
+
+        mars.sent = life;
+        mars.done = life;
+
+        pma_sync();
+    }
+
+    println!("---------------- playback starting ----------------");
+
+    if (eve + 1) == mars.ctx.log.done {
+        println!("play: event {}", mars.ctx.log.done);
+    } else if eve != mars.ctx.log.done {
+        println!("play: events {}-{} of {}", (mars.done + 1), eve, mars.ctx.log.done);
+    } else {
+        println!("play: events {}-{}", (mars.done + 1), eve);
+    }
+
+    {
+        let mut past = mars.done; // last snapshot
+        // let meme = 0; // last event to bail:meme
+        // let shot = 0; // meme retry count
+
+        while mars.done < eve {
+            match mars_play_batch(&mut mars, true, 1024) {
+                Ok(_) => {
+                    if sap > 0 && (mars.done - past) >= sap {
+                        pma_sync();
+                        past = mars.done;
+                        println!("play ({}): save", mars.done);
+                    } else {
+                        println!("play: ({}): done", mars.done);
+                    }
+                    // NOTE(review): stops after one batch. `mars_play_batch` is still a stub
+                    // that never advances `mars.done`, so looping here would not terminate.
+                    break;
+                }
+                Err(e) => { // XX implement retries and other error handling
+                    println!("play: error {}", e);
+                    break;
+                }
+            }
+        }
+    }
+
+    println!("---------------- playback complete ----------------");
+
+    pma_sync();
+
+    played
+}
+
+/// Play a batch of events. XX add a `when` parameter.
+/// Currently a stub: it performs no work and always returns `Ok(())`.
+fn mars_play_batch(mars: &mut Mars, mug: bool, batch: u32) -> Result<()> {
+    let log: &mut Disk = &mut mars.ctx.log;
+    Ok(())
+}
diff --git a/rust/ares/src/serf.rs b/rust/ares/src/serf.rs
index be84067b..cb45dd3b 100644
--- a/rust/ares/src/serf.rs
+++ b/rust/ares/src/serf.rs
@@ -1,3 +1,4 @@
+use crate::disk::Disk;
 use crate::hamt::Hamt;
 use crate::interpreter::{inc, interpret, Error, Mote};
 use crate::jets::cold::Cold;
@@ -71,7 +72,7 @@ impl Persist for Snapshot {
 
 #[repr(C)]
 #[repr(packed)]
-struct SnapshotMem {
+pub struct SnapshotMem {
     pub epoch: u64,
     pub event_num: u64,
     pub arvo: Noun,
@@ -80,20 +81,23 @@ struct SnapshotMem {
 
 const PMA_CURRENT_SNAPSHOT_VERSION: u64 = 1;
 
-struct Context {
-    epoch: u64,
-    event_num: u64,
-    arvo: Noun,
-    mug: u32,
-    nock_context: interpreter::Context,
+pub struct Context {
+    pub epoch: u64,
+    pub event_num: u64,
+    pub arvo: Noun,
+    pub mug: u32,
+    pub nock_context: interpreter::Context,
+    pub log: Disk,
 }
 
 impl Context {
     pub fn load(
-        snap_path: PathBuf,
+        pier_path: PathBuf,
         trace_info: Option<TraceInfo>,
         constant_hot_state: &[HotEntry],
     ) -> Context {
+        let mut snap_path = pier_path.clone();
+        snap_path.push(".urb/chk");
         pma_open(snap_path).expect("serf: pma open failed");
 
         let snapshot_version = pma_meta_get(BTMetaField::SnapshotVersion as usize);
@@ -106,7 +110,7 @@ impl Context {
             _ => panic!("Unsupported snapshot version"),
         };
 
-        Context::new(trace_info, snapshot, constant_hot_state)
+        Context::new(pier_path, trace_info, snapshot, constant_hot_state)
     }
 
     pub unsafe fn save(&mut self) {
@@ -139,6 +143,7 @@ impl Context {
     }
 
     fn new(
+        pier_path: PathBuf,
         trace_info: Option<TraceInfo>,
         snapshot: Option<Snapshot>,
         constant_hot_state: &[HotEntry],
@@ -174,12 +179,17 @@ impl Context {
             trace_info,
         };
 
+        let mut log_path = pier_path.clone();
+        log_path.push(".urb/log");
+        let log = Disk::new(log_path);
+
         Context {
             epoch,
             event_num,
             arvo,
             mug,
             nock_context,
+            log,
         }
     }
 
@@ -441,7 +451,7 @@ fn goof(context: &mut Context, mote: Mote, traces: Noun) -> Noun {
 /** Run slam; process stack trace to tang if error.
  * Generate tracing events, if JSON tracing enabled.
  */
-fn soft(context: &mut Context, ovo: Noun, trace_name: Option<String>) -> Result<Noun, Noun> {
+pub fn soft(context: &mut Context, ovo: Noun, trace_name: Option<String>) -> Result<Noun, Noun> {
     let slam_res = if context.nock_context.trace_info.is_some() {
         let start = Instant::now();
         let slam_res = slam(context, POKE_AXIS, ovo);
@@ -469,7 +479,7 @@ fn soft(context: &mut Context, ovo: Noun, trace_name: Option<String>) -> Result<
     }
 }
 
-fn play_life(context: &mut Context, eve: Noun) {
+pub fn play_life(context: &mut Context, eve: Noun) {
     let stack = &mut context.nock_context.stack;
     let sub = T(stack, &[D(0), D(3)]);
     let lyf = T(stack, &[D(2), sub, D(0), D(2)]);