Skip to content

Commit

Permalink
Merge branch 'tursodatabase:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
kunjee17 authored Jan 15, 2025
2 parents e59d633 + 5b8934e commit b8a89cb
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,14 @@ impl Replicator {
})?;
tracing::trace!("Local change counter: {change_counter}");

// TODO: we shouldn't leak the connection here but for some reason when this connection get
// dropped it seems to checkpoint the database
if std::env::var("LIBSQL_BOTTOMLESS_DISABLE_INIT_CHECKPOINTING").is_ok()
|| std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok()
{
std::mem::forget(conn);
}

Ok(change_counter.to_be_bytes())
}

Expand Down
4 changes: 2 additions & 2 deletions libsql-replication/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ where
}
Err(Error::Client(e)) if !error_printed => {
if e.downcast_ref::<uuid::Error>().is_some() {
tracing::error!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}");
tracing::warn!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}");
} else {
tracing::error!("error connecting to primary. retrying. error: {e}");
tracing::warn!("error connecting to primary. retrying. error: {e}");
}

error_printed = true;
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "libsql-server"
version = "0.24.30"
version = "0.24.31"
edition = "2021"
default-run = "sqld"
repository = "https://github.com/tursodatabase/libsql"
Expand Down
13 changes: 10 additions & 3 deletions libsql-server/src/namespace/configurator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,16 @@ pub(super) async fn make_primary_connection_maker(

let bottomless_replicator = match primary_config.bottomless_replication {
Some(ref options) => {
tracing::debug!("Checkpointing before initializing bottomless");
crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?;
tracing::debug!("Checkpointed before initializing bottomless");
// TODO: figure out why we really need this the fixme above is not clear enough but
// disabling this allows us to prevent checkpointing of the wal file.
if !std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok() {
tracing::debug!("Checkpointing before initializing bottomless");
crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?;
tracing::debug!("Checkpointed before initializing bottomless");
} else {
tracing::warn!("Disabling initial checkpoint before bottomless");
}

let options = make_bottomless_options(options, bottomless_db_id, name.clone());
let (replicator, did_recover) =
init_bottomless_replicator(db_path.join("data"), options, &restore_option).await?;
Expand Down
12 changes: 10 additions & 2 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ enum DbType {
path: String,
flags: OpenFlags,
encryption_config: Option<EncryptionConfig>,
skip_saftey_assert: bool,
},
#[cfg(feature = "replication")]
Sync {
Expand Down Expand Up @@ -146,6 +147,7 @@ cfg_core! {
path: db_path.into(),
flags,
encryption_config: None,
skip_saftey_assert: false,
},
max_write_replication_index: Default::default(),
})
Expand Down Expand Up @@ -429,7 +431,7 @@ cfg_replication! {
DbType::Sync { db, .. } => {
let path = db.path().to_string();
Ok(Database {
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None},
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false },
max_write_replication_index: Default::default(),
})
}
Expand Down Expand Up @@ -550,10 +552,16 @@ impl Database {
path,
flags,
encryption_config,
skip_saftey_assert,
} => {
use crate::local::impls::LibsqlConnection;

let db = crate::local::Database::open(path, *flags)?;
let db = if !skip_saftey_assert {
crate::local::Database::open(path, *flags)?
} else {
unsafe { crate::local::Database::open_raw(path, *flags)? }
};

let conn = db.connect()?;

if !cfg!(feature = "encryption") && encryption_config.is_some() {
Expand Down
93 changes: 77 additions & 16 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl Builder<()> {
path: path.as_ref().to_path_buf(),
flags: crate::OpenFlags::default(),
encryption_config: None,
skip_safety_assert: false,
},
}
}
Expand All @@ -64,7 +65,8 @@ impl Builder<()> {
read_your_writes: true,
sync_interval: None,
http_request_callback: None,
namespace: None
namespace: None,
skip_safety_assert: false,
},
}
}
Expand Down Expand Up @@ -137,6 +139,7 @@ cfg_core! {
path: std::path::PathBuf,
flags: crate::OpenFlags,
encryption_config: Option<EncryptionConfig>,
skip_safety_assert: bool,
}

