Skip to content

Commit

Permalink
Merge pull request #1727 from tursodatabase/throttle-segment-creation
Browse files Browse the repository at this point in the history
Swap segment strategy
  • Loading branch information
MarinPostma authored Sep 13, 2024
2 parents e98c6ce + ecb06b1 commit 8a1ba79
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 9 deletions.
1 change: 1 addition & 0 deletions libsql-wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod io;
pub mod registry;
pub mod replication;
pub mod segment;
mod segment_swap_strategy;
pub mod shared_wal;
pub mod storage;
pub mod transaction;
Expand Down
18 changes: 16 additions & 2 deletions libsql-wal/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::num::NonZeroU64;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use dashmap::DashMap;
use libsql_sys::ffi::Sqlite3DbHeader;
Expand All @@ -25,6 +25,9 @@ use crate::replication::storage::{ReplicateFromStorage as _, StorageReplicator};
use crate::segment::list::SegmentList;
use crate::segment::Segment;
use crate::segment::{current::CurrentSegment, sealed::SealedSegment};
use crate::segment_swap_strategy::duration::DurationSwapStrategy;
use crate::segment_swap_strategy::frame_count::FrameCountSwapStrategy;
use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::shared_wal::{SharedWal, SwapLog};
use crate::storage::{OnStoreCallback, Storage};
use crate::transaction::TxGuard;
Expand Down Expand Up @@ -337,6 +340,17 @@ where

let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);

// FIXME: make swap strategy configurable
// This strategy will perform a swap if either the wal is bigger than 20k frames, or older
// than 10 minutes, or if the frame count is greater than a 1000 and the wal was last
// swapped more than 30 secs ago
let swap_strategy = Box::new(
DurationSwapStrategy::new(Duration::from_secs(5 * 60))
.or(FrameCountSwapStrategy::new(20_000))
.or(FrameCountSwapStrategy::new(1000)
.and(DurationSwapStrategy::new(Duration::from_secs(30)))),
);

let shared = Arc::new(SharedWal {
current,
wal_lock: Default::default(),
Expand All @@ -352,8 +366,8 @@ where
)),
shutdown: false.into(),
checkpoint_notifier: self.checkpoint_notifier.clone(),
max_segment_size: 1000.into(),
io: self.io.clone(),
swap_strategy,
});

self.opened
Expand Down
33 changes: 33 additions & 0 deletions libsql-wal/src/segment_swap_strategy/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::time::{Duration, Instant};

use parking_lot::Mutex;

use super::SegmentSwapStrategy;

/// A wal swap strategy that swaps the current wal if it's older that some duration
pub struct DurationSwapStrategy {
swap_after: Duration,
last_swapped_at: Mutex<Instant>,
}

impl DurationSwapStrategy {
pub fn new(swap_after: Duration) -> Self {
Self {
swap_after,
last_swapped_at: Mutex::new(Instant::now()),
}
}
}

impl SegmentSwapStrategy for DurationSwapStrategy {
#[inline(always)]
fn should_swap(&self, _frames_in_wal: usize) -> bool {
let last_swapped_at = self.last_swapped_at.lock();
last_swapped_at.elapsed() >= self.swap_after
}

#[inline(always)]
fn swapped(&self) {
*self.last_swapped_at.lock() = Instant::now();
}
}
22 changes: 22 additions & 0 deletions libsql-wal/src/segment_swap_strategy/frame_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use super::SegmentSwapStrategy;

/// A swap strategy that swaps if the count of frames in the wal exceed some threshold
pub struct FrameCountSwapStrategy {
max_frames_in_wal: usize,
}

impl FrameCountSwapStrategy {
pub fn new(max_frames_in_wal: usize) -> Self {
Self { max_frames_in_wal }
}
}

impl SegmentSwapStrategy for FrameCountSwapStrategy {
#[inline(always)]
fn should_swap(&self, frames_in_wal: usize) -> bool {
frames_in_wal >= self.max_frames_in_wal
}

#[inline(always)]
fn swapped(&self) {}
}
59 changes: 59 additions & 0 deletions libsql-wal/src/segment_swap_strategy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
pub(crate) mod duration;
pub(crate) mod frame_count;

pub(crate) trait SegmentSwapStrategy: Sync + Send + 'static {
fn should_swap(&self, frames_in_wal: usize) -> bool;
fn swapped(&self);

fn and<O: SegmentSwapStrategy>(self, other: O) -> And<Self, O>
where
Self: Sized,
{
And(self, other)
}

fn or<O: SegmentSwapStrategy>(self, other: O) -> Or<Self, O>
where
Self: Sized,
{
Or(self, other)
}
}

pub struct And<A, B>(A, B);

impl<A, B> SegmentSwapStrategy for And<A, B>
where
A: SegmentSwapStrategy,
B: SegmentSwapStrategy,
{
#[inline(always)]
fn should_swap(&self, frames_in_wal: usize) -> bool {
self.0.should_swap(frames_in_wal) && self.1.should_swap(frames_in_wal)
}

#[inline(always)]
fn swapped(&self) {
self.0.swapped();
self.1.swapped();
}
}

pub struct Or<A, B>(A, B);

impl<A, B> SegmentSwapStrategy for Or<A, B>
where
A: SegmentSwapStrategy,
B: SegmentSwapStrategy,
{
#[inline(always)]
fn should_swap(&self, frames_in_wal: usize) -> bool {
self.0.should_swap(frames_in_wal) || self.1.should_swap(frames_in_wal)
}

#[inline(always)]
fn swapped(&self) {
self.0.swapped();
self.1.swapped();
}
}
13 changes: 6 additions & 7 deletions libsql-wal/src/shared_wal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -16,6 +16,7 @@ use crate::io::file::FileExt;
use crate::io::Io;
use crate::replication::storage::ReplicateFromStorage;
use crate::segment::current::CurrentSegment;
use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
use libsql_sys::name::NamespaceName;

Expand Down Expand Up @@ -46,15 +47,14 @@ pub struct SharedWal<IO: Io> {
pub(crate) registry: Arc<dyn SwapLog<IO>>,
#[allow(dead_code)] // used by replication
pub(crate) checkpointed_frame_no: AtomicU64,
/// max frame_no acknoledged by the durable storage
/// max frame_no acknowledged by the durable storage
pub(crate) durable_frame_no: Arc<Mutex<u64>>,
pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
pub(crate) shutdown: AtomicBool,
pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
/// maximum size the segment is allowed to grow
pub(crate) max_segment_size: AtomicUsize,
pub(crate) io: Arc<IO>,
pub(crate) swap_strategy: Box<dyn SegmentSwapStrategy>,
}

impl<IO: Io> SharedWal<IO> {
Expand Down Expand Up @@ -274,10 +274,9 @@ impl<IO: Io> SharedWal<IO> {
self.new_frame_notifier.send_replace(last_committed);
}

if tx.is_commited()
&& current.count_committed() > self.max_segment_size.load(Ordering::Relaxed)
{
if tx.is_commited() && self.swap_strategy.should_swap(current.count_committed()) {
self.swap_current(&tx)?;
self.swap_strategy.swapped();
}

Ok(())
Expand Down

0 comments on commit 8a1ba79

Please sign in to comment.