From 923cebf902c409341bee8faba00b669b770f7ea1 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Tue, 31 Dec 2024 13:17:10 +0200 Subject: [PATCH 1/4] libsql-replication: Switch retry message to warn log level If we fail to contact the primary, we return an error message. Let's switch to warn log level to avoid scaring users. --- libsql-replication/src/replicator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index f28bb9baa8..ca2de123ca 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -224,9 +224,9 @@ where } Err(Error::Client(e)) if !error_printed => { if e.downcast_ref::().is_some() { - tracing::error!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}"); + tracing::warn!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}"); } else { - tracing::error!("error connecting to primary. retrying. error: {e}"); + tracing::warn!("error connecting to primary. retrying. error: {e}"); } error_printed = true; From d95fe5d4eec9ef46d302c364607142a548757a5f Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 31 Dec 2024 15:53:38 -0500 Subject: [PATCH 2/4] sqld: disable checkpoint on primary conn create This commit changes our primary connection initialization code in two ways to achieve the ability to disable checkpointing the wal. 1) We ignore the initial checkpoint that we call directly into sqlite3 before we restore from bottomless. There is a fixme above that explains why we need this but to me right now its not totally clear why without digging deeper into the internals of bottomless. We should do this but for the moment this unblocks us and from the fixme comment it does not sound unsafe rather doing extra work potentially. 2) When bottomless needs to get the local change counter is creates a sqlite connection. When this connection drops it seems like it checkpoints the wal. I took a brief look at the `sqlite3_close` code and did not find anything obvious, I have been told in the past that sqlite3 likes to checkpoint at weird points so this could be one of those. For the moment, the temporary fix like above is to `std::mem::forget` the connection so that `Drop` never gets called and thus `sqlite3_close` never gets called. With both of these changes we now don't checkpoint the wal unless we hit the max size or interval (which for testing I have set very high). This changes are not enabled by default but must be enabled by setting the following env var: ``` LIBSQL_DISABLE_INIT_CHECKPOINTING=1 LIBSQL_BOTTOMLESS_DISABLE_INIT_CHECKPOINTING=1 ``` --- bottomless/src/replicator.rs | 8 ++++++++ libsql-server/src/namespace/configurator/helpers.rs | 13 ++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 180ca38576..6f318fd125 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -935,6 +935,14 @@ impl Replicator { })?; tracing::trace!("Local change counter: {change_counter}"); + // TODO: we shouldn't leak the connection here but for some reason when this connection get + // dropped it seems to checkpoint the database + if std::env::var("LIBSQL_BOTTOMLESS_DISABLE_INIT_CHECKPOINTING").is_ok() + || std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok() + { + std::mem::forget(conn); + } + Ok(change_counter.to_be_bytes()) } diff --git a/libsql-server/src/namespace/configurator/helpers.rs b/libsql-server/src/namespace/configurator/helpers.rs index 081eeaef80..c84c963805 100644 --- a/libsql-server/src/namespace/configurator/helpers.rs +++ b/libsql-server/src/namespace/configurator/helpers.rs @@ -84,9 +84,16 @@ pub(super) async fn make_primary_connection_maker( let bottomless_replicator = match primary_config.bottomless_replication { Some(ref options) => { - tracing::debug!("Checkpointing before initializing bottomless"); - crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?; - tracing::debug!("Checkpointed before initializing bottomless"); + // TODO: figure out why we really need this the fixme above is not clear enough but + // disabling this allows us to prevent checkpointing of the wal file. + if !std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok() { + tracing::debug!("Checkpointing before initializing bottomless"); + crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?; + tracing::debug!("Checkpointed before initializing bottomless"); + } else { + tracing::warn!("Disabling initial checkpoint before bottomless"); + } + let options = make_bottomless_options(options, bottomless_db_id, name.clone()); let (replicator, did_recover) = init_bottomless_replicator(db_path.join("data"), options, &restore_option).await?; From c87d4db3c70855ce68bc8ef0ea6aa882e497ee7b Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 6 Jan 2025 12:03:14 -0500 Subject: [PATCH 3/4] sqld: release v0.24.31 --- Cargo.lock | 2 +- libsql-server/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da179084ef..601af664ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3208,7 +3208,7 @@ dependencies = [ [[package]] name = "libsql-server" -version = "0.24.30" +version = "0.24.31" dependencies = [ "aes", "anyhow", diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index ba1c7551b2..6a22910548 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libsql-server" -version = "0.24.30" +version = "0.24.31" edition = "2021" default-run = "sqld" repository = "https://github.com/tursodatabase/libsql" From e859a0df84f142b72e0c2d702653ededd742cb4b Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 6 Jan 2025 13:43:25 -0500 Subject: [PATCH 4/4] libsql: add `skip_safety_assert` builder option This adds a `skip_saftey_assert` builder option that will allow uses to unsafely opt-out of the safety checks that libsql has built in. The inline doc comments for this function explain more. --- libsql/src/database.rs | 12 ++++- libsql/src/database/builder.rs | 93 ++++++++++++++++++++++++++++------ libsql/src/local/database.rs | 87 ++++++++++++++++++++++++++++++- 3 files changed, 172 insertions(+), 20 deletions(-) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index ba9b3ad7cc..05eb67241a 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -74,6 +74,7 @@ enum DbType { path: String, flags: OpenFlags, encryption_config: Option, + skip_saftey_assert: bool, }, #[cfg(feature = "replication")] Sync { @@ -146,6 +147,7 @@ cfg_core! { path: db_path.into(), flags, encryption_config: None, + skip_saftey_assert: false, }, max_write_replication_index: Default::default(), }) @@ -429,7 +431,7 @@ cfg_replication! { DbType::Sync { db, .. } => { let path = db.path().to_string(); Ok(Database { - db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None}, + db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false }, max_write_replication_index: Default::default(), }) } @@ -550,10 +552,16 @@ impl Database { path, flags, encryption_config, + skip_saftey_assert, } => { use crate::local::impls::LibsqlConnection; - let db = crate::local::Database::open(path, *flags)?; + let db = if !skip_saftey_assert { + crate::local::Database::open(path, *flags)? + } else { + unsafe { crate::local::Database::open_raw(path, *flags)? } + }; + let conn = db.connect()?; if !cfg!(feature = "encryption") && encryption_config.is_some() { diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index f1c60ffb3c..9d9e0fdf8f 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -39,6 +39,7 @@ impl Builder<()> { path: path.as_ref().to_path_buf(), flags: crate::OpenFlags::default(), encryption_config: None, + skip_safety_assert: false, }, } } @@ -64,7 +65,8 @@ impl Builder<()> { read_your_writes: true, sync_interval: None, http_request_callback: None, - namespace: None + namespace: None, + skip_safety_assert: false, }, } } @@ -137,6 +139,7 @@ cfg_core! { path: std::path::PathBuf, flags: crate::OpenFlags, encryption_config: Option, + skip_safety_assert: bool, } impl Builder { @@ -155,10 +158,29 @@ cfg_core! { self } + /// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way + /// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED + /// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to + /// allow them to move between threads safely. Due to the fact that sqlite3 has a global + /// config this may conflict with other sqlite3 connections in the same process. + /// + /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence + /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at + /// your own risk. + pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + self.inner.skip_safety_assert = skip; + self + } + /// Build the local database. pub async fn build(self) -> Result { let db = if self.inner.path == std::path::Path::new(":memory:") { - let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?; + let db = if !self.inner.skip_safety_assert { + crate::local::Database::open(":memory:", crate::OpenFlags::default())? + } else { + unsafe { crate::local::Database::open_raw(":memory:", crate::OpenFlags::default())? } + }; + Database { db_type: DbType::Memory { db } , max_write_replication_index: Default::default(), @@ -176,6 +198,7 @@ cfg_core! { path, flags: self.inner.flags, encryption_config: self.inner.encryption_config, + skip_saftey_assert: self.inner.skip_safety_assert }, max_write_replication_index: Default::default(), } @@ -196,6 +219,7 @@ cfg_replication! { sync_interval: Option, http_request_callback: Option, namespace: Option, + skip_safety_assert: bool, } /// Local replica configuration type in [`Builder`]. @@ -270,6 +294,20 @@ cfg_replication! { self } + /// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way + /// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED + /// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to + /// allow them to move between threads safely. Due to the fact that sqlite3 has a global + /// config this may conflict with other sqlite3 connections in the same process. + /// + /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence + /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at + /// your own risk. + pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + self.inner.skip_safety_assert = skip; + self + } + /// Build the remote embedded replica database. pub async fn build(self) -> Result { let RemoteReplica { @@ -285,7 +323,8 @@ cfg_replication! { read_your_writes, sync_interval, http_request_callback, - namespace + namespace, + skip_safety_assert } = self.inner; let connector = if let Some(connector) = connector { @@ -303,19 +342,41 @@ cfg_replication! { let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned(); - let db = crate::local::Database::open_http_sync_internal( - connector, - path, - url, - auth_token, - version, - read_your_writes, - encryption_config.clone(), - sync_interval, - http_request_callback, - namespace, - ) - .await?; + let db = if !skip_safety_assert { + crate::local::Database::open_http_sync_internal( + connector, + path, + url, + auth_token, + version, + read_your_writes, + encryption_config.clone(), + sync_interval, + http_request_callback, + namespace, + ) + .await? + } else { + // SAFETY: this can only be enabled via the unsafe config function + // `skip_safety_assert`. + unsafe { + crate::local::Database::open_http_sync_internal2( + connector, + path, + url, + auth_token, + version, + read_your_writes, + encryption_config.clone(), + sync_interval, + http_request_callback, + namespace, + ) + .await? + } + + }; + Ok(Database { db_type: DbType::Sync { db, encryption_config }, diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 726bb98a4e..ce8bd7aa82 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -53,6 +53,30 @@ impl Database { } } + /// Safety: this is like `open` but does not enfoce that sqlite_config has THREADSAFE set to + /// `SQLITE_CONFIG_SERIALIZED`, calling + pub unsafe fn open_raw>(db_path: S, flags: OpenFlags) -> Result { + let db_path = db_path.into(); + + if db_path.starts_with("libsql:") + || db_path.starts_with("http:") + || db_path.starts_with("https:") + { + Err(ConnectionFailed(format!( + "Unable to open local database {db_path} with Database::open()" + ))) + } else { + Ok(Database { + db_path, + flags, + #[cfg(feature = "replication")] + replication_ctx: None, + #[cfg(feature = "sync")] + sync_ctx: None, + }) + } + } + #[cfg(feature = "replication")] pub async fn open_http_sync( connector: crate::util::ConnectorService, @@ -128,6 +152,57 @@ impl Database { Ok(db) } + #[cfg(feature = "replication")] + #[doc(hidden)] + pub async unsafe fn open_http_sync_internal2( + connector: crate::util::ConnectorService, + db_path: String, + endpoint: String, + auth_token: String, + version: Option, + read_your_writes: bool, + encryption_config: Option, + sync_interval: Option, + http_request_callback: Option, + namespace: Option, + ) -> Result { + use std::path::PathBuf; + + use crate::util::coerce_url_scheme; + + let mut db = Database::open_raw(&db_path, OpenFlags::default())?; + + let endpoint = coerce_url_scheme(endpoint); + let remote = crate::replication::client::Client::new( + connector.clone(), + endpoint + .as_str() + .try_into() + .map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?, + auth_token.clone(), + version.as_deref(), + http_request_callback.clone(), + namespace, + ) + .map_err(|e| crate::Error::Replication(e.into()))?; + let path = PathBuf::from(db_path); + let client = RemoteClient::new(remote.clone(), &path) + .await + .map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?; + + let replicator = + EmbeddedReplicator::with_remote(client, path, 1000, encryption_config, sync_interval) + .await?; + + db.replication_ctx = Some(ReplicationContext { + replicator, + client: Some(remote), + read_your_writes, + }); + + Ok(db) + } + #[cfg(feature = "sync")] #[doc(hidden)] pub async fn open_local_with_offline_writes( @@ -420,7 +495,11 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { + async fn try_push( + &self, + sync_ctx: &mut SyncContext, + conn: &Connection, + ) -> Result { let page_size = { let rows = conn .query("PRAGMA page_size", crate::params::Params::None)? @@ -471,7 +550,11 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { + async fn try_pull( + &self, + sync_ctx: &mut SyncContext, + conn: &Connection, + ) -> Result { let generation = sync_ctx.generation(); let mut frame_no = sync_ctx.durable_frame_num() + 1; conn.wal_insert_begin()?;