Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 4, 2024
1 parent 7ad2188 commit 8918d29
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 17 deletions.
2 changes: 1 addition & 1 deletion libsql-replication/src/injector/sqlite_injector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Injector for SqliteInjector {
}

#[inline]
fn durable_frame_no(&mut self, _frame_no: u64) { }
fn durable_frame_no(&mut self, _frame_no: u64) {}
}

impl SqliteInjector {
Expand Down
12 changes: 9 additions & 3 deletions libsql-server/src/rpc/replication/libsql_replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ pin_project_lite::pin_project! {

impl<S> FrameStreamAdapter<S> {
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
Self { inner, flavor, shared }
Self {
inner,
flavor,
shared,
}
}
}

Expand Down Expand Up @@ -150,8 +154,10 @@ impl ReplicationLog for LibsqlReplicationService {
let shared = self.registry.get_async(&namespace.into()).await.unwrap();
let req = req.into_inner();
// TODO: replicator should only accecpt NonZero
let replicator =
libsql_wal::replication::replicator::Replicator::new(shared.clone(), req.next_offset.max(1));
let replicator = libsql_wal::replication::replicator::Replicator::new(
shared.clone(),
req.next_offset.max(1),
);

let flavor = req.wal_flavor();
let stream = FrameStreamAdapter::new(replicator.into_frame_stream(), flavor, shared);
Expand Down
18 changes: 11 additions & 7 deletions libsql-wal/src/replication/injector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<IO: Io> Injector<IO> {
capacity: buffer_capacity,
tx: None,
max_tx_frame_no: 0,
previous_durable_frame_no: 0,
previous_durable_frame_no: 0,
})
}

Expand Down Expand Up @@ -84,25 +84,29 @@ impl<IO: Io> Injector<IO> {
if commit_data.is_some() {
self.max_tx_frame_no = 0;
}
let buffer = current
.inject_frames(buffer, commit_data, tx)
.await?;
let buffer = current.inject_frames(buffer, commit_data, tx).await?;
self.buffer = buffer;
self.buffer.clear();
}

if size_after.is_some() {
let mut tx = self.tx.take().unwrap();
self.wal.new_frame_notifier.send_replace(last_committed_frame_no);
self.wal
.new_frame_notifier
.send_replace(last_committed_frame_no);
// the strategy to swap the current log is to do it on change of durable boundary,
// when we have caught up with the current durable frame_no
if self.current_durable() != self.previous_durable_frame_no && self.current_durable() >= self.max_tx_frame_no {
if self.current_durable() != self.previous_durable_frame_no
&& self.current_durable() >= self.max_tx_frame_no
{
let wal = self.wal.clone();
// FIXME: tokio dependency here is annoying, we need an async version of swap_current.
tokio::task::spawn_blocking(move || {
tx.commit();
wal.swap_current(&tx)
}).await.unwrap()?
})
.await
.unwrap()?
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::io::file::FileExt;
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, SegmentFlags};
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
use crate::transaction::{Transaction, TxGuardShared, TxGuardOwned};
use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared};
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};

use super::list::SegmentList;
Expand Down
5 changes: 4 additions & 1 deletion libsql-wal/src/segment/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ where
// readers pointing to them
while let Some(segment) = &*current {
// skip any segment more recent than until_frame_no
tracing::debug!(last_committed = segment.last_committed(), until = until_frame_no);
tracing::debug!(
last_committed = segment.last_committed(),
until = until_frame_no
);
if segment.last_committed() <= until_frame_no {
if !segment.is_checkpointable() {
segs.clear();
Expand Down
2 changes: 1 addition & 1 deletion libsql-wal/src/shared_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl<IO: Io> SharedWal<IO> {
pub fn durable_frame_no(&self) -> u64 {
*self.durable_frame_no.lock()
}

#[tracing::instrument(skip_all)]
pub fn begin_read(&self, conn_id: u64) -> ReadTransaction<IO::File> {
// FIXME: this is not enough to just increment the counter, we must make sure that the segment
Expand Down
6 changes: 3 additions & 3 deletions libsql-wal/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ impl<F> DerefMut for TxGuardOwned<F> {
}
}

pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync { }
pub trait TxGuard<F>: Deref<Target = WriteTransaction<F>> + DerefMut + Send + Sync {}

impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> { }
impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> { }
impl<'a, F: Send + Sync> TxGuard<F> for TxGuardShared<'a, F> {}
impl<F: Send + Sync> TxGuard<F> for TxGuardOwned<F> {}

pub struct TxGuardShared<'a, F> {
_lock: async_lock::MutexGuardArc<Option<u64>>,
Expand Down

0 comments on commit 8918d29

Please sign in to comment.