Skip to content

Commit

Permalink
Merge pull request #1652 from tursodatabase/libsql-wal-replicator
Browse files Browse the repository at this point in the history
libsql wal replicator
  • Loading branch information
MarinPostma authored Aug 10, 2024
2 parents 8077948 + d29ca7f commit e4c2afc
Show file tree
Hide file tree
Showing 27 changed files with 695 additions and 400 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ rusqlite = { package = "libsql-rusqlite", path = "vendored/rusqlite", version =
] }
hyper = { version = "0.14" }
tower = { version = "0.4.13" }
zerocopy = { version = "0.7.32", features = ["derive", "alloc"] }

# Config for 'cargo dist'
[workspace.metadata.dist]
Expand Down
15 changes: 11 additions & 4 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{Client, Config};
use bytes::{Buf, Bytes};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use libsql_replication::injector::Injector as _;
use libsql_replication::rpc::replication::Frame as RpcFrame;
use libsql_sys::{Cipher, EncryptionConfig};
use std::ops::Deref;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -1449,12 +1451,13 @@ impl Replicator {
db_path: &Path,
) -> Result<bool> {
let encryption_config = self.encryption_config.clone();
let mut injector = libsql_replication::injector::Injector::new(
db_path,
let mut injector = libsql_replication::injector::SqliteInjector::new(
db_path.to_path_buf(),
4096,
libsql_sys::connection::NO_AUTOCHECKPOINT,
encryption_config,
)?;
)
.await?;
let prefix = format!("{}-{}/", self.db_name, generation);
let mut page_buf = {
let mut v = Vec::with_capacity(page_size);
Expand Down Expand Up @@ -1552,7 +1555,11 @@ impl Replicator {
},
page_buf.as_slice(),
);
injector.inject_frame(frame_to_inject)?;
let frame = RpcFrame {
data: frame_to_inject.bytes(),
timestamp: None,
};
injector.inject_frame(frame).await?;
applied_wal_frame = true;
}
}
Expand Down
2 changes: 2 additions & 0 deletions libsql-replication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ license = "MIT"
tonic = { version = "0.11", features = ["tls"] }
prost = "0.12"
libsql-sys = { version = "0.7", path = "../libsql-sys", default-features = false, features = ["wal", "rusqlite", "api"] }
libsql-wal = { path = "../libsql-wal/", optional = true }
rusqlite = { workspace = true }
parking_lot = "0.12.1"
bytes = { version = "1.5.0", features = ["serde"] }
Expand All @@ -37,3 +38,4 @@ tonic-build = "0.11"

[features]
encryption = ["libsql-sys/encryption"]
libsql_wal = ["dep:libsql-wal"]
6 changes: 6 additions & 0 deletions libsql-replication/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import "metadata.proto";

message LogOffset {
uint64 next_offset = 1;
enum WalFlavor {
Sqlite = 0;
Libsql = 1;
}
// the type of wal frames that the client is expecting
optional WalFlavor wal_flavor = 2;
}

message HelloRequest {
Expand Down
3 changes: 1 addition & 2 deletions libsql-replication/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::LIBSQL_PAGE_SIZE;
pub type FrameNo = u64;

/// The file header for the WAL log. All fields are represented in little-endian ordering.
/// See `encode` and `decode` for actual layout.
// repr C for stable sizing
#[repr(C)]
#[derive(Debug, Clone, Copy, zerocopy::FromZeroes, zerocopy::FromBytes, zerocopy::AsBytes)]
Expand All @@ -22,7 +21,7 @@ pub struct FrameHeader {
pub frame_no: lu64,
/// Rolling checksum of all the previous frames, including this one.
pub checksum: lu64,
/// page number, if frame_type is FrameType::Page
/// page number
pub page_no: lu32,
/// Size of the database (in page) after committing the transaction. This is passed from sqlite,
/// and serves as commit transaction boundary
Expand Down
42 changes: 42 additions & 0 deletions libsql-replication/src/generated/wal_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,48 @@
pub struct LogOffset {
#[prost(uint64, tag = "1")]
pub next_offset: u64,
/// the type of wal frames that the client is expecting
#[prost(enumeration = "log_offset::WalFlavor", optional, tag = "2")]
pub wal_flavor: ::core::option::Option<i32>,
}
/// Nested message and enum types in `LogOffset`.
pub mod log_offset {
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
::prost::Enumeration
)]
#[repr(i32)]
pub enum WalFlavor {
Sqlite = 0,
Libsql = 1,
}
impl WalFlavor {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
WalFlavor::Sqlite => "Sqlite",
WalFlavor::Libsql => "Libsql",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"Sqlite" => Some(Self::Sqlite),
"Libsql" => Some(Self::Libsql),
_ => None,
}
}
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
7 changes: 5 additions & 2 deletions libsql-replication/src/injector/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("SQLite error: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("A fatal error occured injecting frames")]
FatalInjectError,
#[error("A fatal error occured injecting frames: {0}")]
FatalInjectError(BoxError),
}
45 changes: 45 additions & 0 deletions libsql-replication/src/injector/libsql_injector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::mem::size_of;

use libsql_wal::io::StdIO;
use libsql_wal::replication::injector::Injector;
use libsql_wal::segment::Frame as WalFrame;
use zerocopy::{AsBytes, FromZeroes};

use crate::frame::FrameNo;
use crate::rpc::replication::Frame as RpcFrame;

use super::error::{Error, Result};

pub struct LibsqlInjector {
injector: Injector<StdIO>,
}

impl super::Injector for LibsqlInjector {
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
// must copy it...
// FIXME: optimize this.
let mut wal_frame = WalFrame::new_box_zeroed();
if frame.data.len() != size_of::<WalFrame>() {
todo!("invalid frame");
}
wal_frame.as_bytes_mut().copy_from_slice(&frame.data[..]);
Ok(self
.injector
.insert_frame(wal_frame)
.await
.map_err(|e| Error::FatalInjectError(e.into()))?)
}

async fn rollback(&mut self) {
self.injector.rollback();
}

async fn flush(&mut self) -> Result<Option<FrameNo>> {
self.injector
.flush(None)
.await
.map_err(|e| Error::FatalInjectError(e.into()))?;
Ok(None)
}
}
Loading

0 comments on commit e4c2afc

Please sign in to comment.