Skip to content

Commit

Permalink
Merge pull request #1750 from tursodatabase/lucio/clean-up-schema-rep…
Browse files Browse the repository at this point in the history
…lica

sqld: schema db and replica fixes
  • Loading branch information
LucioFranco authored Sep 19, 2024
2 parents 7ab5dc0 + 1f0c9df commit 0bc1533
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 2 deletions.
4 changes: 2 additions & 2 deletions libsql-server/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ impl Proxy for ProxyService {
Ok(conn) => {
if !conn.is_primary() {
return Err(tonic::Status::failed_precondition(
"cannot run schema migration against a replica",
"cannot run schema migration against a replica from a replica",
));
}

Expand Down Expand Up @@ -710,7 +710,7 @@ impl Proxy for ProxyService {
Ok(conn) => {
if !conn.is_primary() {
return Err(tonic::Status::failed_precondition(
"cannot run schema migration against a replica",
"cannot run schema migration against a replica from a replica",
));
}

Expand Down
8 changes: 8 additions & 0 deletions libsql-server/src/rpc/replication/replication_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,14 @@ impl ReplicationLog for ReplicationLogService {
let (logger, config, version, _, _) =
self.logger_from_namespace(namespace, &req, false).await?;

// If we are a shared schema and serving externally (aka to embedded replica's) then
// return an error.
if config.is_shared_schema && !self.service_internal {
return Err(tonic::Status::failed_precondition(
"cannot replicate a shared schema db",
));
}

let session_hash = self.encode_session_token(version);

let response = HelloResponse {
Expand Down
29 changes: 29 additions & 0 deletions libsql-server/tests/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,32 @@ fn large_proxy_query() {

sim.run().unwrap();
}

#[test]
fn replicate_from_shared_schema() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(10000))
.tcp_capacity(100000)
.build();
make_cluster(&mut sim, 1, true);

sim.client("client", async {
let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)
.unwrap();
let conn = db.connect().unwrap();

conn.execute("create table test (x)", ()).await.unwrap();

let db = Database::open_remote_with_connector("http://replica0:8080", "", TurmoilConnector)
.unwrap();
let conn = db.connect().unwrap();

conn.execute_batch("select * from sqlite_master;")
.await
.unwrap();

Ok(())
});

sim.run().unwrap();
}
61 changes: 61 additions & 0 deletions libsql-server/tests/cluster/schema_dbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,64 @@ fn schema_migration_basics() {

sim.run().unwrap();
}

#[test]
fn schema_migration_via_replica() {
let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
make_cluster(&mut sim, 1, true);

sim.client("client", async {
let http = Client::new();

assert!(http
.post(
"http://primary:9090/v1/namespaces/schema/create",
json!({ "shared_schema": true })
)
.await
.unwrap()
.status()
.is_success());
assert!(http
.post(
"http://primary:9090/v1/namespaces/foo/create",
json!({ "shared_schema_name": "schema" })
)
.await
.unwrap()
.status()
.is_success());

{
let db = Database::open_remote_with_connector(
"http://schema.primary:8080",
"",
TurmoilConnector,
)
.unwrap();
let conn = db.connect().unwrap();
conn.execute("create table test (x)", ()).await.unwrap();
}

{
let db = Database::open_remote_with_connector(
"http://schema.replica0:8080",
"",
TurmoilConnector,
)
.unwrap();
let conn = db.connect().unwrap();
conn.execute("select * from sqlite_master;", ())
.await
.unwrap();

conn.execute("create table foo (x)", ()).await.unwrap();
}

Ok(())
});

sim.run().unwrap();
}
59 changes: 59 additions & 0 deletions libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1637,3 +1637,62 @@ fn replicated_synced_frames_zero_when_no_data_synced() {

sim.run().unwrap();
}

#[test]
fn schema_db() {
let tmp_embedded = tempdir().unwrap();
let tmp_host = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();
let tmp_host_path = tmp_host.path().to_owned();

let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();

make_primary(&mut sim, tmp_host_path.clone());

sim.client("client", async move {
let http = Client::new();

assert!(http
.post(
"http://primary:9090/v1/namespaces/schema/create",
json!({ "shared_schema": true })
)
.await
.unwrap()
.status()
.is_success());
assert!(http
.post(
"http://primary:9090/v1/namespaces/foo/create",
json!({ "shared_schema_name": "schema" })
)
.await
.unwrap()
.status()
.is_success());

let path = tmp_embedded_path.join("embedded");
let db = Database::open_with_remote_sync_connector(
path.to_str().unwrap(),
"http://schema.primary:8080",
"",
TurmoilConnector,
false,
None,
)
.await
.unwrap();

db.sync().await.unwrap_err();

let conn = db.connect().unwrap();

conn.execute("create table test (x)", ()).await.unwrap_err();

Ok(())
});

sim.run().unwrap();
}

0 comments on commit 0bc1533

Please sign in to comment.