Skip to content

Commit

Permalink
Merge pull request #1730 from tursodatabase/fix-namespace-deletion
Browse files Browse the repository at this point in the history
fix namespace deletion
  • Loading branch information
MarinPostma authored Sep 13, 2024
2 parents 8e06705 + 11066a4 commit 9893b67
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 31 deletions.
3 changes: 3 additions & 0 deletions libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ pub enum Error {
AttachInMigration,
#[error("join failure: {0}")]
RuntimeTaskJoinError(#[from] tokio::task::JoinError),
#[error("wal error: {0}")]
LibsqlWal(#[from] libsql_wal::error::Error),
}

impl AsRef<Self> for Error {
Expand Down Expand Up @@ -218,6 +220,7 @@ impl IntoResponse for &Error {
HasLinkedDbs(_) => self.format_err(StatusCode::BAD_REQUEST),
AttachInMigration => self.format_err(StatusCode::BAD_REQUEST),
RuntimeTaskJoinError(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
LibsqlWal(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
}
Expand Down
54 changes: 53 additions & 1 deletion libsql-server/src/namespace/configurator/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use bytes::Bytes;
use enclose::enclose;
use futures::Stream;
use libsql_sys::EncryptionConfig;
use libsql_wal::io::StdIO;
use libsql_wal::registry::WalRegistry;
use tokio::io::AsyncBufReadExt as _;
use tokio::sync::watch;
use tokio::task::JoinSet;
Expand All @@ -29,7 +31,7 @@ use crate::namespace::{
};
use crate::replication::{FrameNo, ReplicationLogger};
use crate::stats::Stats;
use crate::{StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};
use crate::{SqldStorage, StatsSender, BLOCKING_RT, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};

use super::{BaseNamespaceConfig, PrimaryConfig};

Expand Down Expand Up @@ -464,3 +466,53 @@ pub(super) async fn cleanup_primary(

Ok(())
}

/// Removes a libsql-wal backed namespace: its registry entry and its on-disk directories.
///
/// The steps run in this order:
/// 1. the namespace slot in `registry` is replaced by a tombstone, and the wal that was registered
///    for it (if any) is shut down;
/// 2. `<base_path>/dbs/<namespace>` is deleted (best effort: a failure is logged, not returned);
/// 3. `<base_path>/wals/<namespace>` is deleted;
/// 4. the tombstone is removed from the registry.
///
/// # Errors
///
/// Returns an error when:
/// - the shutdown task cannot be joined, or the shutdown itself fails;
/// - checking for the existence of either directory fails;
/// - the wal directory cannot be deleted for a reason other than it not existing.
///
/// On every error path the function returns before step 4, so the tombstone stays in the
/// registry.
pub async fn cleanup_libsql(
    namespace: &NamespaceName,
    registry: &WalRegistry<StdIO, SqldStorage>,
    base_path: &Path,
) -> crate::Result<()> {
    let namespace = namespace.clone().into();
    if let Some(shared) = registry.tombstone(&namespace).await {
        // Shut the wal down on a blocking thread. A join failure (panic or cancellation of the
        // blocking task) is propagated as an error instead of panicking here.
        // NOTE(review): the original comment was cut off mid-sentence ("don't seal the current
        // segment so that it's not ..."); presumably `shutdown` skips sealing because the files
        // are about to be deleted -- confirm against `SharedWal::shutdown`.
        tokio::task::spawn_blocking(move || shared.shutdown()).await??;
    }

    let ns_db_path = base_path.join("dbs").join(namespace.as_str());
    if ns_db_path.try_exists()? {
        tracing::debug!("removing database directory: {}", ns_db_path.display());
        // Best effort: a leftover database directory does not abort the cleanup, but the failure
        // is logged rather than silently discarded.
        if let Err(e) = tokio::fs::remove_dir_all(&ns_db_path).await {
            if e.kind() != std::io::ErrorKind::NotFound {
                tracing::warn!(
                    "failed to remove database directory {}: {e}",
                    ns_db_path.display()
                );
            }
        }
    }

    let ns_wals_path = base_path.join("wals").join(namespace.as_str());
    if ns_wals_path.try_exists()? {
        tracing::debug!("removing wal directory: {}", ns_wals_path.display());
        match tokio::fs::remove_dir_all(&ns_wals_path).await {
            Ok(()) => (),
            // alright, there's nothing to delete anyway
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => (),
            Err(e) => {
                // Something unexpected happened, this namespace is in a bad state.
                // The entry is not removed from the registry, to prevent another namespace
                // with the same name from reusing the same wal files. Manual intervention is
                // necessary.
                // FIXME: on namespace creation, we could ensure that this directory is clean.
                tracing::error!("error deleting `{namespace}` wal directory, manual intervention may be necessary: {e}");
                return Err(e.into());
            }
        }
    }

    // when all is cleaned, leave place for next one
    registry.remove(&namespace).await;

    Ok(())
}
9 changes: 7 additions & 2 deletions libsql-server/src/namespace/configurator/libsql_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::schema::{has_pending_migration_task, setup_migration_table};
use crate::stats::Stats;
use crate::{SqldStorage, DB_CREATE_TIMEOUT, DEFAULT_AUTO_CHECKPOINT};

use super::helpers::cleanup_libsql;
use super::{BaseNamespaceConfig, ConfigureNamespace, PrimaryConfig};

pub struct LibsqlPrimaryConfigurator {
Expand Down Expand Up @@ -248,12 +249,16 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {

fn cleanup<'a>(
&'a self,
_namespace: &'a NamespaceName,
namespace: &'a NamespaceName,
_db_config: &'a DatabaseConfig,
_prune_all: bool,
_bottomless_db_id_init: NamespaceBottomlessDbIdInit,
) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
unimplemented!()
Box::pin(cleanup_libsql(
namespace,
&self.registry,
&self.base.base_path,
))
}

fn fork<'a>(
Expand Down
14 changes: 6 additions & 8 deletions libsql-server/src/namespace/configurator/libsql_replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::namespace::{
};
use crate::{SqldStorage, DB_CREATE_TIMEOUT};

use super::helpers::cleanup_libsql;
use super::{BaseNamespaceConfig, ConfigureNamespace};

pub struct LibsqlReplicaConfigurator {
Expand Down Expand Up @@ -251,14 +252,11 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
_prune_all: bool,
_bottomless_db_id_init: NamespaceBottomlessDbIdInit,
) -> Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
Box::pin(async move {
let ns_path = self.base.base_path.join("dbs").join(namespace.as_str());
if ns_path.try_exists()? {
tracing::debug!("removing database directory: {}", ns_path.display());
tokio::fs::remove_dir_all(ns_path).await?;
}
Ok(())
})
Box::pin(cleanup_libsql(
namespace,
&self.registry,
&self.base.base_path,
))
}

fn fork<'a>(
Expand Down
24 changes: 9 additions & 15 deletions libsql-server/src/namespace/configurator/libsql_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::namespace::{
use crate::schema::SchedulerHandle;
use crate::SqldStorage;

use super::helpers::cleanup_primary;
use super::helpers::cleanup_libsql;
use super::libsql_primary::{libsql_primary_common, LibsqlPrimaryCommon};
use super::{BaseNamespaceConfig, ConfigureNamespace, PrimaryConfig};

Expand Down Expand Up @@ -146,21 +146,15 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
fn cleanup<'a>(
&'a self,
namespace: &'a NamespaceName,
db_config: &'a DatabaseConfig,
prune_all: bool,
bottomless_db_id_init: crate::namespace::NamespaceBottomlessDbIdInit,
_db_config: &'a DatabaseConfig,
_prune_all: bool,
_bottomless_db_id_init: crate::namespace::NamespaceBottomlessDbIdInit,
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<()>> + Send + 'a>> {
Box::pin(async move {
cleanup_primary(
&self.base,
&self.primary_config,
namespace,
db_config,
prune_all,
bottomless_db_id_init,
)
.await
})
Box::pin(cleanup_libsql(
namespace,
&self.registry,
&self.base.base_path,
))
}

fn fork<'a>(
Expand Down
8 changes: 3 additions & 5 deletions libsql-wal/src/checkpointer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ where
) -> impl Future<Output = crate::error::Result<()>> + Send {
let namespace = namespace.clone();
async move {
let registry = self
.get_async(&namespace)
.await
.expect("namespace not openned");
registry.checkpoint().await?;
if let Some(registry) = self.get_async(&namespace).await {
registry.checkpoint().await?;
}
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions libsql-wal/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub enum Error {

#[error("storage error: {0}")]
Storage(#[from] Box<crate::storage::Error>),
#[error("wal is being deleted")]
DeletingWal,
}

impl Into<libsql_sys::ffi::Error> for Error {
Expand Down
41 changes: 41 additions & 0 deletions libsql-wal/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ enum Slot<IO: Io> {
/// entry in the registry map puts a building slot. Other connections will wait for the mutex
/// to turn to true, after the slot has been updated to contain the wal
Building(Arc<(Condvar, Mutex<bool>)>, Arc<Notify>),
/// The namespace was removed
Tombstone,
}

/// Wal Registry maintains a set of shared Wal, and their respective set of files.
Expand Down Expand Up @@ -85,6 +87,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
match self.opened.get(namespace).as_deref() {
Some(Slot::Wal(wal)) => return Some(wal.clone()),
Some(Slot::Building(_, notify)) => notify.clone(),
Some(Slot::Tombstone) => return None,
None => return None,
}
};
Expand Down Expand Up @@ -178,13 +181,15 @@ where
// the slot was updated: try again
continue;
}
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
}
}

let action = match self.opened.entry(namespace.clone()) {
dashmap::Entry::Occupied(e) => match e.get() {
Slot::Wal(shared) => return Ok(shared.clone()),
Slot::Building(wait, _) => Err(wait.clone()),
Slot::Tombstone => return Err(crate::error::Error::DeletingWal),
},
dashmap::Entry::Vacant(e) => {
let notifier = Arc::new((Condvar::new(), Mutex::new(false)));
Expand Down Expand Up @@ -370,6 +375,37 @@ where
}
}

/// Replaces the slot of `namespace` with [`Slot::Tombstone`] and returns the wal it held.
///
/// If the slot is currently `Building`, this first waits on the notifier stored in that slot
/// before overwriting it.
///
/// Returns `None` when the namespace has no slot (nothing is inserted in that case), or when
/// the slot was already a tombstone; otherwise returns the shared wal that occupied the slot.
///
/// # Panics
///
/// Panics if the slot is found in the `Building` state when it is overwritten.
pub async fn tombstone(&self, namespace: &NamespaceName) -> Option<Arc<SharedWal<IO>>> {
    // If a wal is currently being opened, let it finish. The notifier is cloned out of the map
    // and the map guard is released at the end of this statement, *before* awaiting: keeping a
    // map guard alive across an await point keeps the entry locked while this task is
    // suspended, which can block the task that needs to update the slot.
    let building = match self.opened.get(namespace).as_deref() {
        None => return None,
        Some(Slot::Building(_, notify)) => Some(notify.clone()),
        Some(Slot::Wal(_) | Slot::Tombstone) => None,
    };
    if let Some(notify) = building {
        notify.notified().await;
    }

    match self.opened.insert(namespace.clone(), Slot::Tombstone) {
        Some(Slot::Wal(wal)) => Some(wal),
        Some(Slot::Building(_, _)) => {
            unreachable!("already waited for ns to open")
        }
        Some(Slot::Tombstone) | None => None,
    }
}

/// Removes the slot of `namespace` from the registry, whatever its state.
///
/// If the slot is currently `Building`, this first waits on the notifier stored in that slot.
/// Does nothing when the namespace has no slot.
pub async fn remove(&self, namespace: &NamespaceName) {
    // If a wal is currently being opened, let it finish. The notifier is cloned and the map
    // guard is released at the end of this statement, so that no map guard is held across the
    // await point below.
    let building = match self.opened.get(namespace).as_deref() {
        Some(Slot::Building(_, notify)) => Some(notify.clone()),
        Some(Slot::Wal(_) | Slot::Tombstone) | None => None,
    };
    if let Some(notify) = building {
        notify.notified().await;
    }

    self.opened.remove(namespace);
}

/// Attempts to sync all loaded dbs with durable storage
pub async fn sync_all(&self, conccurency: usize) -> Result<()>
where
Expand Down Expand Up @@ -445,6 +481,7 @@ where
// wait for shared to finish building
notify.notified().await;
}
Slot::Tombstone => continue,
}
}
}
Expand Down Expand Up @@ -507,6 +544,10 @@ where

Ok(())
}

/// Returns a shared reference to the `storage` field.
pub fn storage(&self) -> &S {
&self.storage
}
}

#[tracing::instrument(skip_all, fields(namespace = shared.namespace().as_str()))]
Expand Down

0 comments on commit 9893b67

Please sign in to comment.