diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs index 0a51d440b3..1bb50229d8 100644 --- a/libsql-server/src/http/admin/mod.rs +++ b/libsql-server/src/http/admin/mod.rs @@ -4,7 +4,7 @@ use axum::extract::{FromRef, Path, State}; use axum::middleware::Next; use axum::routing::delete; use axum::Json; -use chrono::NaiveDateTime; +use chrono::{DateTime, Utc}; use futures::{SinkExt, StreamExt, TryStreamExt}; use hyper::{Body, Request, StatusCode}; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; @@ -427,7 +427,7 @@ async fn handle_create_namespace( #[derive(Debug, Deserialize)] struct ForkNamespaceReq { - timestamp: NaiveDateTime, + timestamp: DateTime, } async fn handle_fork_namespace( diff --git a/libsql-server/src/namespace/configurator/libsql_primary.rs b/libsql-server/src/namespace/configurator/libsql_primary.rs index e58c058115..3d5a054698 100644 --- a/libsql-server/src/namespace/configurator/libsql_primary.rs +++ b/libsql-server/src/namespace/configurator/libsql_primary.rs @@ -3,10 +3,13 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use futures::prelude::Future; use libsql_sys::name::NamespaceResolver; +use libsql_sys::wal::either::Either; use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; +use libsql_wal::storage::backend::Backend; use libsql_wal::wal::LibsqlWalManager; use tokio::task::JoinSet; @@ -263,13 +266,38 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator { fn fork<'a>( &'a self, - _from_ns: &'a Namespace, + from_ns: &'a Namespace, _from_config: MetaStoreHandle, _to_ns: NamespaceName, _to_config: MetaStoreHandle, - _timestamp: Option, + timestamp: Option>, _store: NamespaceStore, ) -> Pin> + Send + 'a>> { - unimplemented!() + Box::pin(async move { + match self.registry.storage() { + Either::A(s) => { + match timestamp { + Some(ts) => { + let ns: libsql_sys::name::NamespaceName = from_ns.name().clone().into(); + let _key = s + .backend() + .find_segment( + &s.backend().default_config(), + &ns, + libsql_wal::storage::backend::FindSegmentReq::Timestamp(ts), + ) + .await + .unwrap(); + todo!() + } + // find the most recent frame_no + None => todo!("fork from most recent"), + }; + } + Either::B(_) => { + todo!("cannot fork without storage"); + } + } + }) } } diff --git a/libsql-server/src/namespace/configurator/libsql_replica.rs b/libsql-server/src/namespace/configurator/libsql_replica.rs index d460924dff..8456482b4b 100644 --- a/libsql-server/src/namespace/configurator/libsql_replica.rs +++ b/libsql-server/src/namespace/configurator/libsql_replica.rs @@ -2,6 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use chrono::{DateTime, Utc}; use hyper::Uri; use libsql_replication::injector::LibsqlInjector; use libsql_replication::replicator::Replicator; @@ -265,7 +266,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator { _from_config: MetaStoreHandle, _to_ns: NamespaceName, _to_config: MetaStoreHandle, - _timestamp: Option, + _timestamp: Option>, _store: NamespaceStore, ) -> Pin> + Send + 'a>> { Box::pin(std::future::ready(Err(crate::Error::Fork( diff --git a/libsql-server/src/namespace/configurator/libsql_schema.rs b/libsql-server/src/namespace/configurator/libsql_schema.rs index 190f4eb80f..8ee246fe84 100644 --- a/libsql-server/src/namespace/configurator/libsql_schema.rs +++ b/libsql-server/src/namespace/configurator/libsql_schema.rs @@ -1,6 +1,7 @@ use std::path::Path; use std::sync::Arc; +use chrono::{DateTime, Utc}; use futures::prelude::Future; use libsql_sys::name::NamespaceResolver; use libsql_wal::io::StdIO; @@ -159,22 +160,13 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator { fn fork<'a>( &'a self, - from_ns: &'a Namespace, - from_config: MetaStoreHandle, - to_ns: NamespaceName, - to_config: MetaStoreHandle, - timestamp: Option, - store: NamespaceStore, + _from_ns: &'a Namespace, + _from_config: MetaStoreHandle, + _to_ns: NamespaceName, + _to_config: MetaStoreHandle, + _timestamp: Option>, + _store: NamespaceStore, ) -> std::pin::Pin> + Send + 'a>> { - Box::pin(super::fork::fork( - from_ns, - from_config, - to_ns, - to_config, - timestamp, - store, - &self.primary_config, - self.base.base_path.clone(), - )) + todo!() } } diff --git a/libsql-server/src/namespace/configurator/mod.rs b/libsql-server/src/namespace/configurator/mod.rs index bfcad5a02e..a29d2315ca 100644 --- a/libsql-server/src/namespace/configurator/mod.rs +++ b/libsql-server/src/namespace/configurator/mod.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use chrono::NaiveDateTime; +use chrono::{DateTime, Utc}; use futures::Future; use libsql_sys::EncryptionConfig; use tokio::sync::Semaphore; @@ -139,7 +139,7 @@ pub trait ConfigureNamespace { from_config: MetaStoreHandle, to_ns: NamespaceName, to_config: MetaStoreHandle, - timestamp: Option, + timestamp: Option>, store: NamespaceStore, ) -> Pin> + Send + 'a>>; } diff --git a/libsql-server/src/namespace/configurator/primary.rs b/libsql-server/src/namespace/configurator/primary.rs index f68405fad6..487602417c 100644 --- a/libsql-server/src/namespace/configurator/primary.rs +++ b/libsql-server/src/namespace/configurator/primary.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use futures::prelude::Future; use libsql_sys::EncryptionConfig; use tokio::task::JoinSet; @@ -184,7 +185,7 @@ impl ConfigureNamespace for PrimaryConfigurator { from_config: MetaStoreHandle, to_ns: NamespaceName, to_config: MetaStoreHandle, - timestamp: Option, + timestamp: Option>, store: NamespaceStore, ) -> Pin> + Send + 'a>> { Box::pin(super::fork::fork( @@ -192,7 +193,7 @@ impl ConfigureNamespace for PrimaryConfigurator { from_config, to_ns, to_config, - timestamp, + timestamp.map(|d| d.naive_utc()), store, &self.primary_config, self.base.base_path.clone(), diff --git a/libsql-server/src/namespace/configurator/replica.rs b/libsql-server/src/namespace/configurator/replica.rs index 2215a55c34..92ad8c5255 100644 --- a/libsql-server/src/namespace/configurator/replica.rs +++ b/libsql-server/src/namespace/configurator/replica.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use chrono::{DateTime, Utc}; use futures::Future; use hyper::Uri; use libsql_replication::rpc::replication::log_offset::WalFlavor; @@ -255,7 +256,7 @@ impl ConfigureNamespace for ReplicaConfigurator { _from_config: MetaStoreHandle, _to_ns: NamespaceName, _to_config: MetaStoreHandle, - _timestamp: Option, + _timestamp: Option>, _store: NamespaceStore, ) -> Pin> + Send + 'a>> { Box::pin(std::future::ready(Err(crate::Error::Fork( diff --git a/libsql-server/src/namespace/configurator/schema.rs b/libsql-server/src/namespace/configurator/schema.rs index 275fd71e93..3eb27082b6 100644 --- a/libsql-server/src/namespace/configurator/schema.rs +++ b/libsql-server/src/namespace/configurator/schema.rs @@ -1,5 +1,6 @@ use std::sync::{atomic::AtomicBool, Arc}; +use chrono::{DateTime, Utc}; use futures::prelude::Future; use tokio::task::JoinSet; @@ -120,7 +121,7 @@ impl ConfigureNamespace for SchemaConfigurator { from_config: MetaStoreHandle, to_ns: NamespaceName, to_config: MetaStoreHandle, - timestamp: Option, + timestamp: Option>, store: NamespaceStore, ) -> std::pin::Pin> + Send + 'a>> { Box::pin(super::fork::fork( @@ -128,7 +129,7 @@ impl ConfigureNamespace for SchemaConfigurator { from_config, to_ns, to_config, - timestamp, + timestamp.map(|ts| ts.naive_utc()), store, &self.primary_config, self.base.base_path.clone(), diff --git a/libsql-server/src/namespace/store.rs b/libsql-server/src/namespace/store.rs index f9f614fc77..c3cbd5f010 100644 --- a/libsql-server/src/namespace/store.rs +++ b/libsql-server/src/namespace/store.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_lock::RwLock; -use chrono::NaiveDateTime; +use chrono::{DateTime, Utc}; use futures::TryFutureExt; use moka::future::Cache; use once_cell::sync::OnceCell; @@ -219,7 +219,7 @@ impl NamespaceStore { from: NamespaceName, to: NamespaceName, to_config: DatabaseConfig, - timestamp: Option, + timestamp: Option>, ) -> crate::Result<()> { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); diff --git a/libsql-server/src/wal_toolkit.rs b/libsql-server/src/wal_toolkit.rs index 6b9dbc141d..6f8e9d9e6d 100644 --- a/libsql-server/src/wal_toolkit.rs +++ b/libsql-server/src/wal_toolkit.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::Context as _; use aws_config::{retry::RetryConfig, BehaviorVersion, Region}; use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider}; +use chrono::DateTime; use libsql_wal::io::StdIO; use libsql_wal::storage::backend::s3::S3Backend; use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy; @@ -80,11 +81,15 @@ impl WalToolkit { if let Some((first, last)) = compactor.get_segment_range(&namespace)? { println!( "- oldest segment: {}-{} ({})", - first.key.start_frame_no, first.key.end_frame_no, first.created_at + first.key.start_frame_no, + first.key.end_frame_no, + DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap() ); println!( "- most recent segment: {}-{} ({})", - last.key.start_frame_no, last.key.end_frame_no, last.created_at + last.key.start_frame_no, + last.key.end_frame_no, + DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap() ); } @@ -93,7 +98,9 @@ impl WalToolkit { compactor.list_all(&namespace, |info| { println!( "- {}-{} ({})", - info.key.start_frame_no, info.key.end_frame_no, info.created_at + info.key.start_frame_no, + info.key.end_frame_no, + DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap() ); })?; } diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 7456d714e4..793e46c67e 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -542,7 +542,7 @@ where )?; // sealing must the last fallible operation, because we don't want to end up in a situation // where the current log is sealed and it wasn't swapped. - if let Some(sealed) = current.seal()? { + if let Some(sealed) = current.seal(self.io.now())? { new.tail().push(sealed.clone()); maybe_store_segment( self.storage.as_ref(), diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index 4194b34820..4259114425 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -23,6 +23,7 @@ pub struct CompactedSegmentDataHeader { pub(crate) size_after: lu32, /// for now, always 4096 pub(crate) page_size: lu16, + pub(crate) timestamp: lu64, } impl CompactedSegmentDataHeader { fn check(&self) -> Result<()> { diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 9b3b4ce5ac..b02662040a 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -9,6 +9,7 @@ use std::sync::{ Arc, }; +use chrono::{DateTime, Utc}; use crossbeam_skiplist::SkipMap; use fst::MapBuilder; use parking_lot::{Mutex, RwLock}; @@ -74,6 +75,7 @@ impl CurrentSegment { page_size: LIBSQL_PAGE_SIZE.into(), log_id: log_id.as_u128().into(), frame_count: 0.into(), + sealed_at_timestamp: 0.into(), }; header.recompute_checksum(); @@ -408,7 +410,7 @@ impl CurrentSegment { /// It is expected that sealing is performed under a write lock #[tracing::instrument(skip_all)] - pub fn seal(&self) -> Result>> + pub fn seal(&self, now: DateTime) -> Result>> where F: FileExt, { @@ -442,6 +444,7 @@ impl CurrentSegment { header.index_size = index_size.into(); let flags = header.flags(); header.set_flags(flags | SegmentFlags::SEALED); + header.sealed_at_timestamp = (now.timestamp_millis() as u64).into(); header.recompute_checksum(); self.file.write_all_at(header.as_bytes(), 0)?; diff --git a/libsql-wal/src/segment/mod.rs b/libsql-wal/src/segment/mod.rs index a3f6d56441..8ec9acef0a 100644 --- a/libsql-wal/src/segment/mod.rs +++ b/libsql-wal/src/segment/mod.rs @@ -15,6 +15,8 @@ use std::mem::size_of; use std::num::NonZeroU64; use std::sync::Arc; +use chrono::DateTime; +use chrono::Utc; use zerocopy::byteorder::little_endian::{U128, U16, U32, U64}; use zerocopy::AsBytes; @@ -66,6 +68,8 @@ pub struct SegmentHeader { /// we could do it without changing the header pub page_size: U16, pub log_id: U128, + /// ms, from unix epoch + pub sealed_at_timestamp: U64, /// checksum of the header fields, excluding the checksum itself. This field must be the last pub header_cheksum: U32, @@ -169,6 +173,7 @@ pub trait Segment: Send + Sync + 'static { async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) where B: IoBufMut + Send + 'static; + fn timestamp(&self) -> DateTime; fn destroy(&self, io: &IO) -> impl Future; } @@ -194,15 +199,12 @@ impl Segment for Arc { self.as_ref().index() } - fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result { - self.as_ref().read_page(page_no, max_frame_no, buf) + fn is_storable(&self) -> bool { + self.as_ref().is_storable() } - async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) - where - B: IoBufMut + Send + 'static, - { - self.as_ref().read_frame_offset_async(offset, buf).await + fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> io::Result { + self.as_ref().read_page(page_no, max_frame_no, buf) } fn is_checkpointable(&self) -> bool { @@ -213,12 +215,19 @@ impl Segment for Arc { self.as_ref().size_after() } + async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) + where + B: IoBufMut + Send + 'static, + { + self.as_ref().read_frame_offset_async(offset, buf).await + } + fn destroy(&self, io: &IO) -> impl Future { self.as_ref().destroy(io) } - fn is_storable(&self) -> bool { - self.as_ref().is_storable() + fn timestamp(&self) -> DateTime { + self.as_ref().timestamp() } } diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index f5f3b244a8..d3c087aa4b 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -9,6 +9,7 @@ use std::sync::{ Arc, }; +use chrono::prelude::{DateTime, Utc}; use fst::{Map, MapBuilder, Streamer}; use zerocopy::{AsBytes, FromZeroes}; @@ -94,6 +95,7 @@ where version: LIBSQL_WAL_VERSION.into(), magic: LIBSQL_MAGIC.into(), page_size: self.header().page_size, + timestamp: self.header.sealed_at_timestamp, }; hasher.update(header.as_bytes()); @@ -155,6 +157,17 @@ where &self.index } + fn is_storable(&self) -> bool { + // we don't store unordered segments, since they only happen in two cases: + // - in a replica: no need for storage + // - in a primary, on recovery from storage: we don't want to override remote + // segment. + !self + .header() + .flags() + .contains(SegmentFlags::FRAME_UNORDERED) + } + fn read_page(&self, page_no: u32, max_frame_no: u64, buf: &mut [u8]) -> std::io::Result { if self.header().start_frame_no.get() > max_frame_no { return Ok(false); @@ -170,16 +183,6 @@ where Ok(false) } - async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) - where - B: IoBufMut + Send + 'static, - { - assert_eq!(buf.bytes_total(), size_of::()); - let frame_offset = frame_offset(offset); - let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await; - (buf, ret.map_err(Into::into)) - } - fn is_checkpointable(&self) -> bool { let read_locks = self.read_locks.load(Ordering::Relaxed); tracing::debug!(read_locks); @@ -190,6 +193,16 @@ where self.header().size_after() } + async fn read_frame_offset_async(&self, offset: u32, buf: B) -> (B, Result<()>) + where + B: IoBufMut + Send + 'static, + { + assert_eq!(buf.bytes_total(), size_of::()); + let frame_offset = frame_offset(offset); + let (buf, ret) = self.file.read_exact_at_async(buf, frame_offset as _).await; + (buf, ret.map_err(Into::into)) + } + fn destroy(&self, io: &IO) -> impl std::future::Future { async move { if let Err(e) = io.remove_file_async(&self.path).await { @@ -198,15 +211,14 @@ where } } - fn is_storable(&self) -> bool { - // we don't store unordered segments, since they only happen in two cases: - // - in a replica: no need for storage - // - in a primary, on recovery from storage: we don't want to override remote - // segment. - !self - .header() - .flags() - .contains(SegmentFlags::FRAME_UNORDERED) + fn timestamp(&self) -> DateTime { + assert_ne!( + self.header().sealed_at_timestamp.get(), + 0, + "segment was not sealed properly" + ); + DateTime::from_timestamp_millis(self.header().sealed_at_timestamp.get() as _) + .expect("this should be a guaranteed roundtrip with DateTime::timestamp_millis") } } diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index f96fe3f741..e9c0951702 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -79,7 +79,7 @@ impl SharedWal { } // The current segment will not be used anymore. It's empty, but we still seal it so that // the next startup doesn't find an unsealed segment. - self.current.load().seal()?; + self.current.load().seal(self.io.now())?; tracing::info!("namespace shutdown"); Ok(()) } diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index da4f3a75ab..d02703376c 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -234,7 +234,11 @@ where let config = config_override.unwrap_or_else(|| self.backend.default_config()); let key = self .backend - .find_segment(&config, namespace, frame_no) + .find_segment( + &config, + namespace, + super::backend::FindSegmentReq::Frame(frame_no), + ) .await?; Ok(key) } @@ -326,6 +330,10 @@ impl AsyncStorage { (this, storage_loop) } + pub fn backend(&self) -> &B { + &self.backend + } + /// send shutdown signal to bottomless. /// return a function that can be called to force shutdown, if necessary pub fn send_shutdown(self) -> impl FnOnce() { diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs index e054dbb8cd..fb9c1ba8ba 100644 --- a/libsql-wal/src/storage/backend/mod.rs +++ b/libsql-wal/src/storage/backend/mod.rs @@ -22,7 +22,7 @@ pub struct SegmentMeta { pub segment_id: Uuid, pub start_frame_no: u64, pub end_frame_no: u64, - pub created_at: DateTime, + pub segment_timestamp: DateTime, } pub struct RestoreRequest {} @@ -31,6 +31,13 @@ pub struct DbMeta { pub max_frame_no: u64, } +pub enum FindSegmentReq { + /// returns a segment containing this frame + Frame(u64), + /// Returns the segment with closest timestamp less than or equal to the requested timestamp + Timestamp(DateTime), +} + pub trait Backend: Send + Sync + 'static { /// Config type associated with the Storage type Config: Clone + Send + Sync + 'static; @@ -48,7 +55,7 @@ pub trait Backend: Send + Sync + 'static { &self, config: &Self::Config, namespace: &NamespaceName, - frame_no: u64, + req: FindSegmentReq, ) -> impl Future> + Send; fn fetch_segment_index( @@ -161,11 +168,9 @@ impl Backend for Arc { &self, config: &Self::Config, namespace: &NamespaceName, - frame_no: u64, + req: FindSegmentReq, ) -> Result { - self.as_ref() - .find_segment(config, namespace, frame_no) - .await + self.as_ref().find_segment(config, namespace, req).await } async fn fetch_segment_index( diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 0c4b7635fe..738595cc00 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -1,6 +1,6 @@ //! S3 implementation of storage backend -use std::fmt; +use std::fmt::{self, Formatter}; use std::mem::size_of; use std::path::Path; use std::pin::Pin; @@ -10,11 +10,12 @@ use std::task::Poll; use aws_config::SdkConfig; use aws_sdk_s3::operation::create_bucket::CreateBucketError; +use aws_sdk_s3::operation::get_object::GetObjectOutput; use aws_sdk_s3::primitives::{ByteStream, SdkBody}; -use aws_sdk_s3::types::CreateBucketConfiguration; +use aws_sdk_s3::types::{CreateBucketConfiguration, Object}; use aws_sdk_s3::Client; -use aws_smithy_types_convert::date_time::DateTimeExt; use bytes::{Bytes, BytesMut}; +use chrono::{DateTime, Utc}; use http_body::{Frame as HttpFrame, SizeHint}; use libsql_sys::name::NamespaceName; use roaring::RoaringBitmap; @@ -24,7 +25,7 @@ use tokio_util::sync::ReusableBoxFuture; use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64}; use zerocopy::{AsBytes, FromBytes, FromZeroes}; -use super::{Backend, SegmentMeta}; +use super::{Backend, FindSegmentReq, SegmentMeta}; use crate::io::buf::ZeroCopyBuf; use crate::io::compat::copy_to_file; use crate::io::{FileExt, Io, StdIO}; @@ -128,7 +129,7 @@ impl S3Backend { ) -> Result { let key = s3_segment_data_key(folder_key, segment_key); let stream = self.s3_get(config, key).await?; - Ok(stream.into_async_read()) + Ok(stream.body.into_async_read()) } async fn fetch_segment_data_inner( @@ -150,24 +151,23 @@ impl S3Backend { Ok(header) } - async fn s3_get(&self, config: &S3Config, key: String) -> Result { + async fn s3_get(&self, config: &S3Config, key: impl ToString) -> Result { Ok(self .client .get_object() .bucket(&config.bucket) - .key(key) + .key(key.to_string()) .send() .await - .map_err(|e| Error::unhandled(e, "error sending s3 GET request"))? - .body) + .map_err(|e| Error::unhandled(e, "error sending s3 GET request"))?) } - async fn s3_put(&self, config: &S3Config, key: String, body: ByteStream) -> Result<()> { + async fn s3_put(&self, config: &S3Config, key: impl ToString, body: ByteStream) -> Result<()> { self.client .put_object() .bucket(&config.bucket) .body(body) - .key(key) + .key(key.to_string()) .send() .await .map_err(|e| Error::unhandled(e, "error sending s3 PUT request"))?; @@ -181,7 +181,11 @@ impl S3Backend { segment_key: &SegmentKey, ) -> Result>> { let s3_index_key = s3_segment_index_key(folder_key, segment_key); - let mut stream = self.s3_get(config, s3_index_key).await?.into_async_read(); + let mut stream = self + .s3_get(config, s3_index_key) + .await? + .body + .into_async_read(); let mut header: SegmentIndexHeader = SegmentIndexHeader::new_zeroed(); stream.read_exact(header.as_bytes_mut()).await?; if header.magic.get() != LIBSQL_MAGIC && header.version.get() != 1 { @@ -199,7 +203,7 @@ impl S3Backend { } /// Find the most recent, and biggest segment that may contain `frame_no` - async fn find_segment_inner( + async fn find_segment_by_frame_no( &self, config: &S3Config, folder_key: &FolderKey<'_>, @@ -212,8 +216,8 @@ impl S3Backend { .client .list_objects_v2() .bucket(&config.bucket) - .prefix(lookup_key_prefix) - .start_after(lookup_key) + .prefix(lookup_key_prefix.to_string()) + .start_after(lookup_key.to_string()) .send() .await .map_err(|e| Error::unhandled(e, "failed to list bucket"))?; @@ -229,6 +233,101 @@ impl S3Backend { Ok(key) } + /// We are kinda bruteforcing out way into finding a segment that fits the bill, this can very + /// probably be optimized + #[tracing::instrument(skip(self, config, folder_key))] + async fn find_segment_by_timestamp( + &self, + config: &S3Config, + folder_key: &FolderKey<'_>, + timestamp: DateTime, + ) -> Result> { + let object_to_key = |o: &Object| { + let key_path = o.key().unwrap(); + SegmentKey::validate_from_path(key_path.as_ref(), &folder_key.namespace) + }; + + let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key); + + let mut continuation_token = None; + loop { + let objects = self + .client + .list_objects_v2() + .set_continuation_token(continuation_token.take()) + .bucket(&config.bucket) + .prefix(lookup_key_prefix.to_string()) + .send() + .await + .map_err(|e| Error::unhandled(e, "failed to list bucket"))?; + + // there is noting to restore + if objects.contents().is_empty() { + return Ok(None); + } + + let ts = timestamp.timestamp_millis() as u64; + let search_result = + objects + .contents() + .binary_search_by_key(&std::cmp::Reverse(ts), |o| { + let key = object_to_key(o).unwrap(); + std::cmp::Reverse(key.timestamp) + }); + + match search_result { + Ok(i) => { + let key = object_to_key(&objects.contents()[i]).unwrap(); + tracing::trace!("found perfect match for `{timestamp}`: {key}"); + return Ok(Some(key)); + } + Err(i) if i == 0 => { + // this is caught by the first iteration of the loop, anything that's more + // recent than the most recent should be interpret as most recent + let key = object_to_key(&objects.contents()[i]).unwrap(); + tracing::trace!("best match for `{timestamp}` is most recent segment: {key}"); + return Ok(Some(key)); + } + Err(i) if i == objects.contents().len() => { + // there are two scenarios. Either there are more pages with the request, and + // we fetch older entries, or there aren't. If there are older segment, search + // in those, otherwise, just take the oldest segment and return that + if objects.continuation_token().is_some() { + // nothing to do; fetch next page + } else { + let key = object_to_key(&objects.contents().last().unwrap()).unwrap(); + return Ok(Some(key)); + } + } + // This is the index where timestamp would be inserted, we look left and right of that + // key and pick the closest one. + Err(i) => { + // i - 1 is well defined since we already catch the case where i == 0 above + let left_key = object_to_key(&objects.contents()[i - 1]).unwrap(); + let right_key = object_to_key(&objects.contents()[i]).unwrap(); + let time_to_left = left_key.timestamp().signed_duration_since(timestamp).abs(); + let time_to_right = + right_key.timestamp().signed_duration_since(timestamp).abs(); + + if time_to_left < time_to_right { + return Ok(Some(left_key)); + } else { + return Ok(Some(right_key)); + } + } + } + + match objects.continuation_token { + Some(token) => { + continuation_token = Some(token); + } + None => { + unreachable!("the absence of continuation token should be dealt with earlier"); + } + } + } + } + // This method could probably be optimized a lot by using indexes and only downloading useful // segments async fn restore_latest( @@ -242,7 +341,7 @@ impl S3Backend { namespace, }; let Some(latest_key) = self - .find_segment_inner(config, &folder_key, u64::MAX) + .find_segment_by_frame_no(config, &folder_key, u64::MAX) .await? else { tracing::info!("nothing to restore for {namespace}"); @@ -279,7 +378,7 @@ impl S3Backend { let next_frame_no = header.start_frame_no.get() - 1; let Some(key) = self - .find_segment_inner(config, &folder_key, next_frame_no) + .find_segment_by_frame_no(config, &folder_key, next_frame_no) .await? else { todo!("there should be a segment!"); @@ -325,7 +424,7 @@ impl S3Backend { .client .list_objects_v2() .bucket(&config.bucket) - .prefix(lookup_key_prefix.clone()) + .prefix(lookup_key_prefix.to_string()) .set_continuation_token(continuation_token.take()) .send() .await @@ -339,7 +438,6 @@ impl S3Backend { let infos = SegmentInfo { key, size: entry.size().unwrap_or(0) as usize, - created_at: entry.last_modified().unwrap().to_chrono_utc().unwrap(), }; yield infos; @@ -377,20 +475,63 @@ impl fmt::Display for FolderKey<'_> { } } -fn s3_segment_data_key(folder_key: &FolderKey, segment_key: &SegmentKey) -> String { - format!("{folder_key}/segments/{segment_key}") +pub struct SegmentDataKey<'a>(&'a FolderKey<'a>, &'a SegmentKey); + +impl fmt::Display for SegmentDataKey<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}/segments/{}", self.0, self.1) + } } -fn s3_segment_index_key(folder_key: &FolderKey, segment_key: &SegmentKey) -> String { - format!("{folder_key}/indexes/{segment_key}") +fn s3_segment_data_key<'a>( + folder_key: &'a FolderKey, + segment_key: &'a SegmentKey, +) -> SegmentDataKey<'a> { + SegmentDataKey(folder_key, segment_key) } -fn s3_segment_index_lookup_key_prefix(folder_key: &FolderKey) -> String { - format!("{folder_key}/indexes/") +pub struct SegmentIndexKey<'a>(&'a FolderKey<'a>, &'a SegmentKey); + +impl fmt::Display for SegmentIndexKey<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}/indexes/{}", self.0, self.1) + } +} + +fn s3_segment_index_key<'a>( + folder_key: &'a FolderKey, + segment_key: &'a SegmentKey, +) -> SegmentIndexKey<'a> { + SegmentIndexKey(folder_key, segment_key) +} + +pub struct SegmentIndexLookupKeyPrefix<'a>(&'a FolderKey<'a>); + +impl fmt::Display for SegmentIndexLookupKeyPrefix<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}/indexes/", self.0) + } } -fn s3_segment_index_lookup_key(folder_key: &FolderKey, frame_no: u64) -> String { - format!("{folder_key}/indexes/{:020}", u64::MAX - frame_no) +fn s3_segment_index_lookup_key_prefix<'a>( + folder_key: &'a FolderKey, +) -> SegmentIndexLookupKeyPrefix<'a> { + SegmentIndexLookupKeyPrefix(folder_key) +} + +pub struct SegmentIndexLookupKey<'a>(&'a FolderKey<'a>, u64); + +impl fmt::Display for SegmentIndexLookupKey<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}/indexes/{:020}", self.0, u64::MAX - self.1) + } +} + +fn s3_segment_index_lookup_key<'a>( + folder_key: &'a FolderKey, + frame_no: u64, +) -> SegmentIndexLookupKey<'a> { + SegmentIndexLookupKey(folder_key, frame_no) } impl Backend for S3Backend @@ -452,7 +593,7 @@ where }; let Some(segment_key) = self - .find_segment_inner(config, &folder_key, frame_no) + .find_segment_by_frame_no(config, &folder_key, frame_no) .await? else { return Err(Error::FrameNotFound(frame_no)); @@ -480,7 +621,7 @@ where // request a key bigger than any other to get the last segment let max_segment_key = self - .find_segment_inner(config, &folder_key, u64::MAX) + .find_segment_by_frame_no(config, &folder_key, u64::MAX) .await?; Ok(super::DbMeta { @@ -509,15 +650,23 @@ where &self, config: &Self::Config, namespace: &NamespaceName, - frame_no: u64, + req: FindSegmentReq, ) -> Result { let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace: &namespace, }; - self.find_segment_inner(config, &folder_key, frame_no) - .await? - .ok_or_else(|| Error::FrameNotFound(frame_no)) + + match req { + FindSegmentReq::Frame(frame_no) => self + .find_segment_by_frame_no(config, &folder_key, frame_no) + .await? + .ok_or_else(|| Error::FrameNotFound(frame_no)), + FindSegmentReq::Timestamp(ts) => self + .find_segment_by_timestamp(config, &folder_key, ts) + .await? + .ok_or_else(|| Error::SegmentNotFoundTimestamp(ts)), + } } async fn fetch_segment_index( @@ -754,7 +903,7 @@ mod tests { segment_id: Uuid::new_v4(), start_frame_no: 0u64.into(), end_frame_no: 64u64.into(), - created_at: Utc::now(), + segment_timestamp: Utc::now(), }, std::fs::File::open(&f_path).unwrap(), index, @@ -776,7 +925,7 @@ mod tests { segment_id: Uuid::new_v4(), start_frame_no: 64u64.into(), end_frame_no: 128u64.into(), - created_at: Utc::now(), + segment_timestamp: Utc::now(), }, std::fs::File::open(&f_path).unwrap(), index, diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index 9c46c206be..86053d3a6e 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -4,7 +4,6 @@ use std::path::PathBuf; use std::sync::Arc; use chrono::DateTime; -use chrono::Utc; use fst::map::OpBuilder; use fst::Streamer; use libsql_sys::name::NamespaceName; @@ -58,7 +57,7 @@ impl Compactor { "CREATE TABLE IF NOT EXISTS segments ( start_frame_no INTEGER, end_frame_no INTEGER, - created_at DATE, + timestamp DATE, size INTEGER, namespace_id INTEGER, PRIMARY KEY (start_frame_no, end_frame_no), @@ -93,7 +92,7 @@ impl Compactor { pub fn analyze(&self, namespace: &NamespaceName) -> Result { let mut stmt = self.meta.prepare_cached( r#" - SELECT start_frame_no, end_frame_no + SELECT start_frame_no, end_frame_no, timestamp FROM segments as s JOIN monitored_namespaces as m ON m.id = s.namespace_id @@ -105,11 +104,10 @@ impl Compactor { while let Some(row) = rows.next()? { let start_frame_no: u64 = row.get(0)?; let end_frame_no: u64 = row.get(1)?; - // it's free to go from one end of a segment to the next - graph.add_edge(start_frame_no, end_frame_no, 0); + let timestamp: u64 = row.get(2)?; + graph.add_edge(start_frame_no, end_frame_no, timestamp); if start_frame_no != 1 { - // going from a segment to the next costs us - graph.add_edge(start_frame_no - 1, start_frame_no, 1); + graph.add_edge(start_frame_no - 1, start_frame_no, 0); } last_frame_no = last_frame_no.max(end_frame_no); } @@ -246,6 +244,9 @@ impl Compactor { version: LIBSQL_WAL_VERSION.into(), magic: LIBSQL_MAGIC.into(), page_size: last_header.page_size, + // the new compacted segment inherit the last segment timestamp: it contains the same + // logical data. + timestamp: last_header.timestamp, }; hasher.update(header.as_bytes()); @@ -301,7 +302,11 @@ impl Compactor { segment_id: Uuid::new_v4(), start_frame_no: start, end_frame_no: end, - created_at: Utc::now(), + segment_timestamp: DateTime::from_timestamp_millis( + set.last().unwrap().timestamp as _, + ) + .unwrap() + .to_utc(), }, out_file, out_index.into_inner().unwrap(), @@ -365,17 +370,29 @@ impl AnalyzedSegments { &self.graph, 1, |n| n == self.last_frame_no, - |(_, _, &x)| x, + // it's always free to go from one end of the segment to the other, and it costs us to + // fetch a new segment. edges between segments are always 0, and edges within segments + // are the segment timestamp + |(_, _, &x)| if x == 0 { 1 } else { 0 }, |n| self.last_frame_no - n, ); let mut segments = Vec::new(); match path { Some((_len, nodes)) => { for chunk in nodes.chunks(2) { + let start_frame_no = chunk[0]; + let end_frame_no = chunk[1]; + let timestamp = *self + .graph + .edges(start_frame_no) + .find_map(|(_, to, ts)| (to == end_frame_no).then_some(ts)) + .unwrap(); let key = SegmentKey { - start_frame_no: chunk[0], - end_frame_no: chunk[1], + start_frame_no, + end_frame_no, + timestamp, }; + dbg!(&key); segments.push(key); } } @@ -445,7 +462,7 @@ fn list_segments<'a>( ) -> Result<()> { let mut stmt = conn.prepare_cached( r#" - SELECT created_at, size, start_frame_no, end_frame_no + SELECT timestamp, size, start_frame_no, end_frame_no FROM segments as s JOIN monitored_namespaces as m ON m.id == s.namespace_id @@ -459,9 +476,9 @@ fn list_segments<'a>( key: SegmentKey { start_frame_no: r.get(2)?, end_frame_no: r.get(3)?, + timestamp: r.get(0)?, }, size: r.get(1)?, - created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(), }) })?; @@ -483,7 +500,7 @@ fn register_segment_info( INSERT OR IGNORE INTO segments ( start_frame_no, end_frame_no, - created_at, + timestamp, size, namespace_id ) @@ -492,7 +509,7 @@ fn register_segment_info( stmt.execute(( info.key.start_frame_no, info.key.end_frame_no, - info.created_at.timestamp(), + info.key.timestamp, info.size, namespace_id, ))?; @@ -505,7 +522,7 @@ fn segments_range( ) -> Result> { let mut stmt = conn.prepare_cached( r#" - SELECT min(created_at), size, start_frame_no, end_frame_no + SELECT min(timestamp), size, start_frame_no, end_frame_no FROM segments as s JOIN monitored_namespaces as m ON m.id == s.namespace_id @@ -519,16 +536,16 @@ fn segments_range( key: SegmentKey { start_frame_no: r.get(2)?, end_frame_no: r.get(3)?, + timestamp: r.get(0)?, }, size: r.get(1)?, - created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(), }) }) .optional()?; let mut stmt = conn.prepare_cached( r#" - SELECT max(created_at), size, start_frame_no, end_frame_no + SELECT max(timestamp), size, start_frame_no, end_frame_no FROM segments as s JOIN monitored_namespaces as m ON m.id == s.namespace_id @@ -542,9 +559,9 @@ fn segments_range( key: SegmentKey { start_frame_no: r.get(2)?, end_frame_no: r.get(3)?, + timestamp: r.get(0)?, }, size: r.get(1)?, - created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(), }) }) .optional()?; diff --git a/libsql-wal/src/storage/error.rs b/libsql-wal/src/storage/error.rs index d79d3788cb..f4b2f48930 100644 --- a/libsql-wal/src/storage/error.rs +++ b/libsql-wal/src/storage/error.rs @@ -1,5 +1,7 @@ use std::panic::Location; +use chrono::{DateTime, Utc}; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("io error: {0}")] @@ -10,6 +12,8 @@ pub enum Error { Compact(#[from] crate::error::Error), #[error("frame not {0} found")] FrameNotFound(u64), + #[error("No satisfying segment found for timestamp {0}")] + SegmentNotFoundTimestamp(DateTime), #[error("unhandled storage error: {error}, in {context}")] UnhandledStorageError { error: Box, diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index 6f1620066d..8f4fd9e393 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -69,7 +69,7 @@ where namespace: self.request.namespace.clone(), start_frame_no: segment.start_frame_no(), end_frame_no: segment.last_committed(), - created_at: io.now(), + segment_timestamp: segment.timestamp(), }; let config = self .request @@ -101,271 +101,22 @@ pub(crate) struct JobResult { #[cfg(test)] mod test { use std::future::ready; - // use std::fs::File; - // use std::io::Write; - // use std::mem::size_of; use std::str::FromStr; - // use std::sync::atomic::AtomicBool; use std::sync::Arc; + use chrono::prelude::DateTime; use chrono::Utc; - // use fst::{Map, Streamer}; - // use libsql_sys::rusqlite::OpenFlags; - // use tempfile::{tempdir, tempfile, NamedTempFile}; use uuid::Uuid; use crate::io::file::FileExt; use crate::io::StdIO; use crate::segment::compacted::CompactedSegmentDataHeader; + use crate::storage::backend::FindSegmentReq; use crate::storage::{RestoreOptions, SegmentKey}; - // use crate::registry::WalRegistry; - // use crate::segment::compacted::CompactedSegmentDataHeader; - // use crate::segment::sealed::SealedSegment; - // use crate::segment::{Frame, FrameHeader}; - // use crate::storage::Storage; - // use crate::wal::{LibsqlWal, LibsqlWalManager}; use libsql_sys::name::NamespaceName; use super::*; - // fn setup_wal( - // path: &Path, - // storage: S, - // ) -> (LibsqlWalManager, Arc>) { - // let resolver = |path: &Path| { - // NamespaceName::from_string(path.file_name().unwrap().to_str().unwrap().to_string()) - // }; - // let registry = - // Arc::new(WalRegistry::new(path.join("wals"), storage).unwrap()); - // (LibsqlWalManager::new(registry.clone()), registry) - // } - // - // fn make_connection( - // path: &Path, - // wal: LibsqlWalManager, - // ) -> libsql_sys::Connection> { - // libsql_sys::Connection::open( - // path.join("db"), - // OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE, - // wal, - // 10000, - // None, - // ) - // .unwrap() - // } - // - // #[tokio::test] - // async fn compact_segment() { - // struct SwapHandler; - // - // impl SegmentSwapHandler>> for SwapHandler { - // fn handle_segment_swap( - // &self, - // namespace: NamespaceName, - // segment: Arc>, - // ) { - // tokio::runtime::Handle::current().block_on(async move { - // let out_file = tempfile().unwrap(); - // let id = Uuid::new_v4(); - // let index_bytes = segment.compact(&out_file, id).await.unwrap(); - // let index = Map::new(index_bytes).unwrap(); - // - // // indexes contain the same pages - // let mut new_stream = index.stream(); - // let mut orig_stream = segment.index().stream(); - // assert_eq!(new_stream.next().unwrap().0, orig_stream.next().unwrap().0); - // assert_eq!(new_stream.next().unwrap().0, orig_stream.next().unwrap().0); - // assert!(new_stream.next().is_none()); - // assert!(orig_stream.next().is_none()); - // - // let mut db_file = NamedTempFile::new().unwrap(); - // let mut stream = index.stream(); - // while let Some((page_bytes, offset)) = stream.next() { - // let page_no = u32::from_be_bytes(page_bytes.try_into().unwrap()); - // let mut buf = [0u8; 4096]; - // let offset = size_of::() - // + offset as usize * size_of::() - // + size_of::(); - // out_file.read_exact_at(&mut buf, offset as u64).unwrap(); - // db_file - // .as_file() - // .write_all_at(&buf, (page_no as u64 - 1) * 4096) - // .unwrap(); - // } - // - // db_file.flush().unwrap(); - // let conn = libsql_sys::rusqlite::Connection::open(db_file.path()).unwrap(); - // conn.query_row("select count(*) from test", (), |r| Ok(())) - // .unwrap(); - // }); - // } - // } - // - // let tmp = tempdir().unwrap(); - // let (wal, registry) = setup_wal(tmp.path(), SwapHandler); - // let conn = make_connection(tmp.path(), wal.clone()); - // - // tokio::task::spawn_blocking(move || { - // conn.execute("create table test (x)", ()).unwrap(); - // for i in 0..100usize { - // conn.execute("insert into test values (?)", [i]).unwrap(); - // } - // - // registry.shutdown().unwrap(); - // }) - // .await - // .unwrap(); - // } - // - // #[tokio::test] - // async fn simple_perform_job() { - // struct TestIO; - // - // impl Io for TestIO { - // type File = ::File; - // type TempFile = ::TempFile; - // - // fn create_dir_all(&self, path: &Path) -> std::io::Result<()> { - // StdIO(()).create_dir_all(path) - // } - // - // fn open( - // &self, - // create_new: bool, - // read: bool, - // write: bool, - // path: &Path, - // ) -> std::io::Result { - // StdIO(()).open(create_new, read, write, path) - // } - // - // fn tempfile(&self) -> std::io::Result { - // StdIO(()).tempfile() - // } - // - // fn now(&self) -> DateTime { - // DateTime::UNIX_EPOCH - // } - // - // fn uuid(&self) -> Uuid { - // Uuid::from_u128(0) - // } - // - // fn hard_link(&self, _src: &Path, _dst: &Path) -> std::io::Result<()> { - // unimplemented!() - // } - // } - // - // struct TestStorage { - // called: AtomicBool, - // } - // - // impl Drop for TestStorage { - // fn drop(&mut self) { - // assert!(self.called.load(std::sync::atomic::Ordering::Relaxed)); - // } - // } - // - // impl Backend for TestStorage { - // type Config = (); - // - // fn store( - // &self, - // _config: &Self::Config, - // meta: SegmentMeta, - // segment_data: impl FileExt, - // segment_index: Vec, - // ) -> impl std::future::Future> + Send { - // async move { - // self.called - // .store(true, std::sync::atomic::Ordering::Relaxed); - // - // insta::assert_debug_snapshot!(meta); - // insta::assert_debug_snapshot!(crc32fast::hash(&segment_index)); - // insta::assert_debug_snapshot!(segment_index.len()); - // let data = async_read_all_to_vec(segment_data).await.unwrap(); - // insta::assert_debug_snapshot!(data.len()); - // insta::assert_debug_snapshot!(crc32fast::hash(&data)); - // - // Ok(()) - // } - // } - // - // async fn fetch_segment( - // &self, - // _config: &Self::Config, - // _namespace: NamespaceName, - // _frame_no: u64, - // _dest_path: &Path, - // ) -> Result<()> { - // todo!() - // } - // - // async fn meta( - // &self, - // _config: &Self::Config, - // _namespace: NamespaceName, - // ) -> Result { - // todo!(); - // } - // - // fn default_config(&self) -> Arc { - // Arc::new(()) - // } - // } - // - // struct SwapHandler; - // - // impl SegmentSwapHandler for SwapHandler { - // fn handle_segment_swap( - // &self, - // namespace: NamespaceName, - // segment: Arc>, - // ) { - // tokio::runtime::Handle::current().block_on(async move { - // let job = Job { - // request: IndexedRequest { - // request: StoreSegmentRequest { - // namespace, - // segment, - // created_at: TestIO.now(), - // storage_config_override: None, - // }, - // id: 0, - // }, - // }; - // - // let result = job - // .perform( - // TestStorage { - // called: false.into(), - // }, - // TestIO, - // ) - // .await; - // - // assert_eq!(result.job.request.id, 0); - // assert!(result.result.is_ok()); - // }); - // } - // } - // - // let tmp = tempdir().unwrap(); - // let (wal, registry) = setup_wal(tmp.path(), SwapHandler); - // let conn = make_connection(tmp.path(), wal.clone()); - // - // tokio::task::spawn_blocking(move || { - // conn.execute("create table test (x)", ()).unwrap(); - // for i in 0..100usize { - // conn.execute("insert into test values (?)", [i]).unwrap(); - // } - // - // registry.shutdown().unwrap(); - // }) - // .await - // .unwrap(); - // } - #[tokio::test] async fn perform_job() { #[derive(Debug)] @@ -428,6 +179,10 @@ mod test { fn is_storable(&self) -> bool { true } + + fn timestamp(&self) -> DateTime { + Utc::now() + } } struct TestBackend; @@ -478,7 +233,7 @@ mod test { &self, _config: &Self::Config, _namespace: &NamespaceName, - _frame_no: u64, + _frame_no: FindSegmentReq, ) -> Result { todo!() } diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 5ebff20220..8e746280a4 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::fmt::Debug; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; @@ -60,10 +61,21 @@ pub enum RestoreOptions { /// map.range(format!("{:020}", u64::MAX - 101)..).next(); /// map.range(format!("{:020}", u64::MAX - 5000)..).next(); /// ``` -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Clone, Copy, PartialEq, Eq)] pub struct SegmentKey { pub start_frame_no: u64, pub end_frame_no: u64, + pub timestamp: u64, +} + +impl Debug for SegmentKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SegmentKey") + .field("start_frame_no", &self.start_frame_no) + .field("end_frame_no", &self.end_frame_no) + .field("timestamp", &self.timestamp()) + .finish() + } } impl PartialOrd for SegmentKey { @@ -108,6 +120,12 @@ impl SegmentKey { Some(key) } + + fn timestamp(&self) -> DateTime { + DateTime::from_timestamp_millis(self.timestamp as _) + .unwrap() + .to_utc() + } } impl From<&SegmentMeta> for SegmentKey { @@ -115,6 +133,7 @@ impl From<&SegmentMeta> for SegmentKey { Self { start_frame_no: value.start_frame_no, end_frame_no: value.end_frame_no, + timestamp: value.segment_timestamp.timestamp_millis() as _, } } } @@ -125,11 +144,13 @@ impl FromStr for SegmentKey { fn from_str(s: &str) -> std::result::Result { let (rev_start_fno, s) = s.split_at(20); let start_frame_no = u64::MAX - rev_start_fno.parse::().map_err(|_| ())?; - let (_, rev_end_fno) = s.split_at(1); + let (rev_end_fno, timestamp) = s[1..].split_at(20); let end_frame_no = u64::MAX - rev_end_fno.parse::().map_err(|_| ())?; + let timestamp = timestamp[1..].parse().map_err(|_| ())?; Ok(Self { start_frame_no, end_frame_no, + timestamp, }) } } @@ -138,9 +159,10 @@ impl fmt::Display for SegmentKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{:020}-{:020}", + "{:020}-{:020}-{:020}", u64::MAX - self.start_frame_no, u64::MAX - self.end_frame_no, + self.timestamp, ) } } @@ -219,7 +241,6 @@ pub trait Storage: Send + Sync + 'static { pub struct SegmentInfo { pub key: SegmentKey, pub size: usize, - pub created_at: DateTime, } /// special zip function for Either storage implementation @@ -504,6 +525,7 @@ impl Storage for TestStorage { let key = SegmentKey { start_frame_no: seg.header().start_frame_no.get(), end_frame_no, + timestamp: seg.header().sealed_at_timestamp.get(), }; let index = Map::new(index.into()).unwrap(); inner @@ -624,9 +646,9 @@ pub struct StoreSegmentRequest { on_store_callback: OnStoreCallback, } -impl fmt::Debug for StoreSegmentRequest +impl Debug for StoreSegmentRequest where - S: fmt::Debug, + S: Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StoreSegmentRequest") diff --git a/libsql-wal/tests/flaky_fs.rs b/libsql-wal/tests/flaky_fs.rs index a06b382608..0c18bdee0b 100644 --- a/libsql-wal/tests/flaky_fs.rs +++ b/libsql-wal/tests/flaky_fs.rs @@ -5,6 +5,7 @@ use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Arc; +use chrono::prelude::{DateTime, Utc}; use libsql_wal::io::{file::FileExt, Io}; use libsql_wal::registry::WalRegistry; use libsql_wal::storage::TestStorage; @@ -140,8 +141,8 @@ impl Io for FlakyIo { todo!() } - fn now(&self) -> chrono::prelude::DateTime { - todo!() + fn now(&self) -> DateTime { + Utc::now() } fn hard_link(&self, _src: &Path, _dst: &Path) -> std::io::Result<()> {