Skip to content

Commit

Permalink
disable foreign keys in schema migration
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 2, 2024
1 parent 1d7fb16 commit 1e809f2
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 77 deletions.
8 changes: 7 additions & 1 deletion libsql-server/src/database/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ impl<C: crate::connection::Connection> crate::connection::Connection for SchemaC
} else {
check_program_auth(&ctx, &migration, &self.config.get()).await?;
let connection = self.connection.clone();
validate_migration(&mut migration)?;
let disable_foreign_key = validate_migration(&mut migration)?;
let migration = Arc::new(migration);
let builder = tokio::task::spawn_blocking({
let migration = migration.clone();
move || {
let res = connection.with_raw(|conn| -> crate::Result<_> {
if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=off", ())?;
}
let mut txn = conn
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
.map_err(|_| {
Expand All @@ -73,6 +76,9 @@ impl<C: crate::connection::Connection> crate::connection::Connection for SchemaC
&QueryBuilderConfig::default(),
);
txn.rollback().unwrap();
if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=on", ())?;
}
Ok(ret?)
});

Expand Down
8 changes: 8 additions & 0 deletions libsql-server/src/query_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ impl Statement {
StmtKind::Read | StmtKind::TxnEnd | StmtKind::TxnBegin
)
}

pub(crate) fn is_pragma(&self) -> bool {
// adding a flag to the program would break the serialization, so we do that instead
match self.stmt.split_whitespace().next() {
Some(s) => s.trim().eq_ignore_ascii_case("pragma"),
None => false,
}
}
}

/// Given a an initial state and an array of queries, attempts to predict what the final state will
Expand Down
7 changes: 5 additions & 2 deletions libsql-server/src/schema/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::namespace::NamespaceName;
use crate::schema::status::{MigrationJobProgress, MigrationJobSummary};

use super::status::MigrationProgress;
use super::validate_migration;
use super::{
status::{MigrationJob, MigrationTask},
Error, MigrationDetails, MigrationJobStatus, MigrationSummary, MigrationTaskStatus,
Expand Down Expand Up @@ -328,15 +329,17 @@ pub(super) fn get_next_pending_migration_job(
|row| {
let job_id = row.get::<_, i64>(0)?;
let status = MigrationJobStatus::from_int(row.get::<_, u64>(1)?);
let migration = serde_json::from_str(row.get_ref(2)?.as_str()?).unwrap();
let mut migration = serde_json::from_str(row.get_ref(2)?.as_str()?).unwrap();
let schema = NamespaceName::from_string(row.get::<_, String>(3)?).unwrap();
let disable_foreign_key = validate_migration(&mut migration).unwrap();
Ok(MigrationJob {
schema,
job_id,
status,
migration,
progress: Default::default(),
task_error: None,
disable_foreign_key,
migration: migration.into(),
})
},
)
Expand Down
64 changes: 37 additions & 27 deletions libsql-server/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,47 @@ pub use scheduler::Scheduler;
pub use status::{MigrationDetails, MigrationJobStatus, MigrationSummary, MigrationTaskStatus};

use crate::connection::program::Program;
use crate::query::{Params, Query};
use crate::query_analysis::{Statement, StmtKind};
use crate::query_analysis::StmtKind;

pub fn validate_migration(migration: &mut Program) -> Result<(), Error> {
if !migration.steps.is_empty()
&& matches!(migration.steps[0].query.stmt.kind, StmtKind::TxnBegin)
{
if !matches!(
migration.steps.last().map(|s| &s.query.stmt.kind),
Some(&StmtKind::TxnEnd)
) {
return Err(Error::MigrationContainsTransactionStatements);
// validate program is valid for migration, and return whether foreign keys should be disabled
pub fn validate_migration(migration: &mut Program) -> Result<bool, Error> {
let mut steps = migration.steps_mut().unwrap().iter_mut().peekable();
let mut explicit_tx = false;
let mut disable_foreign_key = false;
// skip pragmas prologue
while steps.next_if(|s| s.query.stmt.is_pragma()).is_some() {
disable_foreign_key = true;
}

// first step can be a BEGIN
if let Some(step) = steps.next() {
if matches!(step.query.stmt.kind, StmtKind::TxnBegin) {
// neutralize step
step.query.stmt.stmt = r#"SELECT 'neutralized txn begin'"#.into();
explicit_tx = true;
}
migration.steps_mut().unwrap()[0].query = Query {
stmt: Statement::parse("PRAGMA max_page_count")
.next()
.unwrap()
.unwrap(),
params: Params::empty(),
want_rows: false,
};
while let Some(step) = migration.steps.last() {
if !matches!(step.query.stmt.kind, StmtKind::TxnEnd) {
break;
}

// skip all steps that are not tx items
while steps.next_if(|s| !s.query.stmt.kind.is_txn()).is_some() {}

// last stmt can be a tx commit
while let Some(step) = steps.next_if(|s| s.query.stmt.kind.is_txn()) {
if matches!(step.query.stmt.kind, StmtKind::TxnEnd) {
if !explicit_tx {
// transaction is closed but was never opened
return Err(Error::MigrationContainsTransactionStatements);
}
migration.steps_mut().unwrap().pop();
// neutralize step
step.query.stmt.stmt = r#"SELECT 'neutralized txn component'"#.into();
}
}
if migration.steps().iter().any(|s| s.query.stmt.kind.is_txn()) {
Err(Error::MigrationContainsTransactionStatements)
} else {
Ok(())

// validate pragma epilogue
if steps.by_ref().any(|s| !s.query.stmt.is_pragma()) {
// only accept pragmas after tx end
return Err(Error::MigrationContainsTransactionStatements);
}

Ok(disable_foreign_key)
}
19 changes: 19 additions & 0 deletions libsql-server/src/schema/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl Scheduler {
job.job_id(),
self.namespace_store.clone(),
self.migration_db.clone(),
job.disable_foreign_key,
));
// do not enqueue anything until the schema migration is complete
self.has_work = false;
Expand Down Expand Up @@ -374,6 +375,7 @@ impl Scheduler {
job.migration.clone(),
task,
block_writes,
job.disable_foreign_key,
));
} else {
// there is still a job, but the queue is empty, it means that we are waiting for the
Expand Down Expand Up @@ -434,6 +436,7 @@ async fn try_step_task(
migration: Arc<Program>,
mut task: MigrationTask,
block_writes: Arc<AtomicBool>,
disable_foreign_key: bool,
) -> WorkResult {
let old_status = *task.status();
let error = match try_step_task_inner(
Expand All @@ -443,6 +446,7 @@ async fn try_step_task(
migration,
&task,
block_writes,
disable_foreign_key,
)
.await
{
Expand Down Expand Up @@ -485,6 +489,7 @@ async fn try_step_task_inner(
migration: Arc<Program>,
task: &MigrationTask,
block_writes: Arc<AtomicBool>,
disable_foreign_key: bool,
) -> Result<(MigrationTaskStatus, Option<String>), Error> {
let status = *task.status();
let mut db_connection = connection_maker
Expand All @@ -508,6 +513,9 @@ async fn try_step_task_inner(
let job_id = task.job_id();
let (status, error) = tokio::task::spawn_blocking(move || -> Result<_, Error> {
db_connection.with_raw(move |conn| {
if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=off", ())?;
}
let mut txn = conn.transaction()?;

match status {
Expand All @@ -526,6 +534,10 @@ async fn try_step_task_inner(
let (new_status, error) = step_task(&mut txn, job_id)?;
txn.commit()?;

if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=off", ())?;
}

if new_status.is_finished() {
block_writes.store(false, std::sync::atomic::Ordering::SeqCst);
}
Expand Down Expand Up @@ -737,6 +749,7 @@ async fn step_job_run_success(
job_id: i64,
namespace_store: NamespaceStore,
migration_db: Arc<Mutex<MetaStoreConnection>>,
disable_foreign_key: bool,
) -> WorkResult {
try_step_job(MigrationJobStatus::WaitingRun, async move {
// TODO: check that all tasks actually reported success before migration
Expand All @@ -757,6 +770,9 @@ async fn step_job_run_success(
.map_err(|e| Error::FailedToConnect(schema.clone(), e.into()))?;
tokio::task::spawn_blocking(move || -> Result<(), Error> {
connection.with_raw(|conn| -> Result<(), Error> {
if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=off", ())?;
}
let mut txn = conn.transaction()?;
let schema_version =
txn.query_row("PRAGMA schema_version", (), |row| row.get::<_, i64>(0))?;
Expand All @@ -774,6 +790,9 @@ async fn step_job_run_success(
txn.pragma_update(None, "schema_version", job_id)?;
// update schema version to job_id?
txn.commit()?;
if disable_foreign_key {
conn.execute("PRAGMA foreign_keys=on", ())?;
}
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ MigrationJob {
0,
],
task_error: None,
disable_foreign_key: false,
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ MigrationJob {
0,
],
task_error: None,
disable_foreign_key: false,
}
1 change: 1 addition & 0 deletions libsql-server/src/schema/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct MigrationJob {
pub(super) progress: MigrationProgress,
/// error info for the task that failed the job
pub(super) task_error: Option<(i64, String, NamespaceName)>,
pub(super) disable_foreign_key: bool,
}

impl MigrationJob {
Expand Down
47 changes: 0 additions & 47 deletions libsql-server/tests/namespaces/shared_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,53 +216,6 @@ fn no_job_created_when_migration_job_is_invalid() {
sim.run().unwrap();
}

#[test]
fn migration_contains_txn_statements() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(100000))
.build();
let tmp = tempdir().unwrap();
make_primary(&mut sim, tmp.path().to_path_buf());

sim.client("client", async {
let client = Client::new();
client
.post(
"http://primary:9090/v1/namespaces/schema/create",
json!({"shared_schema": true }),
)
.await
.unwrap();

let schema_db = Database::open_remote_with_connector(
"http://schema.primary:8080",
String::new(),
TurmoilConnector,
)
.unwrap();
let schema_conn = schema_db.connect().unwrap();
schema_conn
.execute_batch("begin; create table test1 (c);commit")
.await
.unwrap();
assert_debug_snapshot!(schema_conn
.execute_batch("begin; create table test (c)")
.await
.unwrap_err());

let resp = client
.get("http://schema.primary:8080/v1/jobs/2")
.await
.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_debug_snapshot!(resp.json_value().await.unwrap());

Ok(())
});

sim.run().unwrap();
}

#[test]
fn dry_run_failure() {
let mut sim = Builder::new()
Expand Down

0 comments on commit 1e809f2

Please sign in to comment.