Skip to content

Commit

Permalink
Merge pull request #1644 from tursodatabase/query-cancel
Browse files Browse the repository at this point in the history
cancel query when request is dropped
  • Loading branch information
MarinPostma authored Aug 7, 2024
2 parents 0511ec3 + 9595315 commit 8db5ea8
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 3 deletions.
55 changes: 52 additions & 3 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ use tokio::sync::watch;
use tokio::time::{Duration, Instant};

use crate::error::Error;
use crate::metrics::{DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, VACUUM_COUNT, WAL_CHECKPOINT_COUNT};
use crate::metrics::{
DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, QUERY_CANCELED, VACUUM_COUNT, WAL_CHECKPOINT_COUNT,
};
use crate::namespace::broadcasters::BroadcasterHandle;
use crate::namespace::meta_store::MetaStoreHandle;
use crate::namespace::ResolveNamespacePathFn;
Expand Down Expand Up @@ -391,14 +393,44 @@ where
ctx: RequestContext,
builder: B,
) -> Result<(B, Program)> {
struct Bomb {
canceled: Arc<AtomicBool>,
defused: bool,
}

impl Drop for Bomb {
fn drop(&mut self) {
if !self.defused {
tracing::trace!("cancelling request");
self.canceled.store(true, Ordering::Relaxed);
}
}
}

let canceled = {
let cancelled = self.inner.lock().canceled.clone();
cancelled.store(false, Ordering::Relaxed);
cancelled
};

PROGRAM_EXEC_COUNT.increment(1);

check_program_auth(&ctx, &pgm, &self.inner.lock().config_store.get())?;

// create the bomb right before spawning the blocking task.
let mut bomb = Bomb {
canceled,
defused: false,
};
let conn = self.inner.clone();
BLOCKING_RT
let ret = BLOCKING_RT
.spawn_blocking(move || Connection::run(conn, pgm, builder))
.await
.unwrap()
.unwrap();

bomb.defused = true;

ret
}
}

Expand All @@ -413,6 +445,7 @@ pub(super) struct Connection<W> {
forced_rollback: bool,
broadcaster: BroadcasterHandle,
hooked: bool,
canceled: Arc<AtomicBool>,
}

fn update_stats(
Expand Down Expand Up @@ -475,6 +508,20 @@ impl<W: Wal> Connection<W> {
);
}

let canceled = Arc::new(AtomicBool::new(false));

conn.progress_handler(100, {
let canceled = canceled.clone();
Some(move || {
let canceled = canceled.load(Ordering::Relaxed);
if canceled {
QUERY_CANCELED.increment(1);
tracing::trace!("request canceled");
}
canceled
})
});

let this = Self {
conn,
stats,
Expand All @@ -486,6 +533,7 @@ impl<W: Wal> Connection<W> {
forced_rollback: false,
broadcaster,
hooked: false,
canceled,
};

for ext in extensions.iter() {
Expand Down Expand Up @@ -795,6 +843,7 @@ mod test {
forced_rollback: false,
broadcaster: Default::default(),
hooked: false,
canceled: Arc::new(false.into()),
};

let conn = Arc::new(Mutex::new(conn));
Expand Down
5 changes: 5 additions & 0 deletions libsql-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,8 @@ pub static LISTEN_EVENTS_DROPPED: Lazy<Counter> = Lazy::new(|| {
describe_counter!(NAME, "Number of listen events dropped");
register_counter!(NAME)
});
pub static QUERY_CANCELED: Lazy<Counter> = Lazy::new(|| {
const NAME: &str = "libsql_server_query_canceled";
describe_counter!(NAME, "Number of canceled queries");
register_counter!(NAME)
});

0 comments on commit 8db5ea8

Please sign in to comment.