Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsql: add skip_safety_assert builder option #1906

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ enum DbType {
path: String,
flags: OpenFlags,
encryption_config: Option<EncryptionConfig>,
skip_saftey_assert: bool,
Copy link

@Laerte Laerte Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a typo? Title says skip_safety_assert but variable is defined as skip_saftey_assert.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this is def a typo, but its a private type so we can fix it without worry.

},
#[cfg(feature = "replication")]
Sync {
Expand Down Expand Up @@ -146,6 +147,7 @@ cfg_core! {
path: db_path.into(),
flags,
encryption_config: None,
skip_saftey_assert: false,
},
max_write_replication_index: Default::default(),
})
Expand Down Expand Up @@ -429,7 +431,7 @@ cfg_replication! {
DbType::Sync { db, .. } => {
let path = db.path().to_string();
Ok(Database {
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None},
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false },
max_write_replication_index: Default::default(),
})
}
Expand Down Expand Up @@ -550,10 +552,16 @@ impl Database {
path,
flags,
encryption_config,
skip_saftey_assert,
} => {
use crate::local::impls::LibsqlConnection;

let db = crate::local::Database::open(path, *flags)?;
let db = if !skip_saftey_assert {
crate::local::Database::open(path, *flags)?
} else {
unsafe { crate::local::Database::open_raw(path, *flags)? }
};

let conn = db.connect()?;

if !cfg!(feature = "encryption") && encryption_config.is_some() {
Expand Down
93 changes: 77 additions & 16 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ impl Builder<()> {
path: path.as_ref().to_path_buf(),
flags: crate::OpenFlags::default(),
encryption_config: None,
skip_safety_assert: false,
},
}
}
Expand All @@ -64,7 +65,8 @@ impl Builder<()> {
read_your_writes: true,
sync_interval: None,
http_request_callback: None,
namespace: None
namespace: None,
skip_safety_assert: false,
},
}
}
Expand Down Expand Up @@ -137,6 +139,7 @@ cfg_core! {
path: std::path::PathBuf,
flags: crate::OpenFlags,
encryption_config: Option<EncryptionConfig>,
skip_safety_assert: bool,
}

