Skip to content

Commit

Permalink
Merge pull request #1732 from tursodatabase/libsql-pitr-alt
Browse files Browse the repository at this point in the history
libsql-wal PITR
  • Loading branch information
MarinPostma authored Sep 13, 2024
2 parents ae3bb0f + 0c22fa8 commit c1cf1f4
Show file tree
Hide file tree
Showing 24 changed files with 404 additions and 387 deletions.
4 changes: 2 additions & 2 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::extract::{FromRef, Path, State};
use axum::middleware::Next;
use axum::routing::delete;
use axum::Json;
use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use hyper::{Body, Request, StatusCode};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
Expand Down Expand Up @@ -427,7 +427,7 @@ async fn handle_create_namespace<C: Connector>(

#[derive(Debug, Deserialize)]
struct ForkNamespaceReq {
timestamp: NaiveDateTime,
timestamp: DateTime<Utc>,
}

async fn handle_fork_namespace<C>(
Expand Down
34 changes: 31 additions & 3 deletions libsql-server/src/namespace/configurator/libsql_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::prelude::Future;
use libsql_sys::name::NamespaceResolver;
use libsql_sys::wal::either::Either;
use libsql_wal::io::StdIO;
use libsql_wal::registry::WalRegistry;
use libsql_wal::storage::backend::Backend;
use libsql_wal::wal::LibsqlWalManager;
use tokio::task::JoinSet;

Expand Down Expand Up @@ -263,13 +266,38 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {

fn fork<'a>(
&'a self,
_from_ns: &'a Namespace,
from_ns: &'a Namespace,
_from_config: MetaStoreHandle,
_to_ns: NamespaceName,
_to_config: MetaStoreHandle,
_timestamp: Option<chrono::prelude::NaiveDateTime>,
timestamp: Option<DateTime<Utc>>,
_store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
unimplemented!()
Box::pin(async move {
match self.registry.storage() {
Either::A(s) => {
match timestamp {
Some(ts) => {
let ns: libsql_sys::name::NamespaceName = from_ns.name().clone().into();
let _key = s
.backend()
.find_segment(
&s.backend().default_config(),
&ns,
libsql_wal::storage::backend::FindSegmentReq::Timestamp(ts),
)
.await
.unwrap();
todo!()
}
// find the most recent frame_no
None => todo!("fork from most recent"),
};
}
Either::B(_) => {
todo!("cannot fork without storage");
}
}
})
}
}
3 changes: 2 additions & 1 deletion libsql-server/src/namespace/configurator/libsql_replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use hyper::Uri;
use libsql_replication::injector::LibsqlInjector;
use libsql_replication::replicator::Replicator;
Expand Down Expand Up @@ -265,7 +266,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
_from_config: MetaStoreHandle,
_to_ns: NamespaceName,
_to_config: MetaStoreHandle,
_timestamp: Option<chrono::prelude::NaiveDateTime>,
_timestamp: Option<DateTime<Utc>>,
_store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
Box::pin(std::future::ready(Err(crate::Error::Fork(
Expand Down
24 changes: 8 additions & 16 deletions libsql-server/src/namespace/configurator/libsql_schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::path::Path;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::prelude::Future;
use libsql_sys::name::NamespaceResolver;
use libsql_wal::io::StdIO;
Expand Down Expand Up @@ -159,22 +160,13 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {

fn fork<'a>(
&'a self,
from_ns: &'a Namespace,
from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<chrono::prelude::NaiveDateTime>,
store: NamespaceStore,
_from_ns: &'a Namespace,
_from_config: MetaStoreHandle,
_to_ns: NamespaceName,
_to_config: MetaStoreHandle,
_timestamp: Option<DateTime<Utc>>,
_store: NamespaceStore,
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
Box::pin(super::fork::fork(
from_ns,
from_config,
to_ns,
to_config,
timestamp,
store,
&self.primary_config,
self.base.base_path.clone(),
))
todo!()
}
}
4 changes: 2 additions & 2 deletions libsql-server/src/namespace/configurator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use futures::Future;
use libsql_sys::EncryptionConfig;
use tokio::sync::Semaphore;
Expand Down Expand Up @@ -139,7 +139,7 @@ pub trait ConfigureNamespace {
from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<NaiveDateTime>,
timestamp: Option<DateTime<Utc>>,
store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>>;
}
5 changes: 3 additions & 2 deletions libsql-server/src/namespace/configurator/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::prelude::Future;
use libsql_sys::EncryptionConfig;
use tokio::task::JoinSet;
Expand Down Expand Up @@ -184,15 +185,15 @@ impl ConfigureNamespace for PrimaryConfigurator {
from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<chrono::prelude::NaiveDateTime>,
timestamp: Option<DateTime<Utc>>,
store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
Box::pin(super::fork::fork(
from_ns,
from_config,
to_ns,
to_config,
timestamp,
timestamp.map(|d| d.naive_utc()),
store,
&self.primary_config,
self.base.base_path.clone(),
Expand Down
3 changes: 2 additions & 1 deletion libsql-server/src/namespace/configurator/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use chrono::{DateTime, Utc};
use futures::Future;
use hyper::Uri;
use libsql_replication::rpc::replication::log_offset::WalFlavor;
Expand Down Expand Up @@ -255,7 +256,7 @@ impl ConfigureNamespace for ReplicaConfigurator {
_from_config: MetaStoreHandle,
_to_ns: NamespaceName,
_to_config: MetaStoreHandle,
_timestamp: Option<chrono::prelude::NaiveDateTime>,
_timestamp: Option<DateTime<Utc>>,
_store: NamespaceStore,
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
Box::pin(std::future::ready(Err(crate::Error::Fork(
Expand Down
5 changes: 3 additions & 2 deletions libsql-server/src/namespace/configurator/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::{atomic::AtomicBool, Arc};

use chrono::{DateTime, Utc};
use futures::prelude::Future;
use tokio::task::JoinSet;

Expand Down Expand Up @@ -120,15 +121,15 @@ impl ConfigureNamespace for SchemaConfigurator {
from_config: MetaStoreHandle,
to_ns: NamespaceName,
to_config: MetaStoreHandle,
timestamp: Option<chrono::prelude::NaiveDateTime>,
timestamp: Option<DateTime<Utc>>,
store: NamespaceStore,
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
Box::pin(super::fork::fork(
from_ns,
from_config,
to_ns,
to_config,
timestamp,
timestamp.map(|ts| ts.naive_utc()),
store,
&self.primary_config,
self.base.base_path.clone(),
Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/namespace/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use async_lock::RwLock;
use chrono::NaiveDateTime;
use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use moka::future::Cache;
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -219,7 +219,7 @@ impl NamespaceStore {
from: NamespaceName,
to: NamespaceName,
to_config: DatabaseConfig,
timestamp: Option<NaiveDateTime>,
timestamp: Option<DateTime<Utc>>,
) -> crate::Result<()> {
if self.inner.has_shutdown.load(Ordering::Relaxed) {
return Err(Error::NamespaceStoreShutdown);
Expand Down
13 changes: 10 additions & 3 deletions libsql-server/src/wal_toolkit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use anyhow::Context as _;
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider};
use chrono::DateTime;
use libsql_wal::io::StdIO;
use libsql_wal::storage::backend::s3::S3Backend;
use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
Expand Down Expand Up @@ -80,11 +81,15 @@ impl WalToolkit {
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
println!(
"- oldest segment: {}-{} ({})",
first.key.start_frame_no, first.key.end_frame_no, first.created_at
first.key.start_frame_no,
first.key.end_frame_no,
DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap()
);
println!(
"- most recent segment: {}-{} ({})",
last.key.start_frame_no, last.key.end_frame_no, last.created_at
last.key.start_frame_no,
last.key.end_frame_no,
DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap()
);
}

Expand All @@ -93,7 +98,9 @@ impl WalToolkit {
compactor.list_all(&namespace, |info| {
println!(
"- {}-{} ({})",
info.key.start_frame_no, info.key.end_frame_no, info.created_at
info.key.start_frame_no,
info.key.end_frame_no,
DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap()
);
})?;
}
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ where
)?;
// sealing must be the last fallible operation, because we don't want to end up in a situation
// where the current log is sealed but it wasn't swapped.
if let Some(sealed) = current.seal()? {
if let Some(sealed) = current.seal(self.io.now())? {
new.tail().push(sealed.clone());
maybe_store_segment(
self.storage.as_ref(),
Expand Down
1 change: 1 addition & 0 deletions libsql-wal/src/segment/compacted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct CompactedSegmentDataHeader {
pub(crate) size_after: lu32,
/// for now, always 4096
pub(crate) page_size: lu16,
pub(crate) timestamp: lu64,
}
impl CompactedSegmentDataHeader {
fn check(&self) -> Result<()> {
Expand Down
5 changes: 4 additions & 1 deletion libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::{
Arc,
};

use chrono::{DateTime, Utc};
use crossbeam_skiplist::SkipMap;
use fst::MapBuilder;
use parking_lot::{Mutex, RwLock};
Expand Down Expand Up @@ -74,6 +75,7 @@ impl<F> CurrentSegment<F> {
page_size: LIBSQL_PAGE_SIZE.into(),
log_id: log_id.as_u128().into(),
frame_count: 0.into(),
sealed_at_timestamp: 0.into(),
};

header.recompute_checksum();
Expand Down Expand Up @@ -408,7 +410,7 @@ impl<F> CurrentSegment<F> {

/// It is expected that sealing is performed under a write lock
#[tracing::instrument(skip_all)]
pub fn seal(&self) -> Result<Option<SealedSegment<F>>>
pub fn seal(&self, now: DateTime<Utc>) -> Result<Option<SealedSegment<F>>>
where
F: FileExt,
{
Expand Down Expand Up @@ -442,6 +444,7 @@ impl<F> CurrentSegment<F> {
header.index_size = index_size.into();
let flags = header.flags();
header.set_flags(flags | SegmentFlags::SEALED);
header.sealed_at_timestamp = (now.timestamp_millis() as u64).into();
header.recompute_checksum();
self.file.write_all_at(header.as_bytes(), 0)?;

Expand Down
27 changes: 18 additions & 9 deletions libsql-wal/src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use std::mem::size_of;
use std::num::NonZeroU64;
use std::sync::Arc;

use chrono::DateTime;
use chrono::Utc;
use zerocopy::byteorder::little_endian::{U128, U16, U32, U64};
use zerocopy::AsBytes;

Expand Down Expand Up @@ -66,6 +68,8 @@ pub struct SegmentHeader {
/// we could do it without changing the header
pub page_size: U16,
pub log_id: U128,
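/// time at which the segment was sealed, in milliseconds since the unix epoch
/// (initialized to 0 when the segment is created, set when it is sealed)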
pub sealed_at_timestamp: U64,

/// checksum of the header fields, excluding the checksum itself. This field must be the last
/// field of the header.
pub header_cheksum: U32,
Expand Down Expand Up @@ -169,6 +173,7 @@ pub trait Segment: Send + Sync + 'static {
async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static;
fn timestamp(&self) -> DateTime<Utc>;

fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()>;
}
Expand All @@ -194,15 +199,12 @@ impl<T: Segment> Segment for Arc<T> {
self.as_ref().index()
}

fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
self.as_ref().read_page(page_no, max_frame_no, buf)
fn is_storable(&self) -> bool {
self.as_ref().is_storable()
}

async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static,
{
self.as_ref().read_frame_offset_async(offset, buf).await
fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result<bool> {
self.as_ref().read_page(page_no, max_frame_no, buf)
}

fn is_checkpointable(&self) -> bool {
Expand All @@ -213,12 +215,19 @@ impl<T: Segment> Segment for Arc<T> {
self.as_ref().size_after()
}

async fn read_frame_offset_async<B>(&self, offset: u32, buf: B) -> (B, Result<()>)
where
B: IoBufMut + Send + 'static,
{
self.as_ref().read_frame_offset_async(offset, buf).await
}

fn destroy<IO: Io>(&self, io: &IO) -> impl Future<Output = ()> {
self.as_ref().destroy(io)
}

fn is_storable(&self) -> bool {
self.as_ref().is_storable()
fn timestamp(&self) -> DateTime<Utc> {
self.as_ref().timestamp()
}
}

Expand Down
Loading

0 comments on commit c1cf1f4

Please sign in to comment.