diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index f8e362764a..5b9620d79b 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1652,6 +1652,7 @@ impl Replicator { let frame = RpcFrame { data: frame_to_inject.bytes(), timestamp: None, + durable_frame_no: None, }; injector.inject_frame(frame).await?; applied_wal_frame = true; diff --git a/libsql-replication/proto/replication_log.proto b/libsql-replication/proto/replication_log.proto index b358232705..a7500025a2 100644 --- a/libsql-replication/proto/replication_log.proto +++ b/libsql-replication/proto/replication_log.proto @@ -36,6 +36,7 @@ message Frame { // if this frame is a commit frame, then this can be set // to the time when the transaction was committed optional int64 timestamp = 2; + optional uint64 durable_frame_no = 3; } message Frames { diff --git a/libsql-replication/src/generated/wal_log.rs b/libsql-replication/src/generated/wal_log.rs index a34d5e59dd..9f716eabfc 100644 --- a/libsql-replication/src/generated/wal_log.rs +++ b/libsql-replication/src/generated/wal_log.rs @@ -83,6 +83,8 @@ pub struct Frame { /// to the time when the transaction was committed #[prost(int64, optional, tag = "2")] pub timestamp: ::core::option::Option<i64>, + #[prost(uint64, optional, tag = "3")] + pub durable_frame_no: ::core::option::Option<u64>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/libsql-replication/src/injector/libsql_injector.rs b/libsql-replication/src/injector/libsql_injector.rs index 7c01522e1d..f730881cf6 100644 --- a/libsql-replication/src/injector/libsql_injector.rs +++ b/libsql-replication/src/injector/libsql_injector.rs @@ -49,4 +49,8 @@ impl super::Injector for LibsqlInjector { .map_err(|e| Error::FatalInjectError(e.into()))?; Ok(None) } + + fn durable_frame_no(&mut self, frame_no: u64) { + self.injector.set_durable(frame_no); + } } diff --git a/libsql-replication/src/injector/mod.rs b/libsql-replication/src/injector/mod.rs index b139f07cc9..9519dc1a20 100644 --- a/libsql-replication/src/injector/mod.rs +++ b/libsql-replication/src/injector/mod.rs @@ -29,4 +29,6 @@ pub trait Injector { /// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frames /// are then injected into the wal. fn flush(&mut self) -> impl Future<Output = Result<Option<FrameNo>>> + Send; + + fn durable_frame_no(&mut self, frame_no: u64); } diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index f6ce2aa89f..17fec1c553 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -46,6 +46,9 @@ impl Injector for SqliteInjector { let inner = self.inner.clone(); spawn_blocking(move || inner.lock().flush()).await.unwrap() } + + #[inline] + fn durable_frame_no(&mut self, _frame_no: u64) {} } impl SqliteInjector { diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index 92edbd4389..f28bb9baa8 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -328,6 +328,10 @@ where async fn inject_frame(&mut self, frame: RpcFrame) -> Result<(), Error> { self.frames_synced += 1; + if let Some(frame_no) = frame.durable_frame_no { + self.injector.durable_frame_no(frame_no); + } + match self.injector.inject_frame(frame).await?
{ Some(commit_fno) => { self.client.commit_frame_no(commit_fno).await?; @@ -772,6 +776,7 @@ mod test { .map(|f| RpcFrame { data: f.bytes(), timestamp: None, + durable_frame_no: None, }) .take(2) .map(Ok) .collect::<Vec<_>>(); @@ -785,6 +790,7 @@ mod test { .map(|f| RpcFrame { data: f.bytes(), timestamp: None, + durable_frame_no: None, }) .map(Ok) .collect::<Vec<_>>(); diff --git a/libsql-server/src/bottomless_migrate.rs b/libsql-server/src/bottomless_migrate.rs index 75df6f8e19..2b9067baff 100644 --- a/libsql-server/src/bottomless_migrate.rs +++ b/libsql-server/src/bottomless_migrate.rs @@ -162,13 +162,7 @@ async fn migrate_one( .await .unwrap()?; - let mut tx = shared.begin_read(0).into(); - shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!("should be a write txn")) - .into_lock_owned(); - let mut injector = Injector::new(shared.clone(), guard, 10)?; + let mut injector = Injector::new(shared.clone(), 10)?; let orig_db_path = base_path .join("dbs") .join(config.namespace().as_str()) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index cd415eef33..3263db560b 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -177,6 +177,9 @@ pub struct Server, pub migrate_bottomless: bool, pub enable_deadlock_monitor: bool, + pub should_sync_from_storage: bool, + pub force_load_wals: bool, + pub sync_conccurency: usize, } impl Default for Server { @@ -203,6 +206,9 @@ impl Default for Server { connector: None, migrate_bottomless: false, enable_deadlock_monitor: false, + should_sync_from_storage: false, + force_load_wals: false, + sync_conccurency: 8, } } } @@ -781,10 +787,10 @@ where tokio::select! { _ = shutdown.notified() => { let shutdown = async { + namespace_store.shutdown().await?; task_manager.shutdown().await?; // join_set.shutdown().await; service_shutdown.notify_waiters(); - namespace_store.shutdown().await?; Ok::<_, crate::Error>(()) }; @@ -958,19 +964,30 @@ where Ok(()) }); - // If we have performed the migration, load all shared wals to force flush to storage with - // the new registry - if did_migrate { + // If we performed a migration from bottomless to libsql-wal earlier, then we need to + // forcefully load all the WALs to trigger segment storage with the actual storage. This + // is because the migration didn't actually send anything to storage, but just created the + // segments.
+ if did_migrate || self.should_sync_from_storage || self.force_load_wals { + // eagerly load all namespaces, then call sync_all on the registry + // TODO: do concurrently let dbs_path = base_config.base_path.join("dbs"); let stream = meta_store.namespaces(); tokio::pin!(stream); while let Some(conf) = stream.next().await { let registry = registry.clone(); let namespace = conf.namespace().clone(); - let path = dbs_path.join(namespace.as_str()).join("data"); - tokio::task::spawn_blocking(move || registry.open(&path, &namespace.into())) - .await - .unwrap()?; + let path = dbs_path.join(namespace.as_str()); + tokio::fs::create_dir_all(&path).await?; + tokio::task::spawn_blocking(move || { + registry.open(&path.join("data"), &namespace.into()) + }) + .await + .unwrap()?; + } + + if self.should_sync_from_storage { + registry.sync_all(self.sync_conccurency).await?; + } } @@ -1236,31 +1253,31 @@ where base_config: &BaseNamespaceConfig, primary_config: &PrimaryConfig, ) -> anyhow::Result<bool> { - let is_previous_migration_successful = self.check_previous_migration_success()?; - let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal)); - let is_bottomless_enabled = self.db_config.bottomless_replication.is_some(); let is_primary = self.rpc_client_config.is_none(); - let should_attempt_migration = self.migrate_bottomless - && is_primary - && is_bottomless_enabled - && !is_previous_migration_successful - && is_libsql_wal; - - if should_attempt_migration { - bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?; - Ok(true) - } else { - // the wals directory is present and so is the _dbs. This means that a crash occured - // before we could remove it. clean it up now. see code in `bottomless_migrate.rs` - let tmp_dbs_path = base_config.base_path.join("_dbs"); - if tmp_dbs_path.try_exists()? { - tracing::info!("removed dangling `_dbs` folder"); - tokio::fs::remove_dir_all(&tmp_dbs_path).await?; - } + if self.migrate_bottomless && is_primary { + let is_previous_migration_successful = self.check_previous_migration_success()?; + let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal)); + let is_bottomless_enabled = self.db_config.bottomless_replication.is_some(); + let should_attempt_migration = + is_bottomless_enabled && !is_previous_migration_successful && is_libsql_wal; + + if should_attempt_migration { + bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?; + return Ok(true); + } else { + // the wals directory is present and so is the _dbs. This means that a crash occurred + // before we could remove it. clean it up now. see code in `bottomless_migrate.rs` + let tmp_dbs_path = base_config.base_path.join("_dbs"); + if tmp_dbs_path.try_exists()?
{ + tracing::info!("removed dangling `_dbs` folder"); + tokio::fs::remove_dir_all(&tmp_dbs_path).await?; + } - tracing::info!("bottomless already migrated, skipping..."); - Ok(false) + tracing::info!("bottomless already migrated, skipping..."); + } } + + Ok(false) } fn check_previous_migration_success(&self) -> anyhow::Result<bool> { diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index eb24126d6e..7295bf9d16 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -280,6 +280,27 @@ struct Cli { /// Auth key for the admin API #[clap(long, env = "LIBSQL_ADMIN_AUTH_KEY", requires = "admin_listen_addr")] admin_auth_key: Option<String>, + + /// Whether to perform a sync of all namespaces with remote on startup + #[clap( + long, + env = "LIBSQL_SYNC_FROM_STORAGE", + requires = "enable_bottomless_replication" + )] + sync_from_storage: bool, + /// Whether to force loading all WALs at startup, with libsql-wal. + /// By default, WALs are loaded lazily, as the databases are opened. + #[clap(long)] + force_load_wals: bool, + /// Sync concurrency + #[clap( + long, + env = "LIBSQL_SYNC_CONCCURENCY", + requires = "sync_from_storage", + default_value = "8" + )] + sync_conccurency: usize, } #[derive(clap::Subcommand, Debug)] @@ -681,6 +702,9 @@ async fn build_server(config: &Cli) -> anyhow::Result { connector: Some(https), migrate_bottomless: config.migrate_bottomless, enable_deadlock_monitor: config.enable_deadlock_monitor, + should_sync_from_storage: config.sync_from_storage, + force_load_wals: config.force_load_wals, + sync_conccurency: config.sync_conccurency, }) } diff --git a/libsql-server/src/namespace/configurator/helpers.rs b/libsql-server/src/namespace/configurator/helpers.rs index 538f9215fd..b259c4aadb 100644 --- a/libsql-server/src/namespace/configurator/helpers.rs +++ b/libsql-server/src/namespace/configurator/helpers.rs @@ -417,7 +417,7 @@ pub(crate) async fn run_storage_monitor( .await; } Err(e) => { - tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}"); + tracing::warn!("failed to open connection for storage monitor: {e}, trying again in {duration:?}"); } } diff --git a/libsql-server/src/namespace/configurator/libsql_replica.rs b/libsql-server/src/namespace/configurator/libsql_replica.rs index 50ecc95610..1a67dfd1e1 100644 --- a/libsql-server/src/namespace/configurator/libsql_replica.rs +++ b/libsql-server/src/namespace/configurator/libsql_replica.rs @@ -11,7 +11,6 @@ use libsql_sys::name::NamespaceResolver; use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; use libsql_wal::replication::injector::Injector; -use libsql_wal::transaction::Transaction; use libsql_wal::wal::LibsqlWalManager; use tokio::task::JoinSet; use tonic::transport::Channel; @@ -151,13 +150,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator { .await .unwrap(); - let mut tx = Transaction::Read(shared.begin_read(u64::MAX)); - shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!()) - .into_lock_owned(); - let injector = Injector::new(shared, guard, 10).unwrap(); + let injector = Injector::new(shared, 10).unwrap(); + let injector = LibsqlInjector::new(injector); let mut replicator = Replicator::new(client, injector); diff --git a/libsql-server/src/rpc/replication/libsql_replicator.rs b/libsql-server/src/rpc/replication/libsql_replicator.rs index 3f5fdb6dc4..19ece80749 100644 ---
b/libsql-server/src/rpc/replication/libsql_replicator.rs @@ -13,6 +13,7 @@ use libsql_replication::rpc::replication::{ use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; use libsql_wal::segment::Frame; +use libsql_wal::shared_wal::SharedWal; use md5::{Digest as _, Md5}; use tokio_stream::Stream; use tonic::Status; @@ -72,12 +73,17 @@ pin_project_lite::pin_project! { #[pin] inner: S, flavor: WalFlavor, + shared: Arc<SharedWal<StdIO>>, } } impl<S> FrameStreamAdapter<S> { - fn new(inner: S, flavor: WalFlavor) -> Self { - Self { inner, flavor } + fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self { + Self { + inner, + flavor, + shared, + } } } @@ -93,6 +99,11 @@ where Some(Ok(f)) => { match this.flavor { WalFlavor::Libsql => { + let durable_frame_no = if f.header().is_commit() { + Some(this.shared.durable_frame_no()) + } else { + None + }; // safety: frame implements zerocopy traits, so it can safely be interpreted as a // byte slice of the same size let bytes: Box<[u8; size_of::<Frame>()]> = @@ -102,6 +113,7 @@ where Poll::Ready(Some(Ok(RpcFrame { data, timestamp: None, + durable_frame_no, }))) } WalFlavor::Sqlite => { @@ -116,6 +128,7 @@ where Poll::Ready(Some(Ok(RpcFrame { data: frame.bytes(), timestamp: None, + durable_frame_no: None, }))) } } @@ -131,6 +144,7 @@ impl ReplicationLog for LibsqlReplicationService { type LogEntriesStream = BoxStream<'static, Result<Frame, Status>>; type SnapshotStream = BoxStream<'static, Result<Frame, Status>>; + #[tracing::instrument(skip_all, fields(namespace))] async fn log_entries( &self, req: tonic::Request<LogOffset>, @@ -140,11 +154,13 @@ impl ReplicationLog for LibsqlReplicationService { let shared = self.registry.get_async(&namespace.into()).await.unwrap(); let req = req.into_inner(); // TODO: replicator should only accept NonZero - let replicator = - libsql_wal::replication::replicator::Replicator::new(shared, req.next_offset.max(1)); + let replicator = libsql_wal::replication::replicator::Replicator::new( + shared.clone(), + req.next_offset.max(1), + ); let flavor = req.wal_flavor(); - let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor); + let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared); Ok(tonic::Response::new(Box::pin(stream))) } diff --git a/libsql-server/src/rpc/replication/replication_log.rs b/libsql-server/src/rpc/replication/replication_log.rs index 356d3d0f16..3abf06b283 100644 --- a/libsql-server/src/rpc/replication/replication_log.rs +++ b/libsql-server/src/rpc/replication/replication_log.rs @@ -187,6 +187,7 @@ fn map_frame_stream_output( Ok((frame, ts)) => Ok(Frame { data: frame.bytes(), timestamp: ts.map(|ts| ts.timestamp_millis()), + durable_frame_no: None, }), Err(LogReadError::SnapshotRequired) => Err(Status::new( tonic::Code::FailedPrecondition, @@ -431,6 +432,7 @@ mod snapshot_stream { yield Ok(Frame { data: libsql_replication::frame::Frame::from(frame).bytes(), timestamp: None, + durable_frame_no: None, }); } Err(e) => { diff --git a/libsql-wal/src/bins/shell/main.rs b/libsql-wal/src/bins/shell/main.rs index 18522b7305..af8eec962b 100644 --- a/libsql-wal/src/bins/shell/main.rs +++ b/libsql-wal/src/bins/shell/main.rs @@ -143,7 +143,7 @@ where S: Storage, { let namespace = NamespaceName::from_string(namespace.to_owned()); - let durable = storage.durable_frame_no(&namespace, None).await; + let durable = storage.durable_frame_no(&namespace, None).await.unwrap(); println!("namespace: {namespace}"); println!("max durable frame: {durable}"); } diff --git a/libsql-wal/src/checkpointer.rs b/libsql-wal/src/checkpointer.rs
index 049ceea6f3..98682481eb 100644 --- a/libsql-wal/src/checkpointer.rs +++ b/libsql-wal/src/checkpointer.rs @@ -52,6 +52,7 @@ where IO: Io, S: Sync + Send + 'static, { + #[tracing::instrument(skip(self))] fn checkpoint( &self, namespace: &NamespaceName, @@ -144,16 +145,21 @@ where async fn step(&mut self) { tokio::select! { biased; - // fixme: we should probably handle a panic in the checkpointing task somehow - Some(Ok((namespace, result))) = self.join_set.join_next(), if !self.join_set.is_empty() => { - self.checkpointing.remove(&namespace); - if let Err(e) = result { - self.errors += 1; - tracing::error!("error checkpointing ns {namespace}: {e}, rescheduling"); - // reschedule - self.scheduled.insert(namespace); - } else { - self.errors = 0; + result = self.join_set.join_next(), if !self.join_set.is_empty() => { + match result { + Some(Ok((namespace, result))) => { + self.checkpointing.remove(&namespace); + if let Err(e) = result { + self.errors += 1; + tracing::error!("error checkpointing ns {namespace}: {e}, rescheduling"); + // reschedule + self.scheduled.insert(namespace); + } else { + self.errors = 0; + } + } + Some(Err(e)) => panic!("checkpoint task panicked: {e}"), + None => unreachable!("got None, but join set is not empty") } } notified = self.recv.recv(), if !self.shutting_down => { diff --git a/libsql-wal/src/error.rs b/libsql-wal/src/error.rs index 9acda39766..28081d9f51 100644 --- a/libsql-wal/src/error.rs +++ b/libsql-wal/src/error.rs @@ -24,6 +24,9 @@ pub enum Error { InvalidFooterMagic, #[error("invalid db footer version")] InvalidFooterVersion, + + #[error("storage error: {0}")] + Storage(#[from] Box), } impl Into for Error { diff --git a/libsql-wal/src/io/buf.rs b/libsql-wal/src/io/buf.rs index f28718c8e8..ba51018d74 100644 --- a/libsql-wal/src/io/buf.rs +++ b/libsql-wal/src/io/buf.rs @@ -201,6 +201,11 @@ impl ZeroCopyBuf { unsafe { self.inner.assume_init_ref() } } + pub fn get_mut(&mut self) -> &mut T { + assert!(self.is_init()); + unsafe { self.inner.assume_init_mut() } + } + pub fn into_inner(self) -> T { assert!(self.is_init()); unsafe { self.inner.assume_init() } diff --git a/libsql-wal/src/io/compat.rs b/libsql-wal/src/io/compat.rs index 461cbfdc39..78ad7752d6 100644 --- a/libsql-wal/src/io/compat.rs +++ b/libsql-wal/src/io/compat.rs @@ -14,7 +14,7 @@ where R: AsyncRead + Unpin, { let mut dst_offset = 0u64; - let mut buffer = BytesMut::zeroed(4096); + let mut buffer = BytesMut::with_capacity(4096); loop { let n = src.read_buf(&mut buffer).await?; if n == 0 { @@ -22,7 +22,7 @@ where } let (b, ret) = dst.write_all_at_async(buffer, dst_offset).await; ret?; - dst_offset += b.len() as u64; + dst_offset += n as u64; buffer = b; buffer.clear(); } diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index 1dbcbb8cbd..475538b57d 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -58,6 +58,7 @@ pub mod test { use std::path::Path; use std::path::PathBuf; use std::sync::Arc; + use std::time::Duration; use libsql_sys::name::NamespaceName; use libsql_sys::rusqlite::OpenFlags; @@ -172,4 +173,17 @@ pub mod test { } tx.end(); } + + pub async fn wait_current_durable(shared: &SharedWal) { + let current = shared.current.load().next_frame_no().get() - 1; + loop { + { + if *shared.durable_frame_no.lock() >= current { + break; + } + } + + tokio::time::sleep(Duration::from_millis(5)).await; + } + } } diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index fb34224811..628def2cdf 100644 --- a/libsql-wal/src/registry.rs +++ 
b/libsql-wal/src/registry.rs @@ -3,13 +3,16 @@ use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Instant; use dashmap::DashMap; use libsql_sys::ffi::Sqlite3DbHeader; use parking_lot::{Condvar, Mutex}; use rand::Rng; +use roaring::RoaringBitmap; use tokio::sync::{mpsc, Notify, Semaphore}; use tokio::task::JoinSet; +use tokio_stream::StreamExt; use uuid::Uuid; use zerocopy::{AsBytes, FromZeroes}; @@ -17,7 +20,8 @@ use crate::checkpointer::CheckpointMessage; use crate::error::Result; use crate::io::file::FileExt; use crate::io::{Io, StdIO}; -use crate::replication::storage::StorageReplicator; +use crate::replication::injector::Injector; +use crate::replication::storage::{ReplicateFromStorage as _, StorageReplicator}; use crate::segment::list::SegmentList; use crate::segment::Segment; use crate::segment::{current::CurrentSegment, sealed::SealedSegment}; @@ -96,51 +100,40 @@ where S: Storage>, { #[tracing::instrument(skip_all)] - fn swap_current(&self, shared: &SharedWal, tx: &TxGuard<::File>) -> Result<()> { + fn swap_current( + &self, + shared: &SharedWal, + tx: &dyn TxGuard<::File>, + ) -> Result<()> { assert!(tx.is_commited()); - // at this point we must hold a lock to a commited transaction. - - let current = shared.current.load(); - if current.is_empty() { - return Ok(()); - } - let start_frame_no = current.next_frame_no(); - let path = self - .path - .join(shared.namespace().as_str()) - .join(format!("{}:{start_frame_no:020}.seg", shared.namespace())); + self.swap_current_inner(shared) + } +} - let segment_file = self.io.open(true, true, true, &path)?; - let salt = self.io.with_rng(|rng| rng.gen()); - let new = CurrentSegment::create( - segment_file, - path, - start_frame_no, - current.db_size(), - current.tail().clone(), - salt, - current.log_id(), - )?; - // sealing must the last fallible operation, because we don't want to end up in a situation - // where the current log is sealed and it wasn't swapped. - if let Some(sealed) = current.seal()? { - // todo: pass config override here - let notifier = self.checkpoint_notifier.clone(); - let namespace = shared.namespace().clone(); - let durable_frame_no = shared.durable_frame_no.clone(); - let cb: OnStoreCallback = Box::new(move |fno| { +#[tracing::instrument(skip_all, fields(namespace = namespace.as_str(), start_frame_no = seg.start_frame_no()))] +fn maybe_store_segment( + storage: &S, + notifier: &tokio::sync::mpsc::Sender, + namespace: &NamespaceName, + durable_frame_no: &Arc>, + seg: S::Segment, +) { + if seg.is_storable() { + let cb: OnStoreCallback = Box::new({ + let notifier = notifier.clone(); + let durable_frame_no = durable_frame_no.clone(); + let namespace = namespace.clone(); + move |fno| { Box::pin(async move { update_durable(fno, notifier, durable_frame_no, namespace).await; }) - }); - new.tail().push(sealed.clone()); - self.storage.store(&shared.namespace, sealed, None, cb); - } - - shared.current.swap(Arc::new(new)); - tracing::debug!("current segment swapped"); - - Ok(()) + } + }); + storage.store(namespace, seg, None, cb); + } else { + // segment can be checkpointed right away. 
+ let _ = notifier.blocking_send(CheckpointMessage::Namespace(namespace.clone())); + tracing::debug!("segment marked as not storable; skipping"); } } @@ -231,16 +224,30 @@ where namespace: &NamespaceName, db_path: &Path, ) -> Result<Arc<SharedWal<IO>>> { + let db_file = self.io.open(false, true, true, db_path)?; + let db_file_len = db_file.len()?; + let header = if db_file_len > 0 { + let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); + db_file.read_exact_at(header.as_bytes_mut(), 0)?; + Some(header) + } else { + None + }; + + let footer = self.try_read_footer(&db_file)?; + + let mut checkpointed_frame_no = footer.map(|f| f.replication_index.get()).unwrap_or(0); + let path = self.path.join(namespace.as_str()); self.io.create_dir_all(&path)?; // TODO: handle that with abstract io let dir = walkdir::WalkDir::new(&path).sort_by_file_name().into_iter(); - // TODO: pass config override here - let max_frame_no = self.storage.durable_frame_no_sync(&namespace, None); - let durable_frame_no = Arc::new(Mutex::new(max_frame_no)); + // we only checkpoint durable frame_no so this is a good first estimate without an actual + // network call. + let durable_frame_no = Arc::new(Mutex::new(checkpointed_frame_no)); - let tail = SegmentList::default(); + let list = SegmentList::default(); for entry in dir { let entry = entry.map_err(|e| e.into_io_error().unwrap())?; if entry .path() .extension() .map(|e| e.to_str().unwrap() != "seg") .unwrap_or(true) { continue; } let file = self.io.open(false, true, true, entry.path())?; if let Some(sealed) = SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())? { - let notifier = self.checkpoint_notifier.clone(); - let ns = namespace.clone(); - let durable_frame_no = durable_frame_no.clone(); - let cb: OnStoreCallback = Box::new(move |fno| { - Box::pin(async move { - update_durable(fno, notifier, durable_frame_no, ns).await; - }) - }); - // TODO: pass config override here - self.storage.store(&namespace, sealed.clone(), None, cb); - tail.push(sealed); + list.push(sealed.clone()); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &namespace, + &durable_frame_no, + sealed, + ); } } - let db_file = self.io.open(false, true, true, db_path)?; - let db_file_len = db_file.len()?; - let header = if db_file_len > 0 { - let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed(); - db_file.read_exact_at(header.as_bytes_mut(), 0)?; - Some(header) - } else { - None - }; - - let footer = self.try_read_footer(&db_file)?; - let log_id = match footer { - Some(footer) if tail.is_empty() => footer.log_id(), - None if tail.is_empty() => self.io.uuid(), + Some(footer) if list.is_empty() => footer.log_id(), + None if list.is_empty() => self.io.uuid(), Some(footer) => { - let log_id = tail + let log_id = list .with_head(|h| h.header().log_id.get()) .expect("non-empty list should have a head"); let log_id = Uuid::from_u128(log_id); @@ -295,14 +287,21 @@ where log_id } None => { - let log_id = tail + let log_id = list .with_head(|h| h.header().log_id.get()) .expect("non-empty list should have a head"); Uuid::from_u128(log_id) } }; - let (db_size, next_frame_no) = tail + // if there is a tail, then the latest checkpointed frame_no is one before the + // start frame_no of the tail. We must read it from the tail, because a partial + // checkpoint may have occurred before a crash.
+ if let Some(last) = list.last() { + checkpointed_frame_no = (last.start_frame_no() - 1).max(1) + } + + let (db_size, next_frame_no) = list .with_head(|segment| { let header = segment.header(); (header.size_after(), header.next_frame_no()) @@ -310,32 +309,23 @@ where .unwrap_or_else(|| match header { Some(header) => ( header.db_size.get(), - NonZeroU64::new(header.replication_index.get() + 1) + NonZeroU64::new(checkpointed_frame_no + 1) .unwrap_or(NonZeroU64::new(1).unwrap()), ), None => (0, NonZeroU64::new(1).unwrap()), }); - let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); + let current_segment_path = path.join(format!("{namespace}:{next_frame_no:020}.seg")); - let segment_file = self.io.open(true, true, true, ¤t_path)?; + let segment_file = self.io.open(true, true, true, ¤t_segment_path)?; let salt = self.io.with_rng(|rng| rng.gen()); - let checkpointed_frame_no = match tail.last() { - // if there is a tail, then the latest checkpointed frame_no is one before the the - // start frame_no of the tail. We must read it from the tail, because a partial - // checkpoint may have occured before a crash. - Some(last) => (last.start_frame_no() - 1).max(1), - // otherwise, we read the it from the footer. - None => footer.map(|f| f.replication_index.get()).unwrap_or(0), - }; - let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create( segment_file, - current_path, + current_segment_path, next_frame_no, db_size, - tail.into(), + list.into(), salt, log_id, )?)); @@ -380,6 +370,44 @@ where } } + /// Attempts to sync all loaded dbs with durable storage + pub async fn sync_all(&self, conccurency: usize) -> Result<()> + where + S: Storage, + { + let mut join_set = JoinSet::new(); + tracing::info!("syncing {} namespaces", self.opened.len()); + // FIXME: arbitrary value, maybe use something like numcpu * 2? + let before_sync = Instant::now(); + let sem = Arc::new(Semaphore::new(conccurency)); + for entry in self.opened.iter() { + let Slot::Wal(shared) = entry.value() else { + panic!("all wals should already be opened") + }; + let storage = self.storage.clone(); + let shared = shared.clone(); + let sem = sem.clone(); + let permit = sem.acquire_owned().await.unwrap(); + + join_set.spawn(async move { + let _permit = permit; + sync_one(shared, storage).await + }); + + if let Some(ret) = join_set.try_join_next() { + ret.unwrap()?; + } + } + + while let Some(ret) = join_set.join_next().await { + ret.unwrap()?; + } + + tracing::info!("synced in {:?}", before_sync.elapsed()); + + Ok(()) + } + // On shutdown, we checkpoint all the WALs. 
This requires sealing the current segment, and then // checkpointing all the segments pub async fn shutdown(self: Arc<Self>) -> Result<()> { @@ -437,6 +465,100 @@ where Ok(()) } + + #[tracing::instrument(skip_all)] + fn swap_current_inner(&self, shared: &SharedWal<IO>) -> Result<()> { + let current = shared.current.load(); + if current.is_empty() { + return Ok(()); + } + let start_frame_no = current.next_frame_no(); + let path = self + .path + .join(shared.namespace().as_str()) + .join(format!("{}:{start_frame_no:020}.seg", shared.namespace())); + + let segment_file = self.io.open(true, true, true, &path)?; + let salt = self.io.with_rng(|rng| rng.gen()); + let new = CurrentSegment::create( + segment_file, + path, + start_frame_no, + current.db_size(), + current.tail().clone(), + salt, + current.log_id(), + )?; + // sealing must be the last fallible operation, because we don't want to end up in a situation + // where the current log is sealed and it wasn't swapped. + if let Some(sealed) = current.seal()? { + new.tail().push(sealed.clone()); + maybe_store_segment( + self.storage.as_ref(), + &self.checkpoint_notifier, + &shared.namespace, + &shared.durable_frame_no, + sealed, + ); + } + + shared.current.swap(Arc::new(new)); + tracing::debug!("current segment swapped"); + + Ok(()) + } +} + +#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))] +async fn sync_one(shared: Arc<SharedWal<IO>>, storage: Arc<S>) -> Result<()> +where + IO: Io, + S: Storage, +{ + let remote_durable_frame_no = storage + .durable_frame_no(shared.namespace(), None) + .await + .map_err(Box::new)?; + let local_current_frame_no = shared.current.load().next_frame_no().get() - 1; + + if remote_durable_frame_no > local_current_frame_no { + tracing::info!( + remote_durable_frame_no, + local_current_frame_no, + "remote storage has newer segments" + ); + let mut seen = RoaringBitmap::new(); + let replicator = StorageReplicator::new(storage, shared.namespace().clone()); + let stream = replicator + .stream(&mut seen, remote_durable_frame_no, 1) + .peekable(); + let mut injector = Injector::new(shared.clone(), 10)?; + // we set the durable frame_no before we start injecting, because the wal may want to + // checkpoint on commit. + injector.set_durable(remote_durable_frame_no); + // pin to the heap so that we can drop the stream in the loop and count `seen`. + let mut stream = Box::pin(stream); + loop { + match stream.next().await { + Some(Ok(mut frame)) => { + if stream.peek().await.is_none() { + drop(stream); + frame.header_mut().set_size_after(seen.len() as _); + injector.insert_frame(frame).await?; + break; + } else { + injector.insert_frame(frame).await?; + } + } + Some(Err(e)) => todo!("handle error: {e}, {}", shared.namespace()), + None => break, + } + } + } + + tracing::info!("local database is up to date"); + + Ok(()) } fn read_log_id_from_footer(db_file: &F, db_size: u64) -> io::Result<Uuid> { diff --git a/libsql-wal/src/replication/injector.rs b/libsql-wal/src/replication/injector.rs index 3a152b412e..5a9349dc76 100644 --- a/libsql-wal/src/replication/injector.rs +++ b/libsql-wal/src/replication/injector.rs @@ -6,7 +6,7 @@ use crate::error::Result; use crate::io::Io; use crate::segment::Frame; use crate::shared_wal::SharedWal; -use crate::transaction::TxGuardOwned; +use crate::transaction::{Transaction, TxGuardOwned}; /// The injector takes frames and injects them in the wal.
pub struct Injector { @@ -15,26 +15,53 @@ pub struct Injector { buffer: Vec>, /// capacity of the frame buffer capacity: usize, - tx: TxGuardOwned, + tx: Option>, max_tx_frame_no: u64, + previous_durable_frame_no: u64, } impl Injector { - pub fn new( - wal: Arc>, - tx: TxGuardOwned, - buffer_capacity: usize, - ) -> Result { + pub fn new(wal: Arc>, buffer_capacity: usize) -> Result { Ok(Self { wal, buffer: Vec::with_capacity(buffer_capacity), capacity: buffer_capacity, - tx, + tx: None, max_tx_frame_no: 0, + previous_durable_frame_no: 0, }) } + pub fn set_durable(&mut self, durable_frame_no: u64) { + let mut old = self.wal.durable_frame_no.lock(); + if *old <= durable_frame_no { + self.previous_durable_frame_no = *old; + *old = durable_frame_no; + } else { + todo!("primary reported older frame_no than current"); + } + } + + pub fn current_durable(&self) -> u64 { + *self.wal.durable_frame_no.lock() + } + + pub fn maybe_begin_txn(&mut self) -> Result<()> { + if self.tx.is_none() { + let mut tx = Transaction::Read(self.wal.begin_read(u64::MAX)); + self.wal.upgrade(&mut tx)?; + let tx = tx + .into_write() + .unwrap_or_else(|_| unreachable!()) + .into_lock_owned(); + assert!(self.tx.replace(tx).is_none()); + } + + Ok(()) + } + pub async fn insert_frame(&mut self, frame: Box) -> Result> { + self.maybe_begin_txn()?; let size_after = frame.size_after(); self.max_tx_frame_no = self.max_tx_frame_no.max(frame.header().frame_no()); self.buffer.push(frame); @@ -47,24 +74,51 @@ impl Injector { } pub async fn flush(&mut self, size_after: Option) -> Result<()> { - let buffer = std::mem::take(&mut self.buffer); - let current = self.wal.current.load(); - let commit_data = size_after.map(|size| (size, self.max_tx_frame_no)); - if commit_data.is_some() { - self.max_tx_frame_no = 0; + if !self.buffer.is_empty() && self.tx.is_some() { + let last_committed_frame_no = self.max_tx_frame_no; + { + let tx = self.tx.as_mut().expect("we just checked that tx was there"); + let buffer = std::mem::take(&mut self.buffer); + let current = self.wal.current.load(); + let commit_data = size_after.map(|size| (size, self.max_tx_frame_no)); + if commit_data.is_some() { + self.max_tx_frame_no = 0; + } + let buffer = current.inject_frames(buffer, commit_data, tx).await?; + self.buffer = buffer; + self.buffer.clear(); + } + + if size_after.is_some() { + let mut tx = self.tx.take().unwrap(); + self.wal + .new_frame_notifier + .send_replace(last_committed_frame_no); + // the strategy to swap the current log is to do it on change of durable boundary, + // when we have caught up with the current durable frame_no + if self.current_durable() != self.previous_durable_frame_no + && self.current_durable() >= self.max_tx_frame_no + { + let wal = self.wal.clone(); + // FIXME: tokio dependency here is annoying, we need an async version of swap_current. + tokio::task::spawn_blocking(move || { + tx.commit(); + wal.swap_current(&tx) + }) + .await + .unwrap()? 
+ } + } } - let buffer = current - .inject_frames(buffer, commit_data, &mut self.tx) - .await?; - self.buffer = buffer; - self.buffer.clear(); Ok(()) } pub fn rollback(&mut self) { self.buffer.clear(); - self.tx.reset(0); + if let Some(tx) = self.tx.as_mut() { + tx.reset(0); + } } } @@ -93,13 +147,7 @@ mod test { let replica_conn = replica_env.open_conn("test"); let replica_shared = replica_env.shared("test"); - let mut tx = crate::transaction::Transaction::Read(replica_shared.begin_read(42)); - replica_shared.upgrade(&mut tx).unwrap(); - let guard = tx - .into_write() - .unwrap_or_else(|_| panic!()) - .into_lock_owned(); - let mut injector = Injector::new(replica_shared.clone(), guard, 10).unwrap(); + let mut injector = Injector::new(replica_shared.clone(), 10).unwrap(); primary_conn.execute("create table test (x)", ()).unwrap(); diff --git a/libsql-wal/src/replication/replicator.rs b/libsql-wal/src/replication/replicator.rs index 4e5850736e..447ae169c2 100644 --- a/libsql-wal/src/replication/replicator.rs +++ b/libsql-wal/src/replication/replicator.rs @@ -40,17 +40,21 @@ impl Replicator { /// /// In a single replication step, the replicator guarantees that a minimal set of frames is /// sent to the replica. + #[tracing::instrument(skip(self))] pub fn into_frame_stream(mut self) -> impl Stream>> + Send { async_stream::try_stream! { loop { // First we decide up to what frame_no we want to replicate in this step. If we are // already up to date, wait for something to happen + tracing::debug!(next_frame_no = self.next_frame_no); let most_recent_frame_no = *self .new_frame_notifier .wait_for(|fno| *fno >= self.next_frame_no) .await .expect("channel cannot be closed because we hold a ref to the sending end"); + tracing::debug!(most_recent_frame_no, "new frame_no available"); + let mut commit_frame_no = 0; // we have stuff to replicate if most_recent_frame_no >= self.next_frame_no { @@ -66,6 +70,7 @@ impl Replicator { let mut stream = stream.peekable(); + tracing::debug!(replicated_until, "replicating from current log"); loop { let Some(frame) = stream.next().await else { break }; let mut frame = frame.map_err(|e| Error::CurrentSegment(e.into()))?; @@ -88,6 +93,7 @@ impl Replicator { .stream_pages_from(replicated_until, self.next_frame_no, &mut seen).await; tokio::pin!(stream); + tracing::debug!(replicated_until, "replicating from tail"); let mut stream = stream.peekable(); let should_replicate_from_storage = replicated_until != self.next_frame_no; @@ -110,6 +116,7 @@ impl Replicator { // Replicating from sealed segments was not enough, so we replicate from // durable storage if let Some(replicated_until) = replicated_until { + tracing::debug!("replicating from durable storage"); let stream = self .shared .stored_segments diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index 35ea89fb09..ae3c13ac97 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -63,6 +63,7 @@ where let (frame, ret) = segment.read_frame(Frame::new_box_zeroed(), offset as u32).await; ret?; + debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0"); if frame.header().frame_no() >= until { yield frame; } diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index ae0bb02a62..689964f2b4 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -39,6 +39,10 @@ impl CompactedSegmentDataHeader { Ok(()) } + + pub fn 
size_after(&self) -> u32 { + self.size_after.get() + } } #[derive(Debug, AsBytes, FromZeroes, FromBytes)] @@ -62,6 +66,10 @@ impl CompactedSegment { file: f(self.file), } } + + pub fn header(&self) -> &CompactedSegmentDataHeader { + &self.header + } } impl CompactedSegment { diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 7b4147fc68..9b3b4ce5ac 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -23,7 +23,7 @@ use crate::io::file::FileExt; use crate::io::Inspect; use crate::segment::{checked_frame_offset, SegmentFlags}; use crate::segment::{frame_offset, page_offset, sealed::SealedSegment}; -use crate::transaction::{Transaction, TxGuard, TxGuardOwned}; +use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared}; use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION}; use super::list::SegmentList; @@ -73,6 +73,7 @@ impl CurrentSegment { salt: salt.into(), page_size: LIBSQL_PAGE_SIZE.into(), log_id: log_id.as_u128().into(), + frame_count: 0.into(), }; header.recompute_checksum(); @@ -96,7 +97,7 @@ impl CurrentSegment { } pub fn is_empty(&self) -> bool { - self.count_committed() == 0 + self.header.lock().is_empty() } pub fn with_header(&self, f: impl FnOnce(&SegmentHeader) -> R) -> R { @@ -113,7 +114,7 @@ impl CurrentSegment { } pub fn count_committed(&self) -> usize { - self.header.lock().count_committed() + self.header.lock().frame_count() } pub fn db_size(&self) -> u32 { @@ -192,6 +193,12 @@ impl CurrentSegment { // set frames unordered because there are no guarantees that we received frames // in order. header.set_flags(header.flags().union(SegmentFlags::FRAME_UNORDERED)); + { + let savepoint = tx.savepoints.first().unwrap(); + header.frame_count = (header.frame_count.get() + + (tx.next_offset - savepoint.next_offset) as u64) + .into(); + } header.recompute_checksum(); let (header, ret) = self @@ -224,7 +231,7 @@ impl CurrentSegment { &self, pages: impl Iterator, size_after: Option<u32>, - tx: &mut TxGuard, + tx: &mut TxGuardShared, ) -> Result> where F: FileExt, @@ -298,6 +305,7 @@ impl CurrentSegment { } } + // commit if let Some(size_after) = size_after { if tx.not_empty() { let new_checksum = if let Some(offset) = tx.recompute_checksum { @@ -325,6 +333,13 @@ impl CurrentSegment { let mut header = { *self.header.lock() }; header.last_commited_frame_no = last_frame_no.into(); header.size_after = size_after.into(); + // count how many frames were appended: basically last appended offset - initial + // offset + let tx = tx.deref_mut(); + let savepoint = tx.savepoints.first().unwrap(); + header.frame_count = (header.frame_count.get() + + (tx.next_offset - savepoint.next_offset) as u64) + .into(); header.recompute_checksum(); self.file.write_all_at(header.as_bytes(), 0)?; @@ -398,7 +413,7 @@ impl CurrentSegment { F: FileExt, { let mut header = self.header.lock(); - let index_offset = header.count_committed() as u32; + let index_offset = header.frame_count() as u32; let index_byte_offset = checked_frame_offset(index_offset); let mut cursor = self.file.cursor(index_byte_offset); let writer = BufWriter::new(&mut cursor); diff --git a/libsql-wal/src/segment/list.rs b/libsql-wal/src/segment/list.rs index 1708967bba..b50622d3ef 100644 --- a/libsql-wal/src/segment/list.rs +++ b/libsql-wal/src/segment/list.rs @@ -78,6 +78,7 @@ where /// Checkpoints as many segments as possible to the main db file, and returns the checkpointed /// frame_no, if anything was checkpointed + #[tracing::instrument(skip_all)] pub
async fn checkpoint( &self, db_file: &IO::File, @@ -108,6 +109,10 @@ where // readers pointing to them while let Some(segment) = &*current { // skip any segment more recent than until_frame_no + tracing::debug!( + last_committed = segment.last_committed(), + until = until_frame_no + ); if segment.last_committed() <= until_frame_no { if !segment.is_checkpointable() { segs.clear(); @@ -120,6 +125,7 @@ where // nothing to checkpoint rn if segs.is_empty() { + tracing::debug!("nothing to checkpoint"); return Ok(None); } @@ -133,6 +139,7 @@ where let mut last_replication_index = 0; while let Some((k, v)) = union.next() { let page_no = u32::from_be_bytes(k.try_into().unwrap()); + tracing::trace!(page_no); let v = v.iter().min_by_key(|i| i.index).unwrap(); let offset = v.value as u32; @@ -194,6 +201,8 @@ where self.len.fetch_sub(segs.len(), Ordering::Relaxed); + tracing::debug!(until = last_replication_index, "checkpointed"); + Ok(Some(last_replication_index)) } diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index 2882efaaba..a3f6d56441 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -50,6 +50,8 @@ pub struct SegmentHeader { pub version: U16, pub start_frame_no: U64, pub last_commited_frame_no: U64, + /// number of frames in the segment + pub frame_count: U64, /// size of the database in pages, after applying the segment. pub size_after: U32, /// byte offset of the index. If 0, then the index wasn't written, and must be recovered. @@ -97,7 +99,7 @@ impl SegmentHeader { } } - fn flags(&self) -> SegmentFlags { + pub fn flags(&self) -> SegmentFlags { SegmentFlags::from_bits(self.flags.get()).unwrap() } @@ -120,14 +122,11 @@ impl SegmentHeader { } fn is_empty(&self) -> bool { - self.last_commited_frame_no.get() == 0 + self.frame_count() == 0 } - fn count_committed(&self) -> usize { - self.last_commited_frame_no - .get() - .checked_sub(self.start_frame_no.get() - 1) - .unwrap_or(0) as usize + pub fn frame_count(&self) -> usize { + self.frame_count.get() as usize } pub fn last_committed(&self) -> u64 { @@ -160,6 +159,7 @@ pub trait Segment: Send + Sync + 'static { fn start_frame_no(&self) -> u64; fn last_committed(&self) -> u64; fn index(&self) -> &fst::Map>; + fn is_storable(&self) -> bool; fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result; /// returns the number of readers currently holding a reference to this log. /// The read count must monotonically decrease. @@ -216,6 +216,10 @@ impl Segment for Arc { fn destroy(&self, io: &IO) -> impl Future { self.as_ref().destroy(io) } + + fn is_storable(&self) -> bool { + self.as_ref().is_storable() + } } #[repr(C)] diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index ddc71f4168..f5f3b244a8 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -108,8 +108,12 @@ where let mut current_offset = 0; while let Some((page_no_bytes, offset)) = pages.next() { - let (b, ret) = self.read_frame_offset_async(offset as _, buffer).await; - ret.unwrap(); + let (mut b, ret) = self.read_frame_offset_async(offset as _, buffer).await; + ret?; + // transaction boundaries in a segment are completely erased. The responsibility is on + // the consumer of the segment to place the transaction boundary such that all frames from + // the segment are applied within the same transaction. 
+ b.get_mut().header_mut().set_size_after(0); hasher.update(&b.get_ref().as_bytes()); let dest_offset = size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>(); @@ -177,7 +181,9 @@ where } fn is_checkpointable(&self) -> bool { - self.read_locks.load(Ordering::Relaxed) == 0 + let read_locks = self.read_locks.load(Ordering::Relaxed); + tracing::debug!(read_locks); + read_locks == 0 } fn size_after(&self) -> u32 { @@ -191,6 +197,17 @@ where } } } + + fn is_storable(&self) -> bool { + // we don't store unordered segments, since they only happen in two cases: + // - in a replica: no need for storage + // - in a primary, on recovery from storage: we don't want to overwrite remote + // segments. + !self + .header() + .flags() + .contains(SegmentFlags::FRAME_UNORDERED) + } } impl SealedSegment { @@ -211,7 +228,7 @@ impl SealedSegment { // This happens in case of crash: the segment is not empty, but it wasn't sealed. We need to // recover the index, and seal the segment. if !header.flags().contains(SegmentFlags::SEALED) { - assert_eq!(header.index_offset.get(), 0); + assert_eq!(header.index_offset.get(), 0, "{header:?}"); return Self::recover(file, path, header).map(Some); } @@ -235,8 +252,6 @@ impl SealedSegment { assert_eq!(header.index_size.get(), 0); assert_eq!(header.index_offset.get(), 0); assert!(!header.flags().contains(SegmentFlags::SEALED)); - // recovery for replica log should take a different path (i.e: resync with primary) - assert!(!header.flags().contains(SegmentFlags::FRAME_UNORDERED)); let mut current_checksum = header.salt.get(); tracing::trace!("recovering unsealed segment at {path:?}"); @@ -246,6 +261,11 @@ impl SealedSegment { let mut last_committed = 0; let mut size_after = 0; let mut frame_count = 0; + // When the segment is ordered, then the biggest frame_no is the last committed + // frame. This is not the case for an unordered segment (in case of recovery or + // a replica), so we track the biggest frame_no and set last_committed to that + // value on a commit frame + let mut max_seen_frame_no = 0; for i in 0.. { let offset = checked_frame_offset(i as u32); match file.read_exact_at(frame.as_bytes_mut(), offset) { @@ -263,9 +283,19 @@ impl SealedSegment { current_checksum = new_checksum; frame_count += 1; + // this must always hold for an ordered segment.
+ #[cfg(debug_assertions)] + { + if !header.flags().contains(SegmentFlags::FRAME_UNORDERED) { + assert!(frame.frame.header().frame_no() > max_seen_frame_no); + } + } + + max_seen_frame_no = max_seen_frame_no.max(frame.frame.header.frame_no()); + current_tx.push(frame.frame.header().page_no()); if frame.frame.header.is_commit() { - last_committed = frame.frame.header().frame_no(); + last_committed = max_seen_frame_no; size_after = frame.frame.header().size_after(); let base_offset = (i + 1) - current_tx.len(); for (frame_offset, page_no) in current_tx.drain(..).enumerate() { @@ -304,6 +334,8 @@ impl SealedSegment { header.index_size = index_size.into(); header.last_commited_frame_no = last_committed.into(); header.size_after = size_after.into(); + let flags = header.flags(); + header.set_flags(flags | SegmentFlags::SEALED); header.recompute_checksum(); file.write_all_at(header.as_bytes(), 0)?; let index = Map::new(index_bytes.into()).unwrap(); diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index 74b609cdee..d25b807dc0 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -35,7 +35,7 @@ pub struct WalLock { } pub(crate) trait SwapLog: Sync + Send + 'static { - fn swap_current(&self, shared: &SharedWal, tx: &TxGuard) -> Result<()>; + fn swap_current(&self, shared: &SharedWal, tx: &dyn TxGuard) -> Result<()>; } pub struct SharedWal { @@ -96,6 +96,10 @@ impl SharedWal { self.current.load().log_id() } + pub fn durable_frame_no(&self) -> u64 { + *self.durable_frame_no.lock() + } + #[tracing::instrument(skip_all)] pub fn begin_read(&self, conn_id: u64) -> ReadTransaction { // FIXME: this is not enough to just increment the counter, we must make sure that the segment @@ -270,7 +274,6 @@ impl SharedWal { self.new_frame_notifier.send_replace(last_committed); } - // TODO: use config for max log size if tx.is_commited() && current.count_committed() > self.max_segment_size.load(Ordering::Relaxed) { @@ -284,22 +287,28 @@ impl SharedWal { pub fn seal_current(&self) -> Result<()> { let mut tx = self.begin_read(u64::MAX).into(); self.upgrade(&mut tx)?; - { + + let ret = { let mut guard = tx.as_write_mut().unwrap().lock(); guard.commit(); - self.swap_current(&mut guard)?; - } + self.swap_current(&mut guard) + }; + // make sure the tx is always ended before it's dropped! + // FIXME: this is an issue with this design: since downgrade consumes self, we can't have a + // Drop implementation. The guard should probably hold an Option, so that we can + // take &mut Self instead. tx.end(); - Ok(()) + ret } /// Swap the current log. A write lock must be held, but the transaction must be committed already.
- pub(crate) fn swap_current(&self, tx: &TxGuard) -> Result<()> { + pub(crate) fn swap_current(&self, tx: &impl TxGuard) -> Result<()> { self.registry.swap_current(self, tx)?; Ok(()) } + #[tracing::instrument(skip(self))] pub async fn checkpoint(&self) -> Result> { let durable_frame_no = *self.durable_frame_no.lock(); let checkpointed_frame_no = self diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index d679fa1e2a..843ca6c920 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -205,10 +205,10 @@ where &self, namespace: &NamespaceName, config_override: Option, - ) -> u64 { + ) -> super::Result { let config = config_override.unwrap_or_else(|| self.backend.default_config()); - let meta = self.backend.meta(&config, namespace).await.unwrap(); - meta.max_frame_no + let meta = self.backend.meta(&config, namespace).await?; + Ok(meta.max_frame_no) } async fn restore( @@ -224,15 +224,6 @@ where .await } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> u64 { - tokio::runtime::Handle::current() - .block_on(self.durable_frame_no(namespace, config_override)) - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 61465df242..66aff1f2da 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -198,12 +198,14 @@ impl S3Backend { folder_key: &FolderKey<'_>, frame_no: u64, ) -> Result> { + let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key); let lookup_key = s3_segment_index_lookup_key(&folder_key, frame_no); let objects = self .client .list_objects_v2() .bucket(&config.bucket) + .prefix(lookup_key_prefix) .start_after(lookup_key) .send() .await @@ -214,15 +216,10 @@ impl S3Backend { }; let key = contents.key().expect("misssing key?"); let key_path: &Path = key.as_ref(); - let segment_key: SegmentKey = key_path - .file_stem() - .expect("invalid key") - .to_str() - .expect("invalid key") - .parse() - .expect("invalid key"); - - Ok(Some(segment_key)) + + let key = SegmentKey::validate_from_path(key_path, &folder_key.namespace); + + Ok(key) } // This method could probably be optimized a lot by using indexes and only downloading useful @@ -335,8 +332,12 @@ fn s3_segment_index_key(folder_key: &FolderKey, segment_key: &SegmentKey) -> Str format!("{folder_key}/indexes/{segment_key}") } +fn s3_segment_index_lookup_key_prefix(folder_key: &FolderKey) -> String { + format!("{folder_key}/indexes/") +} + fn s3_segment_index_lookup_key(folder_key: &FolderKey, frame_no: u64) -> String { - format!("{folder_key}/indexes/{:019}", u64::MAX - frame_no) + format!("{folder_key}/indexes/{:020}", u64::MAX - frame_no) } impl Backend for S3Backend diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index 07f5aa0cd6..797170644c 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -423,6 +423,10 @@ mod test { fn destroy(&self, _io: &IO) -> impl std::future::Future { async move { todo!() } } + + fn is_storable(&self) -> bool { + true + } } struct TestBackend; diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index d20e8bb4a1..56ed5feff8 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::fmt; use std::future::Future; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; 
use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; @@ -52,12 +52,12 @@ pub enum RestoreOptions { /// let meta = SegmentMeta { start_frame_no: 101, end_frame_no: 1000 }; /// map.insert(SegmentKey(&meta).to_string(), meta); /// -/// map.range(format!("{:019}", u64::MAX - 50)..).next(); -/// map.range(format!("{:019}", u64::MAX - 0)..).next(); -/// map.range(format!("{:019}", u64::MAX - 1)..).next(); -/// map.range(format!("{:019}", u64::MAX - 100)..).next(); -/// map.range(format!("{:019}", u64::MAX - 101)..).next(); -/// map.range(format!("{:019}", u64::MAX - 5000)..).next(); +/// map.range(format!("{:020}", u64::MAX - 50)..).next(); +/// map.range(format!("{:020}", u64::MAX - 0)..).next(); +/// map.range(format!("{:020}", u64::MAX - 1)..).next(); +/// map.range(format!("{:020}", u64::MAX - 100)..).next(); +/// map.range(format!("{:020}", u64::MAX - 101)..).next(); +/// map.range(format!("{:020}", u64::MAX - 5000)..).next(); /// ``` #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SegmentKey { @@ -85,6 +85,28 @@ impl SegmentKey { pub(crate) fn includes(&self, frame_no: u64) -> bool { (self.start_frame_no..=self.end_frame_no).contains(&frame_no) } + + #[tracing::instrument] + fn validate_from_path(mut path: &Path, ns: &NamespaceName) -> Option { + // path in the form "v2/clusters/{cluster-id}/namespaces/{namespace}/indexes/{index-key}" + let key: Self = path.file_name()?.to_str()?.parse().ok()?; + + path = path.parent()?; + + if path.file_name()? != "indexes" { + tracing::debug!("invalid key, ignoring"); + return None; + } + + path = path.parent()?; + + if path.file_name()? != ns.as_str() { + tracing::debug!("invalid namespace for key"); + return None; + } + + Some(key) + } } impl From<&SegmentMeta> for SegmentKey { @@ -115,7 +137,7 @@ impl fmt::Display for SegmentKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{:019}-{:019}", + "{:020}-{:020}", u64::MAX - self.start_frame_no, u64::MAX - self.end_frame_no, ) @@ -145,17 +167,11 @@ pub trait Storage: Send + Sync + 'static { on_store: OnStoreCallback, ); - fn durable_frame_no_sync( + fn durable_frame_no( &self, namespace: &NamespaceName, config_override: Option, - ) -> u64; - - async fn durable_frame_no( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> u64; + ) -> impl Future> + Send; async fn restore( &self, @@ -227,22 +243,11 @@ where } } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - config_override: Option, - ) -> u64 { - match zip(self, config_override) { - Either::A((s, c)) => s.durable_frame_no_sync(namespace, c), - Either::B((s, c)) => s.durable_frame_no_sync(namespace, c), - } - } - async fn durable_frame_no( &self, namespace: &NamespaceName, config_override: Option, - ) -> u64 { + ) -> Result { match zip(self, config_override) { Either::A((s, c)) => s.durable_frame_no(namespace, c).await, Either::B((s, c)) => s.durable_frame_no(namespace, c).await, @@ -339,10 +344,10 @@ impl Storage for NoStorage { async fn durable_frame_no( &self, - namespace: &NamespaceName, - config: Option, - ) -> u64 { - self.durable_frame_no_sync(namespace, config) + _namespace: &NamespaceName, + _config: Option, + ) -> Result { + Ok(u64::MAX) } async fn restore( @@ -355,14 +360,6 @@ impl Storage for NoStorage { panic!("can restore from no storage") } - fn durable_frame_no_sync( - &self, - _namespace: &NamespaceName, - _config_override: Option, - ) -> u64 { - u64::MAX - } - async fn find_segment( &self, _namespace: &NamespaceName, @@ -473,15 +470,21 @@ impl Storage for 
TestStorage { .or_default() .insert(key, (out_path, index)); tokio::runtime::Handle::current().block_on(on_store(end_frame_no)); + } else { + // HACK: we need to spawn because many tests just call this method indirectly in + // async context. That makes tests easier to write. + tokio::task::spawn_blocking(move || { + tokio::runtime::Handle::current().block_on(on_store(u64::MAX)); + }); } } async fn durable_frame_no( &self, - namespace: &NamespaceName, - config: Option, - ) -> u64 { - self.durable_frame_no_sync(namespace, config) + _namespace: &NamespaceName, + _config: Option, + ) -> Result { + Ok(u64::MAX) } async fn restore( @@ -494,22 +497,6 @@ impl Storage for TestStorage { todo!(); } - fn durable_frame_no_sync( - &self, - namespace: &NamespaceName, - _config_override: Option, - ) -> u64 { - let inner = self.inner.lock_blocking(); - if inner.store { - let Some(segs) = inner.stored.get(namespace) else { - return 0; - }; - segs.keys().map(|k| k.end_frame_no).max().unwrap_or(0) - } else { - u64::MAX - } - } - async fn find_segment( &self, namespace: &NamespaceName, diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 2fa593bade..93fb3fe16d 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -155,14 +155,23 @@ pub struct WriteTransaction { } pub struct TxGuardOwned { - _lock: Option>>, + lock: Option>>, inner: Option>, } +impl TxGuardOwned { + pub(crate) fn into_inner(mut self) -> WriteTransaction { + self.lock.take(); + self.inner.take().unwrap() + } +} + impl Drop for TxGuardOwned { fn drop(&mut self) { - let _ = self._lock.take(); - self.inner.take().expect("already dropped").downgrade(); + let _ = self.lock.take(); + if let Some(inner) = self.inner.take() { + inner.downgrade(); + } } } @@ -180,12 +189,17 @@ impl DerefMut for TxGuardOwned { } } -pub struct TxGuard<'a, F> { +pub trait TxGuard: Deref> + DerefMut + Send + Sync {} + +impl<'a, F: Send + Sync> TxGuard for TxGuardShared<'a, F> {} +impl TxGuard for TxGuardOwned {} + +pub struct TxGuardShared<'a, F> { _lock: async_lock::MutexGuardArc>, inner: &'a mut WriteTransaction, } -impl<'a, F> Deref for TxGuard<'a, F> { +impl<'a, F> Deref for TxGuardShared<'a, F> { type Target = WriteTransaction; fn deref(&self) -> &Self::Target { @@ -193,7 +207,7 @@ impl<'a, F> Deref for TxGuard<'a, F> { } } -impl<'a, F> DerefMut for TxGuard<'a, F> { +impl<'a, F> DerefMut for TxGuardShared<'a, F> { fn deref_mut(&mut self) -> &mut Self::Target { self.inner } @@ -216,16 +230,11 @@ impl WriteTransaction { savepoint_id } - pub fn lock(&mut self) -> TxGuard { - if self.is_commited { - tracing::error!("transaction already commited"); - todo!("txn has already been commited"); - } - + pub fn lock(&mut self) -> TxGuardShared { let g = self.wal_lock.tx_id.lock_arc_blocking(); match *g { // we still hold the lock, we can proceed - Some(id) if self.id == id => TxGuard { + Some(id) if self.id == id => TxGuardShared { _lock: g, inner: self, }, @@ -236,16 +245,11 @@ impl WriteTransaction { } pub fn into_lock_owned(self) -> TxGuardOwned { - if self.is_commited { - tracing::error!("transaction already commited"); - todo!("txn has already been commited"); - } - let g = self.wal_lock.tx_id.lock_arc_blocking(); match *g { // we still hold the lock, we can proceed Some(id) if self.id == id => TxGuardOwned { - _lock: Some(g), + lock: Some(g), inner: Some(self), }, // Somebody took the lock from us @@ -314,7 +318,7 @@ impl WriteTransaction { } } - tracing::debug!(id = read_tx.id, "lock released"); + tracing::trace!(id = 
read_tx.id, "lock released"); read_tx } diff --git a/libsql-wal/src/wal.rs b/libsql-wal/src/wal.rs index 4cbf81b10e..12dd75c7b0 100644 --- a/libsql-wal/src/wal.rs +++ b/libsql-wal/src/wal.rs @@ -131,7 +131,7 @@ impl Wal for LibsqlWal { self.last_read_frame_no = Some(tx.max_frame_no); self.tx = Some(Transaction::Read(tx)); - tracing::debug!(invalidate_cache, "read started"); + tracing::trace!(invalidate_cache, "read started"); Ok(invalidate_cache) } @@ -182,9 +182,9 @@ impl Wal for LibsqlWal { match self.tx.as_mut() { Some(tx) => { self.shared.upgrade(tx).map_err(Into::into)?; - tracing::debug!("write lock acquired"); + tracing::trace!("write lock acquired"); } - None => todo!("should acquire read txn first"), + None => panic!("should acquire read txn first"), } Ok(()) diff --git a/libsql-wal/tests/oracle.rs b/libsql-wal/tests/oracle.rs index 5752c307ef..357b835900 100644 --- a/libsql-wal/tests/oracle.rs +++ b/libsql-wal/tests/oracle.rs @@ -15,7 +15,7 @@ use libsql_sys::wal::{Sqlite3WalManager, Wal}; use libsql_sys::Connection; use libsql_wal::registry::WalRegistry; use libsql_wal::storage::TestStorage; -use libsql_wal::test::seal_current_segment; +use libsql_wal::test::{seal_current_segment, wait_current_durable}; use libsql_wal::wal::LibsqlWalManager; use once_cell::sync::Lazy; use rand::Rng; @@ -130,6 +130,7 @@ async fn run_test_sample(path: &Path) -> Result { let shared = registry.clone().open(&db_path, &"test".into()).unwrap(); seal_current_segment(&shared); + wait_current_durable(&shared).await; shared.checkpoint().await.unwrap(); std::env::set_current_dir(curdir).unwrap(); diff --git a/libsql/src/replication/local_client.rs b/libsql/src/replication/local_client.rs index d3c713f530..421210fff0 100644 --- a/libsql/src/replication/local_client.rs +++ b/libsql/src/replication/local_client.rs @@ -47,7 +47,7 @@ impl ReplicatorClient for LocalClient { async fn next_frames(&mut self) -> Result { match self.frames.take() { Some(Frames::Vec(f)) => { - let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None }).map(Ok); + let iter = f.into_iter().map(|f| RpcFrame { data: f.bytes(), timestamp: None, durable_frame_no: None }).map(Ok); Ok(Box::pin(tokio_stream::iter(iter))) } Some(f @ Frames::Snapshot(_)) => { @@ -72,7 +72,7 @@ impl ReplicatorClient for LocalClient { next.header_mut().size_after = size_after.into(); } let frame = Frame::from(next); - yield RpcFrame { data: frame.bytes(), timestamp: None }; + yield RpcFrame { data: frame.bytes(), timestamp: None, durable_frame_no: None }; } };
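Note on the `copy_to_file` fix in `libsql-wal/src/io/compat.rs`: `read_buf` appends at the buffer's current length instead of overwriting from the start, so seeding the loop with `BytesMut::zeroed(4096)` wrote the first chunk behind 4 KiB of zeros, and advancing the destination offset by `b.len()` (zeros included) rather than `n` (bytes actually read) shifted every subsequent write. A small self-contained sketch of the append behavior (illustrative, not code from the patch):

    use bytes::BytesMut;
    use tokio::io::AsyncReadExt;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let mut src: &[u8] = b"hello";
        // zeroed(4) yields a buffer whose len is already 4, so the read appends after it.
        let mut buf = BytesMut::zeroed(4);
        let n = src.read_buf(&mut buf).await?;
        assert_eq!(n, 5);
        assert_eq!(&buf[..], &b"\0\0\0\0hello"[..]); // zeros precede the payload
        assert_ne!(buf.len(), n); // advancing by buf.len() would over-count
        Ok(())
    }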
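Note on the new `frame_count` field in `SegmentHeader`: once frames can be injected with `FRAME_UNORDERED` (replicas, recovery from remote storage), frame numbers inside a segment are no longer dense, so the old `count_committed` derivation `last_commited_frame_no - (start_frame_no - 1)` over-counts. A toy illustration with made-up numbers:

    fn main() {
        // A segment starting at frame 100 that holds only three frames,
        // injected out of order by a replica: 101, 103, 105.
        let start_frame_no: u64 = 100;
        let last_commited_frame_no: u64 = 105;
        let frames_in_segment = [101u64, 103, 105];

        // Old derivation, correct only when frame numbers are contiguous:
        let derived = last_commited_frame_no - (start_frame_no - 1);
        assert_eq!(derived, 6);

        // The explicit counter records what is actually in the segment:
        assert_eq!(frames_in_segment.len(), 3);
    }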
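Note on the `{:019}` to `{:020}` width change in `SegmentKey` and the S3 index lookup keys: `u64::MAX` is 18446744073709551615, which has 20 decimal digits, so a width of 19 does not produce fixed-width keys, and decimal strings of mixed width do not sort in numeric order, which is exactly what the `range`/`start_after` lookups over `u64::MAX - frame_no` rely on. A minimal standalone sketch of the failure mode (the boundary values are illustrative):

    fn main() {
        let small: u64 = 9_999_999_999_999_999_999; // 19 digits
        let big: u64 = 10_000_000_000_000_000_000; // 20 digits

        // Width 19: `small` fits exactly, `big` overflows the width, and
        // lexicographic order inverts relative to numeric order:
        assert!(format!("{big:019}") < format!("{small:019}"));

        // Width 20: both keys are fixed-width and sort numerically:
        assert!(format!("{small:020}") < format!("{big:020}"));
    }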