impl Builder<Local> {
Expand All @@ -155,10 +158,29 @@ cfg_core! {
self
}

/// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way
/// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED
/// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to
/// allow them to move between threads safely. Due to the fact that sqlite3 has a global
/// config this may conflict with other sqlite3 connections in the same process.
///
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<Local> {
self.inner.skip_safety_assert = skip;
self
}

/// Build the local database.
pub async fn build(self) -> Result<Database> {
let db = if self.inner.path == std::path::Path::new(":memory:") {
let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?;
let db = if !self.inner.skip_safety_assert {
crate::local::Database::open(":memory:", crate::OpenFlags::default())?
} else {
unsafe { crate::local::Database::open_raw(":memory:", crate::OpenFlags::default())? }
};

Database {
db_type: DbType::Memory { db } ,
max_write_replication_index: Default::default(),
Expand All @@ -176,6 +198,7 @@ cfg_core! {
path,
flags: self.inner.flags,
encryption_config: self.inner.encryption_config,
skip_saftey_assert: self.inner.skip_safety_assert
},
max_write_replication_index: Default::default(),
}
Expand All @@ -196,6 +219,7 @@ cfg_replication! {
sync_interval: Option<std::time::Duration>,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
skip_safety_assert: bool,
}

/// Local replica configuration type in [`Builder`].
Expand Down Expand Up @@ -270,6 +294,20 @@ cfg_replication! {
self
}

/// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way
/// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED
/// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to
/// allow them to move between threads safely. Due to the fact that sqlite3 has a global
/// config this may conflict with other sqlite3 connections in the same process.
///
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<RemoteReplica> {
self.inner.skip_safety_assert = skip;
self
}

/// Build the remote embedded replica database.
pub async fn build(self) -> Result<Database> {
let RemoteReplica {
Expand All @@ -285,7 +323,8 @@ cfg_replication! {
read_your_writes,
sync_interval,
http_request_callback,
namespace
namespace,
skip_safety_assert
} = self.inner;

let connector = if let Some(connector) = connector {
Expand All @@ -303,19 +342,41 @@ cfg_replication! {

let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();

let db = crate::local::Database::open_http_sync_internal(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?;
let db = if !skip_safety_assert {
crate::local::Database::open_http_sync_internal(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?
} else {
// SAFETY: this can only be enabled via the unsafe config function
// `skip_safety_assert`.
unsafe {
crate::local::Database::open_http_sync_internal2(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?
}

};


Ok(Database {
db_type: DbType::Sync { db, encryption_config },
Expand Down
87 changes: 85 additions & 2 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ impl Database {
}
}

/// Safety: this is like `open` but does not enfoce that sqlite_config has THREADSAFE set to
/// `SQLITE_CONFIG_SERIALIZED`, calling
pub unsafe fn open_raw<S: Into<String>>(db_path: S, flags: OpenFlags) -> Result<Database> {
let db_path = db_path.into();

if db_path.starts_with("libsql:")
|| db_path.starts_with("http:")
|| db_path.starts_with("https:")
{
Err(ConnectionFailed(format!(
"Unable to open local database {db_path} with Database::open()"
)))
} else {
Ok(Database {
db_path,
flags,
#[cfg(feature = "replication")]
replication_ctx: None,
#[cfg(feature = "sync")]
sync_ctx: None,
})
}
}

#[cfg(feature = "replication")]
pub async fn open_http_sync(
connector: crate::util::ConnectorService,
Expand Down Expand Up @@ -128,6 +152,57 @@ impl Database {
Ok(db)
}

#[cfg(feature = "replication")]
#[doc(hidden)]
pub async unsafe fn open_http_sync_internal2(
connector: crate::util::ConnectorService,
db_path: String,
endpoint: String,
auth_token: String,
version: Option<String>,
read_your_writes: bool,
encryption_config: Option<EncryptionConfig>,
sync_interval: Option<std::time::Duration>,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
) -> Result<Database> {
use std::path::PathBuf;

use crate::util::coerce_url_scheme;

let mut db = Database::open_raw(&db_path, OpenFlags::default())?;

let endpoint = coerce_url_scheme(endpoint);
let remote = crate::replication::client::Client::new(
connector.clone(),
endpoint
.as_str()
.try_into()
.map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?,
auth_token.clone(),
version.as_deref(),
http_request_callback.clone(),
namespace,
)
.map_err(|e| crate::Error::Replication(e.into()))?;
let path = PathBuf::from(db_path);
let client = RemoteClient::new(remote.clone(), &path)
.await
.map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?;

let replicator =
EmbeddedReplicator::with_remote(client, path, 1000, encryption_config, sync_interval)
.await?;

db.replication_ctx = Some(ReplicationContext {
replicator,
client: Some(remote),
read_your_writes,
});

Ok(db)
}

#[cfg(feature = "sync")]
#[doc(hidden)]
pub async fn open_local_with_offline_writes(
Expand Down Expand Up @@ -420,7 +495,11 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
async fn try_push(
&self,
sync_ctx: &mut SyncContext,
conn: &Connection,
) -> Result<crate::database::Replicated> {
let page_size = {
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
Expand Down Expand Up @@ -471,7 +550,11 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
async fn try_pull(
&self,
sync_ctx: &mut SyncContext,
conn: &Connection,
) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
conn.wal_insert_begin()?;
Expand Down

0 comments on commit b8a89cb

Please sign in to comment.