Merge pull request #1711 from tursodatabase/libsql-wal-db-restore
libsql-wal: Sync dbs from remote storage on startup
MarinPostma authored Sep 4, 2024
2 parents 8208b4f + fb082ac commit d9f7a46
Showing 38 changed files with 647 additions and 311 deletions.
1 change: 1 addition & 0 deletions bottomless/src/replicator.rs
@@ -1652,6 +1652,7 @@ impl Replicator {
let frame = RpcFrame {
data: frame_to_inject.bytes(),
timestamp: None,
durable_frame_no: None,
};
injector.inject_frame(frame).await?;
applied_wal_frame = true;
1 change: 1 addition & 0 deletions libsql-replication/proto/replication_log.proto
@@ -36,6 +36,7 @@ message Frame {
// if this frame is a commit frame, then this can be set
// to the time when the transaction was committed
optional int64 timestamp = 2;
optional uint64 durable_frame_no = 3;
}

message Frames {
2 changes: 2 additions & 0 deletions libsql-replication/src/generated/wal_log.rs
@@ -83,6 +83,8 @@ pub struct Frame {
/// to the time when the transaction was committed
#[prost(int64, optional, tag = "2")]
pub timestamp: ::core::option::Option<i64>,
#[prost(uint64, optional, tag = "3")]
pub durable_frame_no: ::core::option::Option<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
4 changes: 4 additions & 0 deletions libsql-replication/src/injector/libsql_injector.rs
@@ -49,4 +49,8 @@ impl super::Injector for LibsqlInjector {
.map_err(|e| Error::FatalInjectError(e.into()))?;
Ok(None)
}

fn durable_frame_no(&mut self, frame_no: u64) {
self.injector.set_durable(frame_no);
}
}
2 changes: 2 additions & 0 deletions libsql-replication/src/injector/mod.rs
@@ -29,4 +29,6 @@ pub trait Injector {
/// Trigger a dummy write, and flush the cache to trigger a call to xFrame. The buffer's frames
/// are then injected into the WAL.
fn flush(&mut self) -> impl Future<Output = Result<Option<FrameNo>>> + Send;

fn durable_frame_no(&mut self, frame_no: u64);
}
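
The new `durable_frame_no` method on the `Injector` trait lets a replica pass along the durable frame number it learned from the primary: `SqliteInjector` ignores it, while `LibsqlInjector` forwards it to the underlying WAL via `set_durable`. A minimal, self-contained sketch of that shape (toy types and a simplified `inject_frame` signature, not the crate's actual API):

```rust
use std::future::Future;

type FrameNo = u64;

/// Toy stand-in for the `Injector` trait above (same shape, simplified frame type).
trait Injector {
    /// Inject one frame; yields the committed frame number when the frame closes a transaction.
    fn inject_frame(&mut self, frame: Vec<u8>) -> impl Future<Output = Option<FrameNo>> + Send;
    /// New in this commit: record the durable frame number reported by the primary.
    fn durable_frame_no(&mut self, frame_no: u64);
}

/// Mirrors `SqliteInjector`: durability tracking is irrelevant there, so it is a no-op.
struct SqliteLike;

impl Injector for SqliteLike {
    async fn inject_frame(&mut self, _frame: Vec<u8>) -> Option<FrameNo> {
        None
    }

    #[inline]
    fn durable_frame_no(&mut self, _frame_no: u64) {}
}

/// Mirrors `LibsqlInjector`: remember the durable frame number so the local WAL
/// knows which frames are already safe in remote storage.
#[derive(Default)]
struct LibsqlLike {
    durable: Option<FrameNo>,
    next_frame_no: FrameNo,
}

impl Injector for LibsqlLike {
    async fn inject_frame(&mut self, _frame: Vec<u8>) -> Option<FrameNo> {
        self.next_frame_no += 1;
        Some(self.next_frame_no)
    }

    fn durable_frame_no(&mut self, frame_no: u64) {
        self.durable = Some(frame_no);
    }
}

/// What the replicator does per frame (see `replicator.rs` below): forward the
/// durable frame number, if present, before injecting the frame itself.
async fn apply(injector: &mut impl Injector, frame: Vec<u8>, durable: Option<u64>) -> Option<FrameNo> {
    if let Some(frame_no) = durable {
        injector.durable_frame_no(frame_no);
    }
    injector.inject_frame(frame).await
}
```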
3 changes: 3 additions & 0 deletions libsql-replication/src/injector/sqlite_injector/mod.rs
@@ -46,6 +46,9 @@ impl Injector for SqliteInjector {
let inner = self.inner.clone();
spawn_blocking(move || inner.lock().flush()).await.unwrap()
}

#[inline]
fn durable_frame_no(&mut self, _frame_no: u64) {}
}

impl SqliteInjector {
6 changes: 6 additions & 0 deletions libsql-replication/src/replicator.rs
@@ -328,6 +328,10 @@ where
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<(), Error> {
self.frames_synced += 1;

if let Some(frame_no) = frame.durable_frame_no {
self.injector.durable_frame_no(frame_no);
}

match self.injector.inject_frame(frame).await? {
Some(commit_fno) => {
self.client.commit_frame_no(commit_fno).await?;
@@ -772,6 +776,7 @@ mod test {
.map(|f| RpcFrame {
data: f.bytes(),
timestamp: None,
durable_frame_no: None,
})
.take(2)
.map(Ok)
@@ -785,6 +790,7 @@
.map(|f| RpcFrame {
data: f.bytes(),
timestamp: None,
durable_frame_no: None,
})
.map(Ok)
.collect::<Vec<_>>();
8 changes: 1 addition & 7 deletions libsql-server/src/bottomless_migrate.rs
@@ -162,13 +162,7 @@ async fn migrate_one(
.await
.unwrap()?;

let mut tx = shared.begin_read(0).into();
shared.upgrade(&mut tx).unwrap();
let guard = tx
.into_write()
.unwrap_or_else(|_| panic!("should be a write txn"))
.into_lock_owned();
let mut injector = Injector::new(shared.clone(), guard, 10)?;
let mut injector = Injector::new(shared.clone(), 10)?;
let orig_db_path = base_path
.join("dbs")
.join(config.namespace().as_str())
77 changes: 47 additions & 30 deletions libsql-server/src/lib.rs
@@ -177,6 +177,9 @@ pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpCo
pub connector: Option<D>,
pub migrate_bottomless: bool,
pub enable_deadlock_monitor: bool,
pub should_sync_from_storage: bool,
pub force_load_wals: bool,
pub sync_conccurency: usize,
}

impl<C, A, D> Default for Server<C, A, D> {
@@ -203,6 +206,9 @@ impl<C, A, D> Default for Server<C, A, D> {
connector: None,
migrate_bottomless: false,
enable_deadlock_monitor: false,
should_sync_from_storage: false,
force_load_wals: false,
sync_conccurency: 8,
}
}
}
@@ -781,10 +787,10 @@ where
tokio::select! {
_ = shutdown.notified() => {
let shutdown = async {
namespace_store.shutdown().await?;
task_manager.shutdown().await?;
// join_set.shutdown().await;
service_shutdown.notify_waiters();
namespace_store.shutdown().await?;

Ok::<_, crate::Error>(())
};
@@ -958,19 +964,30 @@ where
Ok(())
});

// If we have performed the migration, load all shared wals to force flush to storage with
// the new registry
if did_migrate {
// If we performed a migration from bottomless to libsql-wal earlier, then we need to
// forcefully load all the WALs, to trigger segment storage with the actual storage. This
// is because migration didn't actually send anything to storage, but just created the
// segments.
if did_migrate || self.should_sync_from_storage || self.force_load_wals {
// eagerly load all namespaces, then call sync_all on the registry
// TODO: do this concurrently
let dbs_path = base_config.base_path.join("dbs");
let stream = meta_store.namespaces();
tokio::pin!(stream);
while let Some(conf) = stream.next().await {
let registry = registry.clone();
let namespace = conf.namespace().clone();
let path = dbs_path.join(namespace.as_str()).join("data");
tokio::task::spawn_blocking(move || registry.open(&path, &namespace.into()))
.await
.unwrap()?;
let path = dbs_path.join(namespace.as_str());
tokio::fs::create_dir_all(&path).await?;
tokio::task::spawn_blocking(move || {
registry.open(&path.join("data"), &namespace.into())
})
.await
.unwrap()?;
}

if self.should_sync_from_storage {
registry.sync_all(self.sync_conccurency).await?;
}
}

@@ -1236,31 +1253,31 @@ where
base_config: &BaseNamespaceConfig,
primary_config: &PrimaryConfig,
) -> anyhow::Result<bool> {
let is_previous_migration_successful = self.check_previous_migration_success()?;
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
let is_primary = self.rpc_client_config.is_none();
let should_attempt_migration = self.migrate_bottomless
&& is_primary
&& is_bottomless_enabled
&& !is_previous_migration_successful
&& is_libsql_wal;

if should_attempt_migration {
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
Ok(true)
} else {
// the wals directory is present and so is the _dbs. This means that a crash occurred
// before we could remove it. clean it up now. see code in `bottomless_migrate.rs`
let tmp_dbs_path = base_config.base_path.join("_dbs");
if tmp_dbs_path.try_exists()? {
tracing::info!("removed dangling `_dbs` folder");
tokio::fs::remove_dir_all(&tmp_dbs_path).await?;
}
if self.migrate_bottomless && is_primary {
let is_previous_migration_successful = self.check_previous_migration_success()?;
let is_libsql_wal = matches!(self.use_custom_wal, Some(CustomWAL::LibsqlWal));
let is_bottomless_enabled = self.db_config.bottomless_replication.is_some();
let should_attempt_migration =
is_bottomless_enabled && !is_previous_migration_successful && is_libsql_wal;

if should_attempt_migration {
bottomless_migrate(meta_store, base_config.clone(), primary_config.clone()).await?;
return Ok(true);
} else {
// the wals directory is present and so is the _dbs. This means that a crash occurred
// before we could remove it. clean it up now. see code in `bottomless_migrate.rs`
let tmp_dbs_path = base_config.base_path.join("_dbs");
if tmp_dbs_path.try_exists()? {
tracing::info!("removed dangling `_dbs` folder");
tokio::fs::remove_dir_all(&tmp_dbs_path).await?;
}

tracing::info!("bottomless already migrated, skipping...");
Ok(false)
tracing::info!("bottomless already migrated, skipping...");
}
}

Ok(false)
}

fn check_previous_migration_success(&self) -> anyhow::Result<bool> {
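
The eager-load loop above still walks namespaces sequentially (note the `TODO: do this concurrently`), and `sync_all(self.sync_conccurency)` then bounds how many namespaces are synced from remote storage at once. A rough, self-contained sketch of a bounded-concurrency startup pass, assuming a hypothetical `open_and_sync` per namespace rather than the server's real registry API:

```rust
// Sketch only: assumes the tokio and anyhow crates; `open_and_sync` is a
// hypothetical stand-in for `registry.open(..)` followed by a storage sync.
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio::task::JoinSet;

async fn open_and_sync(namespace: String) -> anyhow::Result<()> {
    // ... open the WAL for `namespace`, then pull missing segments from remote storage
    println!("synced {namespace}");
    Ok(())
}

/// Open and sync every namespace, at most `concurrency` at a time.
async fn sync_all(namespaces: Vec<String>, concurrency: usize) -> anyhow::Result<()> {
    let permits = Arc::new(Semaphore::new(concurrency));
    let mut tasks = JoinSet::new();

    for namespace in namespaces {
        let permits = permits.clone();
        tasks.spawn(async move {
            // Hold a permit for the whole sync to cap how many run at once.
            let _permit = permits.acquire_owned().await.expect("semaphore closed");
            open_and_sync(namespace).await
        });
    }

    // Surface the first error; dropping the JoinSet on early return aborts the rest.
    while let Some(res) = tasks.join_next().await {
        res??;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    sync_all(vec!["default".into(), "tenant-a".into()], 8).await
}
```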
24 changes: 24 additions & 0 deletions libsql-server/src/main.rs
@@ -280,6 +280,27 @@ struct Cli {
/// Auth key for the admin API
#[clap(long, env = "LIBSQL_ADMIN_AUTH_KEY", requires = "admin_listen_addr")]
admin_auth_key: Option<String>,

/// Whether to perform a sync of all namespaces with remote storage on startup
#[clap(
long,
env = "LIBSQL_SYNC_FROM_STORAGE",
requires = "enable_bottomless_replication"
)]
sync_from_storage: bool,
/// Whether to force loading all WALs at startup, with libsql-wal.
/// By default, WALs are loaded lazily, as the databases are opened.
#[clap(long)]
force_load_wals: bool,
/// Sync concurrency
#[clap(
long,
env = "LIBSQL_SYNC_CONCCURENCY",
requires = "sync_from_storage",
default_value = "8"
)]
sync_conccurency: usize,
}

#[derive(clap::Subcommand, Debug)]
@@ -681,6 +702,9 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
connector: Some(https),
migrate_bottomless: config.migrate_bottomless,
enable_deadlock_monitor: config.enable_deadlock_monitor,
should_sync_from_storage: config.sync_from_storage,
force_load_wals: config.force_load_wals,
sync_conccurency: config.sync_conccurency,
})
}

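
With clap's derive, these fields surface as `--sync-from-storage`, `--force-load-wals`, and `--sync-conccurency` (kebab-cased field names, so the concurrency flag inherits the field's spelling), plus the `LIBSQL_SYNC_FROM_STORAGE` and `LIBSQL_SYNC_CONCCURENCY` environment variables. A stripped-down sketch of just these options for experimenting with the `requires` constraints (the bottomless flag here is a stand-in for the real one):

```rust
// Sketch only: assumes clap with the "derive" and "env" features enabled.
use clap::Parser;

#[derive(Parser, Debug)]
struct Cli {
    /// Stand-in for the existing bottomless flag that `sync_from_storage` requires.
    #[clap(long)]
    enable_bottomless_replication: bool,

    /// Sync all namespaces with remote storage on startup.
    #[clap(
        long,
        env = "LIBSQL_SYNC_FROM_STORAGE",
        requires = "enable_bottomless_replication"
    )]
    sync_from_storage: bool,

    /// Force loading all WALs at startup instead of lazily on first access.
    #[clap(long)]
    force_load_wals: bool,

    /// How many namespaces to sync concurrently (field name kept as in the diff).
    #[clap(
        long,
        env = "LIBSQL_SYNC_CONCCURENCY",
        requires = "sync_from_storage",
        default_value = "8"
    )]
    sync_conccurency: usize,
}

fn main() {
    // e.g. `app --enable-bottomless-replication --sync-from-storage --sync-conccurency 16`
    let cli = Cli::parse();
    println!("{cli:?}");
}
```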
2 changes: 1 addition & 1 deletion libsql-server/src/namespace/configurator/helpers.rs
@@ -417,7 +417,7 @@ pub(crate) async fn run_storage_monitor<M: MakeConnection>(
.await;
}
Err(e) => {
tracing::warn!("failed to open connection for storager monitor: {e}, trying again in {duration:?}");
tracing::warn!("failed to open connection for storage monitor: {e}, trying again in {duration:?}");
}
}

9 changes: 1 addition & 8 deletions libsql-server/src/namespace/configurator/libsql_replica.rs
@@ -11,7 +11,6 @@ use libsql_sys::name::NamespaceResolver;
use libsql_wal::io::StdIO;
use libsql_wal::registry::WalRegistry;
use libsql_wal::replication::injector::Injector;
use libsql_wal::transaction::Transaction;
use libsql_wal::wal::LibsqlWalManager;
use tokio::task::JoinSet;
use tonic::transport::Channel;
@@ -151,13 +150,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
.await
.unwrap();

let mut tx = Transaction::Read(shared.begin_read(u64::MAX));
shared.upgrade(&mut tx).unwrap();
let guard = tx
.into_write()
.unwrap_or_else(|_| panic!())
.into_lock_owned();
let injector = Injector::new(shared, guard, 10).unwrap();
let injector = Injector::new(shared, 10).unwrap();
let injector = LibsqlInjector::new(injector);
let mut replicator = Replicator::new(client, injector);

26 changes: 21 additions & 5 deletions libsql-server/src/rpc/replication/libsql_replicator.rs
@@ -13,6 +13,7 @@ use libsql_replication::rpc::replication::{
use libsql_wal::io::StdIO;
use libsql_wal::registry::WalRegistry;
use libsql_wal::segment::Frame;
use libsql_wal::shared_wal::SharedWal;
use md5::{Digest as _, Md5};
use tokio_stream::Stream;
use tonic::Status;
@@ -72,12 +73,17 @@ pin_project_lite::pin_project! {
#[pin]
inner: S,
flavor: WalFlavor,
shared: Arc<SharedWal<StdIO>>,
}
}

impl<S> FrameStreamAdapter<S> {
fn new(inner: S, flavor: WalFlavor) -> Self {
Self { inner, flavor }
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
Self {
inner,
flavor,
shared,
}
}
}

@@ -93,6 +99,11 @@
Some(Ok(f)) => {
match this.flavor {
WalFlavor::Libsql => {
let durable_frame_no = if f.header().is_commit() {
Some(this.shared.durable_frame_no())
} else {
None
};
// safety: frame implements zerocopy traits, so it can safely be interpreted as a
// byte slice of the same size
let bytes: Box<[u8; size_of::<Frame>()]> =
@@ -102,6 +113,7 @@
Poll::Ready(Some(Ok(RpcFrame {
data,
timestamp: None,
durable_frame_no,
})))
}
WalFlavor::Sqlite => {
@@ -116,6 +128,7 @@
Poll::Ready(Some(Ok(RpcFrame {
data: frame.bytes(),
timestamp: None,
durable_frame_no: None,
})))
}
}
@@ -131,6 +144,7 @@ impl ReplicationLog for LibsqlReplicationService {
type LogEntriesStream = BoxStream<'static, Result<RpcFrame, Status>>;
type SnapshotStream = BoxStream<'static, Result<RpcFrame, Status>>;

#[tracing::instrument(skip_all, fields(namespace))]
async fn log_entries(
&self,
req: tonic::Request<LogOffset>,
@@ -140,11 +154,13 @@ impl ReplicationLog for LibsqlReplicationService {
let shared = self.registry.get_async(&namespace.into()).await.unwrap();
let req = req.into_inner();
// TODO: replicator should only accept NonZero
let replicator =
libsql_wal::replication::replicator::Replicator::new(shared, req.next_offset.max(1));
let replicator = libsql_wal::replication::replicator::Replicator::new(
shared.clone(),
req.next_offset.max(1),
);

let flavor = req.wal_flavor();
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor);
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared);
Ok(tonic::Response::new(Box::pin(stream)))
}

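
The safety comment in the libsql branch relies on `Frame` implementing the zerocopy traits, so a frame can be viewed as raw bytes without copying; on commit frames the adapter additionally snapshots `shared.durable_frame_no()` so replicas learn how far remote storage has caught up. As a rough illustration of the zerocopy pattern only (a made-up, padding-free header type with the 0.7-style `AsBytes` derive, not the crate's real `Frame` layout):

```rust
// Sketch only: assumes the zerocopy crate (0.7-style API) with its "derive" feature.
use zerocopy::AsBytes;

/// Hypothetical fixed-size header; `#[repr(C)]` plus all-`AsBytes` fields and no
/// padding is what makes the byte reinterpretation sound.
#[derive(AsBytes)]
#[repr(C)]
struct FrameHeader {
    frame_no: u64,
    size_after: u32,
    checksum: u32,
}

fn main() {
    let header = FrameHeader { frame_no: 42, size_after: 0, checksum: 0 };
    // No unsafe and no copy: the struct is reinterpreted as its underlying bytes.
    let bytes: &[u8] = header.as_bytes();
    assert_eq!(bytes.len(), std::mem::size_of::<FrameHeader>());
}
```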
2 changes: 2 additions & 0 deletions libsql-server/src/rpc/replication/replication_log.rs
@@ -187,6 +187,7 @@ fn map_frame_stream_output(
Ok((frame, ts)) => Ok(Frame {
data: frame.bytes(),
timestamp: ts.map(|ts| ts.timestamp_millis()),
durable_frame_no: None,
}),
Err(LogReadError::SnapshotRequired) => Err(Status::new(
tonic::Code::FailedPrecondition,
@@ -431,6 +432,7 @@ mod snapshot_stream {
yield Ok(Frame {
data: libsql_replication::frame::Frame::from(frame).bytes(),
timestamp: None,
durable_frame_no: None,
});
}
Err(e) => {
2 changes: 1 addition & 1 deletion libsql-wal/src/bins/shell/main.rs
@@ -143,7 +143,7 @@
S: Storage,
{
let namespace = NamespaceName::from_string(namespace.to_owned());
let durable = storage.durable_frame_no(&namespace, None).await;
let durable = storage.durable_frame_no(&namespace, None).await.unwrap();
println!("namespace: {namespace}");
println!("max durable frame: {durable}");
}