-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
4f64bc7
commit bf44fb7
Showing
8 changed files
with
378 additions
and
78 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,131 @@ | ||
/** Disk storage for events. */ | ||
use lmdb::{Environment, Error as LmdbError}; | ||
use crate::jets::list::util::lent; | ||
use crate::lmdb::{lmdb_gulf, lmdb_read_meta}; | ||
use crate::mem::NockStack; | ||
use crate::mug::mug_u32; | ||
use crate::noun::{IndirectAtom, Noun, D, T}; | ||
use crate::serf::Context; | ||
use crate::serialization::cue; | ||
use lmdb::{Cursor, Environment, Error as LmdbError, Transaction}; | ||
use lmdb_sys as ffi; | ||
use std::convert::TryInto; | ||
use std::fs; | ||
use std::path::PathBuf; | ||
use std::result::Result as StdResult; | ||
|
||
#[derive(Debug, PartialEq)] | ||
pub enum Error { | ||
InvalidPath, | ||
EpochNotFound, | ||
Lmdb(LmdbError), | ||
InvalidPath, | ||
EpochNotFound, | ||
Lmdb(LmdbError), | ||
} | ||
|
||
pub type Result<T> = StdResult<T, Error>; | ||
|
||
struct Disk { | ||
pub log_dir: PathBuf, | ||
pub epoch: u64, | ||
pub env: Environment, | ||
pub struct Disk { | ||
/// Full path to the pier's log directory. | ||
pub dir: PathBuf, | ||
|
||
/// Current epoch number. | ||
pub epoch: u64, | ||
|
||
/// Current epoch's LMDB environment. | ||
pub env: Environment, | ||
|
||
/// Last event number in the log. | ||
pub done: u64, | ||
} | ||
|
||
impl Disk { | ||
pub fn init(&self, log_dir: PathBuf) -> Result<Disk> { | ||
let epoch = self.last_epoch(&log_dir)?; | ||
let epoch_dir = log_dir.join(format!("0i{}", epoch)); | ||
let env_builder = Environment::new(); | ||
let env_res = env_builder.open(epoch_dir.as_path()); | ||
match env_res { | ||
Ok(env) => { | ||
Ok( | ||
Disk { | ||
log_dir: log_dir, | ||
pub fn new(log_dir: PathBuf) -> Self { | ||
let epoch = epoch_last(&log_dir).expect("Failed to get last epoch"); | ||
let epoch_dir = log_dir.join(format!("0i{}", epoch)); | ||
let mut env_builder = Environment::new(); | ||
env_builder.set_map_size(0x10000000000); | ||
env_builder.set_max_dbs(2); | ||
let env = env_builder | ||
.open(epoch_dir.as_path()) | ||
.expect("Failed to open LMDB environment"); | ||
let (_, high) = lmdb_gulf(&env); | ||
Disk { | ||
dir: log_dir, | ||
epoch: epoch, | ||
env: env, | ||
} | ||
) | ||
} | ||
Err(err) => Err(Error::Lmdb(err)), | ||
done: high, | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Get the number of the latest epoch in the given directory, or return | ||
/// an error if there are no epochs or the path specified isn't a directory. | ||
pub fn last_epoch(&self, log_dir: &PathBuf) -> Result<u64> { | ||
/// Get the number of the latest epoch in the given directory, or return | ||
/// an error if there are no epochs or the path specified isn't a directory. | ||
pub fn epoch_last(log_dir: &PathBuf) -> Result<u64> { | ||
if !log_dir.is_dir() { | ||
return Err(Error::InvalidPath); | ||
return Err(Error::InvalidPath); | ||
} | ||
|
||
let mut some = false; | ||
let mut last = 0; | ||
|
||
if let Ok(entries) = fs::read_dir(log_dir) { | ||
for entry in entries { | ||
if let Ok(entry) = entry { | ||
if let Some(name) = entry.file_name().to_str() { | ||
if let Some(epoch) = name.strip_prefix("0i") { | ||
if let Ok(n) = epoch.parse::<u64>() { | ||
some = true; | ||
if n > last { | ||
last = n; | ||
if let Ok(entries) = fs::read_dir(log_dir.clone()) { | ||
for entry in entries { | ||
if let Ok(entry) = entry { | ||
if let Some(name) = entry.file_name().to_str() { | ||
if let Some(epoch) = name.strip_prefix("0i") { | ||
if let Ok(n) = epoch.parse::<u64>() { | ||
some = true; | ||
if n > last { | ||
last = n; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
if some { | ||
return Ok(last); | ||
return Ok(last); | ||
} | ||
|
||
Err(Error::EpochNotFound) | ||
} | ||
} | ||
} | ||
|
||
/// Open the specified epoch's LMDB environment. | ||
fn _epoch_load(log: Disk, epoch: u64) -> Result<()> { | ||
let epoch_dir = log.dir.join(format!("0i{}", epoch)); | ||
let env_builder = Environment::new(); | ||
let env_res = env_builder.open(epoch_dir.as_path()); | ||
match env_res { | ||
Ok(_) => Ok(()), | ||
Err(err) => Err(Error::Lmdb(err)), | ||
} | ||
} | ||
|
||
/// Read a value from the metadata database. | ||
pub fn disk_read_meta(env: &Environment, key: &str) -> Result<u64> { | ||
lmdb_read_meta(env, key).map_err(|e| Error::Lmdb(e)) | ||
} | ||
|
||
/// Read `len` events from the database, starting at `eve`. | ||
pub fn disk_read_list(ctx: &mut Context, eve: u64, len: u64) -> Option<Noun> { | ||
let stack = &mut ctx.nock_context.stack; | ||
let mut events: Noun = D(0); | ||
let db_name = "EVENTS"; | ||
let log = &ctx.log; | ||
let env = &log.env; | ||
let txn = env.begin_ro_txn().unwrap(); | ||
let db = unsafe { txn.open_db(Some(db_name)).unwrap() }; | ||
let cursor = txn.open_ro_cursor(db).unwrap(); | ||
let mut i = eve; | ||
while i <= eve + len { | ||
let key = u64::to_le_bytes(i); | ||
let value = cursor.get(Some(&key), None, ffi::MDB_SET_KEY).unwrap().1; | ||
println!("event {}: len: {}", i, value.len()); | ||
let _mug = u32::from_le_bytes(value[0..4].try_into().unwrap()); | ||
let jam = unsafe { IndirectAtom::new_raw_bytes_ref(stack, &value[5..]) }; | ||
let e = cue(stack, jam.as_atom()); | ||
events = T(stack, &[e, events]); | ||
i += 1; | ||
} | ||
Some(events) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
use lmdb::{Cursor, Environment, Transaction}; | ||
use lmdb_sys as ffi; | ||
use std::convert::TryInto; | ||
|
||
pub type Result<T> = std::result::Result<T, lmdb::Error>; | ||
|
||
pub fn lmdb_read_meta(env: &Environment, key: &str) -> Result<u64> { | ||
let db_name = "META"; | ||
let txn = env.begin_ro_txn()?; | ||
let db = unsafe { txn.open_db(Some(db_name))? }; | ||
let bytes: &[u8] = txn.get(db, &key.as_bytes())?; | ||
if bytes.len() > 8 { | ||
panic!("lmdb_read_meta: value too large for u64"); | ||
} | ||
let mut value: u64 = 0; | ||
for &byte in bytes.iter() { | ||
value = (value << 8) | u64::from(byte); | ||
} | ||
Ok(value) | ||
} | ||
|
||
pub fn lmdb_gulf(env: &Environment) -> (u64, u64) { | ||
let db_name = "EVENTS"; | ||
let txn = env.begin_ro_txn().unwrap(); | ||
let db = unsafe { txn.open_db(Some(db_name)).unwrap() }; | ||
let cursor = txn.open_ro_cursor(db).unwrap(); | ||
if let Some(first) = cursor.get(None, None, ffi::MDB_FIRST).unwrap().0 { | ||
let low = u64::from_le_bytes(first.try_into().unwrap()); | ||
if let Some(last) = cursor.get(None, None, ffi::MDB_LAST).unwrap().0 { | ||
let high = u64::from_le_bytes(last.try_into().unwrap()); | ||
return (low, high); | ||
} else { | ||
panic!("Couldn't get last event from the database"); | ||
} | ||
} else { | ||
panic!("Couldn't get first event from the database"); | ||
} | ||
} | ||
|
||
/// Read `len` events from the database, starting at `eve`, and calling `read_f` for each | ||
/// event. | ||
pub fn lmdb_read<F>(env: &Environment, eve: u64, len: usize, mut read_f: F) | ||
where | ||
F: FnMut(u64, &[u8]), | ||
{ | ||
let db_name = "EVENTS"; | ||
let txn = env.begin_ro_txn().unwrap(); | ||
let db = unsafe { txn.open_db(Some(db_name)).unwrap() }; | ||
let mut cursor = txn.open_ro_cursor(db).unwrap(); | ||
for (key, value) in cursor.iter_from(&u64::to_le_bytes(eve)) { | ||
println!("key: {:?}", key); | ||
if key > u64::to_le_bytes(eve + len as u64).as_ref() { | ||
break; | ||
} | ||
let key = u64::from_le_bytes(key.try_into().unwrap()); | ||
read_f(key, value); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.