diff --git a/Cargo.lock b/Cargo.lock index 976bed1b09..d0f4fbcf2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3208,7 +3208,7 @@ dependencies = [ [[package]] name = "libsql-server" -version = "0.24.30" +version = "0.24.31" dependencies = [ "aes", "anyhow", diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 180ca38576..6f318fd125 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -935,6 +935,14 @@ impl Replicator { })?; tracing::trace!("Local change counter: {change_counter}"); + // TODO: we shouldn't leak the connection here but for some reason when this connection get + // dropped it seems to checkpoint the database + if std::env::var("LIBSQL_BOTTOMLESS_DISABLE_INIT_CHECKPOINTING").is_ok() + || std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok() + { + std::mem::forget(conn); + } + Ok(change_counter.to_be_bytes()) } diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index f28bb9baa8..ca2de123ca 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -224,9 +224,9 @@ where } Err(Error::Client(e)) if !error_printed => { if e.downcast_ref::().is_some() { - tracing::error!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}"); + tracing::warn!("error connecting to primary. retrying. Verify that the libsql server version is `>=0.22` error: {e}"); } else { - tracing::error!("error connecting to primary. retrying. error: {e}"); + tracing::warn!("error connecting to primary. retrying. error: {e}"); } error_printed = true; diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index ba1c7551b2..6a22910548 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libsql-server" -version = "0.24.30" +version = "0.24.31" edition = "2021" default-run = "sqld" repository = "https://github.com/tursodatabase/libsql" diff --git a/libsql-server/src/namespace/configurator/helpers.rs b/libsql-server/src/namespace/configurator/helpers.rs index 081eeaef80..c84c963805 100644 --- a/libsql-server/src/namespace/configurator/helpers.rs +++ b/libsql-server/src/namespace/configurator/helpers.rs @@ -84,9 +84,16 @@ pub(super) async fn make_primary_connection_maker( let bottomless_replicator = match primary_config.bottomless_replication { Some(ref options) => { - tracing::debug!("Checkpointing before initializing bottomless"); - crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?; - tracing::debug!("Checkpointed before initializing bottomless"); + // TODO: figure out why we really need this the fixme above is not clear enough but + // disabling this allows us to prevent checkpointing of the wal file. + if !std::env::var("LIBSQL_DISABLE_INIT_CHECKPOINTING").is_ok() { + tracing::debug!("Checkpointing before initializing bottomless"); + crate::replication::primary::logger::checkpoint_db(&db_path.join("data"))?; + tracing::debug!("Checkpointed before initializing bottomless"); + } else { + tracing::warn!("Disabling initial checkpoint before bottomless"); + } + let options = make_bottomless_options(options, bottomless_db_id, name.clone()); let (replicator, did_recover) = init_bottomless_replicator(db_path.join("data"), options, &restore_option).await?; diff --git a/libsql/src/database.rs b/libsql/src/database.rs index ba9b3ad7cc..05eb67241a 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -74,6 +74,7 @@ enum DbType { path: String, flags: OpenFlags, encryption_config: Option, + skip_saftey_assert: bool, }, #[cfg(feature = "replication")] Sync { @@ -146,6 +147,7 @@ cfg_core! { path: db_path.into(), flags, encryption_config: None, + skip_saftey_assert: false, }, max_write_replication_index: Default::default(), }) @@ -429,7 +431,7 @@ cfg_replication! { DbType::Sync { db, .. } => { let path = db.path().to_string(); Ok(Database { - db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None}, + db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false }, max_write_replication_index: Default::default(), }) } @@ -550,10 +552,16 @@ impl Database { path, flags, encryption_config, + skip_saftey_assert, } => { use crate::local::impls::LibsqlConnection; - let db = crate::local::Database::open(path, *flags)?; + let db = if !skip_saftey_assert { + crate::local::Database::open(path, *flags)? + } else { + unsafe { crate::local::Database::open_raw(path, *flags)? } + }; + let conn = db.connect()?; if !cfg!(feature = "encryption") && encryption_config.is_some() { diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index f1c60ffb3c..9d9e0fdf8f 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -39,6 +39,7 @@ impl Builder<()> { path: path.as_ref().to_path_buf(), flags: crate::OpenFlags::default(), encryption_config: None, + skip_safety_assert: false, }, } } @@ -64,7 +65,8 @@ impl Builder<()> { read_your_writes: true, sync_interval: None, http_request_callback: None, - namespace: None + namespace: None, + skip_safety_assert: false, }, } } @@ -137,6 +139,7 @@ cfg_core! { path: std::path::PathBuf, flags: crate::OpenFlags, encryption_config: Option, + skip_safety_assert: bool, } impl Builder { @@ -155,10 +158,29 @@ cfg_core! { self } + /// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way + /// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED + /// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to + /// allow them to move between threads safely. Due to the fact that sqlite3 has a global + /// config this may conflict with other sqlite3 connections in the same process. + /// + /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence + /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at + /// your own risk. + pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + self.inner.skip_safety_assert = skip; + self + } + /// Build the local database. pub async fn build(self) -> Result { let db = if self.inner.path == std::path::Path::new(":memory:") { - let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?; + let db = if !self.inner.skip_safety_assert { + crate::local::Database::open(":memory:", crate::OpenFlags::default())? + } else { + unsafe { crate::local::Database::open_raw(":memory:", crate::OpenFlags::default())? } + }; + Database { db_type: DbType::Memory { db } , max_write_replication_index: Default::default(), @@ -176,6 +198,7 @@ cfg_core! { path, flags: self.inner.flags, encryption_config: self.inner.encryption_config, + skip_saftey_assert: self.inner.skip_safety_assert }, max_write_replication_index: Default::default(), } @@ -196,6 +219,7 @@ cfg_replication! { sync_interval: Option, http_request_callback: Option, namespace: Option, + skip_safety_assert: bool, } /// Local replica configuration type in [`Builder`]. @@ -270,6 +294,20 @@ cfg_replication! { self } + /// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way + /// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED + /// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to + /// allow them to move between threads safely. Due to the fact that sqlite3 has a global + /// config this may conflict with other sqlite3 connections in the same process. + /// + /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence + /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at + /// your own risk. + pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + self.inner.skip_safety_assert = skip; + self + } + /// Build the remote embedded replica database. pub async fn build(self) -> Result { let RemoteReplica { @@ -285,7 +323,8 @@ cfg_replication! { read_your_writes, sync_interval, http_request_callback, - namespace + namespace, + skip_safety_assert } = self.inner; let connector = if let Some(connector) = connector { @@ -303,19 +342,41 @@ cfg_replication! { let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned(); - let db = crate::local::Database::open_http_sync_internal( - connector, - path, - url, - auth_token, - version, - read_your_writes, - encryption_config.clone(), - sync_interval, - http_request_callback, - namespace, - ) - .await?; + let db = if !skip_safety_assert { + crate::local::Database::open_http_sync_internal( + connector, + path, + url, + auth_token, + version, + read_your_writes, + encryption_config.clone(), + sync_interval, + http_request_callback, + namespace, + ) + .await? + } else { + // SAFETY: this can only be enabled via the unsafe config function + // `skip_safety_assert`. + unsafe { + crate::local::Database::open_http_sync_internal2( + connector, + path, + url, + auth_token, + version, + read_your_writes, + encryption_config.clone(), + sync_interval, + http_request_callback, + namespace, + ) + .await? + } + + }; + Ok(Database { db_type: DbType::Sync { db, encryption_config }, diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 726bb98a4e..ce8bd7aa82 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -53,6 +53,30 @@ impl Database { } } + /// Safety: this is like `open` but does not enfoce that sqlite_config has THREADSAFE set to + /// `SQLITE_CONFIG_SERIALIZED`, calling + pub unsafe fn open_raw>(db_path: S, flags: OpenFlags) -> Result { + let db_path = db_path.into(); + + if db_path.starts_with("libsql:") + || db_path.starts_with("http:") + || db_path.starts_with("https:") + { + Err(ConnectionFailed(format!( + "Unable to open local database {db_path} with Database::open()" + ))) + } else { + Ok(Database { + db_path, + flags, + #[cfg(feature = "replication")] + replication_ctx: None, + #[cfg(feature = "sync")] + sync_ctx: None, + }) + } + } + #[cfg(feature = "replication")] pub async fn open_http_sync( connector: crate::util::ConnectorService, @@ -128,6 +152,57 @@ impl Database { Ok(db) } + #[cfg(feature = "replication")] + #[doc(hidden)] + pub async unsafe fn open_http_sync_internal2( + connector: crate::util::ConnectorService, + db_path: String, + endpoint: String, + auth_token: String, + version: Option, + read_your_writes: bool, + encryption_config: Option, + sync_interval: Option, + http_request_callback: Option, + namespace: Option, + ) -> Result { + use std::path::PathBuf; + + use crate::util::coerce_url_scheme; + + let mut db = Database::open_raw(&db_path, OpenFlags::default())?; + + let endpoint = coerce_url_scheme(endpoint); + let remote = crate::replication::client::Client::new( + connector.clone(), + endpoint + .as_str() + .try_into() + .map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?, + auth_token.clone(), + version.as_deref(), + http_request_callback.clone(), + namespace, + ) + .map_err(|e| crate::Error::Replication(e.into()))?; + let path = PathBuf::from(db_path); + let client = RemoteClient::new(remote.clone(), &path) + .await + .map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?; + + let replicator = + EmbeddedReplicator::with_remote(client, path, 1000, encryption_config, sync_interval) + .await?; + + db.replication_ctx = Some(ReplicationContext { + replicator, + client: Some(remote), + read_your_writes, + }); + + Ok(db) + } + #[cfg(feature = "sync")] #[doc(hidden)] pub async fn open_local_with_offline_writes( @@ -420,7 +495,11 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { + async fn try_push( + &self, + sync_ctx: &mut SyncContext, + conn: &Connection, + ) -> Result { let page_size = { let rows = conn .query("PRAGMA page_size", crate::params::Params::None)? @@ -471,7 +550,11 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { + async fn try_pull( + &self, + sync_ctx: &mut SyncContext, + conn: &Connection, + ) -> Result { let generation = sync_ctx.generation(); let mut frame_no = sync_ctx.durable_frame_num() + 1; conn.wal_insert_begin()?;