Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose commit latency struct for profiling #31

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/database.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
error::{mdbx_result, Error, Result},
latency::CommitLatency,
table::Table,
transaction::{RO, RW},
Mode, ReadWriteOptions, SyncMode, Transaction, TransactionKind,
Expand Down Expand Up @@ -77,7 +78,7 @@ pub(crate) enum TxnManagerMessage {
},
Commit {
tx: TxnPtr,
sender: SyncSender<Result<bool>>,
sender: SyncSender<Result<(bool, CommitLatency)>>,
},
}

Expand Down Expand Up @@ -284,10 +285,14 @@ where
.unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
let mut latency = CommitLatency::new();
sender
.send(mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, ptr::null_mut())
}))
.send(
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, &mut latency.0)
})
.map(|v| (v, latency)),
)
.unwrap();
}
},
Expand Down Expand Up @@ -494,7 +499,8 @@ impl GeometryInfo {
///
/// Contains database information about the map size, readers, last txn id etc.
#[repr(transparent)]
pub struct Info(ffi::MDBX_envinfo);
pub struct Info(pub(crate) ffi::MDBX_envinfo);
pub type PgOpStat = ffi::MDBX_envinfo__bindgen_ty_3;

impl Info {
pub fn geometry(&self) -> GeometryInfo {
Expand Down Expand Up @@ -530,6 +536,12 @@ impl Info {
pub fn num_readers(&self) -> usize {
self.0.mi_numreaders as usize
}

/// Max reader slots used in the database
#[inline]
pub fn pg_op_stat(&self) -> PgOpStat {
self.0.mi_pgop_stat
}
}

impl<E> fmt::Debug for Database<E>
Expand Down
68 changes: 68 additions & 0 deletions src/latency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::{mem, time::Duration};

/// Latency statistics for committing transactions.
#[derive(Debug)]
pub struct CommitLatency(pub(crate) ffi::MDBX_commit_latency);

impl CommitLatency {
pub fn new() -> Self {
unsafe { Self(mem::zeroed()) }
}

/// Duration of preparation (commit child transactions, update
/// sub-databases records and cursors destroying).
#[inline]
pub const fn preparation(&self) -> Duration {
Self::time_to_duration(self.0.preparation)
}

/// Duration of GC/freeDB handling & updation.
#[inline]
pub const fn gc_wallclock(&self) -> Duration {
Self::time_to_duration(self.0.gc_wallclock)
}

/// Duration of internal audit if enabled.
#[inline]
pub const fn audit(&self) -> Duration {
Self::time_to_duration(self.0.audit)
}

/// Duration of writing dirty/modified data pages to a filesystem,
/// i.e. the summary duration of a `write()` syscalls during commit.
#[inline]
pub const fn write(&self) -> Duration {
Self::time_to_duration(self.0.write)
}

/// Duration of syncing written data to the disk/storage, i.e.
/// the duration of a `fdatasync()` or a `msync()` syscall during commit.
#[inline]
pub const fn sync(&self) -> Duration {
Self::time_to_duration(self.0.sync)
}

/// Duration of transaction ending (releasing resources).
#[inline]
pub const fn ending(&self) -> Duration {
Self::time_to_duration(self.0.ending)
}

/// The total duration of a commit.
#[inline]
pub const fn whole(&self) -> Duration {
Self::time_to_duration(self.0.whole)
}

/// User-mode CPU time spent on GC update.
#[inline]
pub const fn gc_cputime(&self) -> Duration {
Self::time_to_duration(self.0.gc_cputime)
}

/// Latency of commit stages in 1/65_536 of seconds units.
#[inline]
const fn time_to_duration(time: u32) -> Duration {
Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use crate::{
},
error::{Error, Result},
flags::*,
latency::CommitLatency,
table::Table,
transaction::{Transaction, TransactionKind, RO, RW},
};
Expand All @@ -19,6 +20,7 @@ mod cursor;
mod database;
mod error;
mod flags;
mod latency;
mod table;
mod transaction;

Expand Down
49 changes: 42 additions & 7 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ use crate::{
database::{Database, DatabaseKind, NoWriteMap, TxnManagerMessage, TxnPtr},
error::{mdbx_result, Result},
flags::{c_enum, TableFlags, WriteFlags},
latency::CommitLatency,
table::Table,
Cursor, Decodable, Error, Stat,
Cursor, Decodable, Error, Info, Stat,
};
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use indexmap::IndexSet;
use libc::{c_uint, c_void};
use parking_lot::Mutex;
use sealed::sealed;
use std::{
fmt,
fmt::Debug,
fmt::{self, Debug},
marker::PhantomData,
mem::size_of,
mem::{self, size_of},
ptr, result, slice,
sync::{mpsc::sync_channel, Arc},
};
Expand Down Expand Up @@ -152,11 +152,16 @@ where
}

/// Commits the transaction and returns table handles permanently open for the lifetime of `Database`.
pub fn commit_and_rebind_open_dbs(mut self) -> Result<(bool, Vec<Table<'db>>)> {
/// Also returns measured latency.
pub fn commit_and_rebind_open_dbs_with_latency(
mut self,
) -> Result<(bool, CommitLatency, Vec<Table<'db>>)> {
let txnlck = self.txn.lock();
let txn = txnlck.0;
let result = if K::ONLY_CLEAN {
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) })
let mut latency = CommitLatency::new();
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, &mut latency.0) })
.map(|v| (v, latency))
} else {
let (sender, rx) = sync_channel(0);
self.db
Expand All @@ -171,9 +176,10 @@ where
rx.recv().unwrap()
};
self.committed = true;
result.map(|v| {
result.map(|(v, latency)| {
(
v,
latency,
self.primed_dbis
.lock()
.iter()
Expand All @@ -183,6 +189,13 @@ where
})
}

/// Commits the transaction and returns table handles permanently open for the lifetime of `Database`.
pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, Vec<Table<'db>>)> {
// Drop `CommitLatency` from return value.
self.commit_and_rebind_open_dbs_with_latency()
.map(|v| (v.0, v.2))
}

/// Opens a handle to an MDBX table.
///
/// If `name` is [None], then the returned handle will be for the default table.
Expand Down Expand Up @@ -220,6 +233,28 @@ where
}
}

/// Retrieves statistics about this transaction.
pub fn txn_stat(&self) -> Result<Stat> {
unsafe {
let mut stat = Stat::new();
mdbx_result(txn_execute(&self.txn, |txn| {
ffi::mdbx_env_stat_ex(self.db.ptr().0, txn, stat.mdb_stat(), size_of::<Stat>())
}))?;
Ok(stat)
}
}

/// Retrieves info about this transaction.
pub fn txn_info(&self) -> Result<Info> {
unsafe {
let mut info = Info(mem::zeroed());
mdbx_result(txn_execute(&self.txn, |txn| {
ffi::mdbx_env_info_ex(self.db.ptr().0, txn, &mut info.0, size_of::<Info>())
}))?;
Ok(info)
}
}

/// Open a new cursor on the given table.
pub fn cursor<'txn>(&'txn self, table: &Table<'txn>) -> Result<Cursor<'txn, K>> {
Cursor::new(self, table)
Expand Down