Skip to content

Commit

Permalink
introduce deadlock monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 2, 2024
1 parent 1d7fb16 commit daca1c7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
58 changes: 58 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub struct Server<C = HttpConnector, A = AddrIncoming, D = HttpsConnector<HttpCo
pub storage_server_address: String,
pub connector: Option<D>,
pub migrate_bottomless: bool,
pub enable_deadlock_monitor: bool,
}

impl<C, A, D> Default for Server<C, A, D> {
Expand All @@ -201,6 +202,7 @@ impl<C, A, D> Default for Server<C, A, D> {
storage_server_address: Default::default(),
connector: None,
migrate_bottomless: false,
enable_deadlock_monitor: false,
}
}
}
Expand Down Expand Up @@ -410,6 +412,57 @@ fn init_version_file(db_path: &Path) -> anyhow::Result<()> {
Ok(())
}

/// The deadlock watcher monitors the main tokio runtime for deadlock by sending Ping to a task
/// within it, and waiting for pongs. If the runtime fails to respond in due time, the watcher
/// exits the process.
fn install_deadlock_monitor() {
// this is a very generous deadline for the main runtime to respond
const PONG_DEADLINE: Duration = Duration::from_secs(5);

struct Ping;
struct Pong;

let (sender, mut receiver) = tokio::sync::mpsc::channel(1);

std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap();
rt.block_on(async move {
loop {
let (snd, ret) = tokio::sync::oneshot::channel();
sender.try_send((snd, Ping)).unwrap();
match tokio::time::timeout(PONG_DEADLINE, ret).await {
Ok(Ok(Pong)) => (),
Err(_) => {
tracing::error!(
"main runtime failed to respond within deadlines, deadlock detected"
);
// std::process::exit(1);
}
_ => (),
}

tokio::time::sleep(Duration::from_secs(1)).await;
}
})
});

tokio::spawn(async move {
loop {
match receiver.recv().await {
Some((ret, Ping)) => {
let _ = ret.send(Pong);
}
None => break,
}
}

tracing::warn!("deadlock monitor exited")
});
}

impl<C, A, D> Server<C, A, D>
where
C: Connector,
Expand Down Expand Up @@ -501,6 +554,11 @@ where
static INIT: std::sync::Once = std::sync::Once::new();
let mut task_manager = TaskManager::new();

if self.enable_deadlock_monitor {
install_deadlock_monitor();
tracing::info!("deadlock monitor installed");
}

if std::env::var("LIBSQL_SQLITE_MIMALLOC").is_ok() {
setup_sqlite_alloc();
}
Expand Down
5 changes: 5 additions & 0 deletions libsql-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ struct Cli {
requires = "enable_bottomless_replication"
)]
migrate_bottomless: bool,

/// Enables the main runtime deadlock monitor: if the main runtime deadlocks, logs an error
#[clap(long)]
enable_deadlock_monitor: bool,
}

#[derive(clap::Subcommand, Debug)]
Expand Down Expand Up @@ -671,6 +675,7 @@ async fn build_server(config: &Cli) -> anyhow::Result<Server> {
storage_server_address: config.storage_server_address.clone(),
connector: Some(https),
migrate_bottomless: config.migrate_bottomless,
enable_deadlock_monitor: config.enable_deadlock_monitor,
})
}

Expand Down

0 comments on commit daca1c7

Please sign in to comment.