impl Builder<Local> {
Expand All @@ -155,10 +158,29 @@ cfg_core! {
self
}

/// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way
/// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED
/// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to
/// allow them to move between threads safely. Due to the fact that sqlite3 has a global
/// config this may conflict with other sqlite3 connections in the same process.
///
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<Local> {
self.inner.skip_safety_assert = skip;
self
}

/// Build the local database.
pub async fn build(self) -> Result<Database> {
let db = if self.inner.path == std::path::Path::new(":memory:") {
let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?;
let db = if !self.inner.skip_safety_assert {
crate::local::Database::open(":memory:", crate::OpenFlags::default())?
} else {
unsafe { crate::local::Database::open_raw(":memory:", crate::OpenFlags::default())? }
};

Database {
db_type: DbType::Memory { db } ,
max_write_replication_index: Default::default(),
Expand All @@ -176,6 +198,7 @@ cfg_core! {
path,
flags: self.inner.flags,
encryption_config: self.inner.encryption_config,
skip_saftey_assert: self.inner.skip_safety_assert
},
max_write_replication_index: Default::default(),
}
Expand All @@ -196,6 +219,7 @@ cfg_replication! {
sync_interval: Option<std::time::Duration>,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
skip_safety_assert: bool,
}

/// Local replica configuration type in [`Builder`].
Expand Down Expand Up @@ -270,6 +294,20 @@ cfg_replication! {
self
}

/// Skip the saftey assert used to ensure that sqlite3 is configured correctly for the way
/// that libsql uses the ffi code. By default, libsql will try to use the SERIALIZED
/// threadsafe mode for sqlite3. This allows us to implement Send/Sync for all the types to
/// allow them to move between threads safely. Due to the fact that sqlite3 has a global
/// config this may conflict with other sqlite3 connections in the same process.
///
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<RemoteReplica> {
self.inner.skip_safety_assert = skip;
self
}

/// Build the remote embedded replica database.
pub async fn build(self) -> Result<Database> {
let RemoteReplica {
Expand All @@ -285,7 +323,8 @@ cfg_replication! {
read_your_writes,
sync_interval,
http_request_callback,
namespace
namespace,
skip_safety_assert
} = self.inner;

let connector = if let Some(connector) = connector {
Expand All @@ -303,19 +342,41 @@ cfg_replication! {

let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();

let db = crate::local::Database::open_http_sync_internal(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?;
let db = if !skip_safety_assert {
crate::local::Database::open_http_sync_internal(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?
} else {
// SAFETY: this can only be enabled via the unsafe config function
// `skip_safety_assert`.
unsafe {
crate::local::Database::open_http_sync_internal2(
connector,
path,
url,
auth_token,
version,
read_your_writes,
encryption_config.clone(),
sync_interval,
http_request_callback,
namespace,
)
.await?
}

};


Ok(Database {
db_type: DbType::Sync { db, encryption_config },
Expand Down
87 changes: 85 additions & 2 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ impl Database {
}
}

/// Safety: this is like `open` but does not enfoce that sqlite_config has THREADSAFE set to
/// `SQLITE_CONFIG_SERIALIZED`, calling
pub unsafe fn open_raw<S: Into<String>>(db_path: S, flags: OpenFlags) -> Result<Database> {
let db_path = db_path.into();

if db_path.starts_with("libsql:")
|| db_path.starts_with("http:")
|| db_path.starts_with("https:")
{
Err(ConnectionFailed(format!(
"Unable to open local database {db_path} with Database::open()"
)))
} else {
Ok(Database {
db_path,
flags,
#[cfg(feature = "replication")]
replication_ctx: None,
#[cfg(feature = "sync")]
sync_ctx: None,
})
}
}

#[cfg(feature = "replication")]
pub async fn open_http_sync(
connector: crate::util::ConnectorService,
Expand Down Expand Up @@ -128,6 +152,57 @@ impl Database {
Ok(db)
}

#[cfg(feature = "replication")]
#[doc(hidden)]
pub async unsafe fn open_http_sync_internal2(
connector: crate::util::ConnectorService,
db_path: String,
endpoint: String,
auth_token: String,
version: Option<String>,
read_your_writes: bool,
encryption_config: Option<EncryptionConfig>,
sync_interval: Option<std::time::Duration>,
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
) -> Result<Database> {
use std::path::PathBuf;

use crate::util::coerce_url_scheme;

let mut db = Database::open_raw(&db_path, OpenFlags::default())?;

let endpoint = coerce_url_scheme(endpoint);
let remote = crate::replication::client::Client::new(
connector.clone(),
endpoint
.as_str()
.try_into()
.map_err(|e: InvalidUri| crate::Error::Replication(e.into()))?,
auth_token.clone(),
version.as_deref(),
http_request_callback.clone(),
namespace,
)
.map_err(|e| crate::Error::Replication(e.into()))?;
let path = PathBuf::from(db_path);
let client = RemoteClient::new(remote.clone(), &path)
.await
.map_err(|e| crate::errors::Error::ConnectionFailed(e.to_string()))?;

let replicator =
EmbeddedReplicator::with_remote(client, path, 1000, encryption_config, sync_interval)
.await?;

db.replication_ctx = Some(ReplicationContext {
replicator,
client: Some(remote),
read_your_writes,
});

Ok(db)
}

#[cfg(feature = "sync")]
#[doc(hidden)]
pub async fn open_local_with_offline_writes(
Expand Down Expand Up @@ -420,7 +495,11 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
async fn try_push(
&self,
sync_ctx: &mut SyncContext,
conn: &Connection,
) -> Result<crate::database::Replicated> {
let page_size = {
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
Expand Down Expand Up @@ -471,7 +550,11 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
async fn try_pull(
&self,
sync_ctx: &mut SyncContext,
conn: &Connection,
) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
conn.wal_insert_begin()?;
Expand Down
Loading