Skip to content

Commit

Permalink
run cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Nov 13, 2023
1 parent 9aad7c0 commit 18e678b
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 9 deletions.
2 changes: 1 addition & 1 deletion libsql-replication/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl WalIndexMeta {
Ok(())
}

async fn flush_inner(&mut self ) -> std::io::Result<()> {
async fn flush_inner(&mut self) -> std::io::Result<()> {
if let Some(data) = self.data {
// FIXME: we can save a syscall by calling read_exact_at, but let's use tokio API for now
self.file.seek(SeekFrom::Start(0)).await?;
Expand Down
79 changes: 71 additions & 8 deletions libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ fn replica_primary_reset() {
});

sim.client("client", async move {
let primary = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let primary =
Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
let conn = primary.connect()?;

// insert a few valued into the primary
Expand All @@ -247,7 +248,14 @@ fn replica_primary_reset() {
}

let tmp = tempdir().unwrap();
let replica = Database::open_with_remote_sync_connector(tmp.path().join("data").display().to_string(), "http://primary:8080", "", TurmoilConnector).await.unwrap();
let replica = Database::open_with_remote_sync_connector(
tmp.path().join("data").display().to_string(),
"http://primary:8080",
"",
TurmoilConnector,
)
.await
.unwrap();
let replica_index = replica.sync().await.unwrap().unwrap();
let primary_index = Client::new()
.get("http://primary:9090/v1/namespaces/default/stats")
Expand All @@ -262,16 +270,47 @@ fn replica_primary_reset() {

assert_eq!(replica_index, primary_index);

let replica_count = *replica.connect().unwrap().query("select count(*) from test", ()).await.unwrap().next().unwrap().unwrap().get_value(0).unwrap().as_integer().unwrap();
let primary_count = *primary.connect().unwrap().query("select count(*) from test", ()).await.unwrap().next().unwrap().unwrap().get_value(0).unwrap().as_integer().unwrap();
let replica_count = *replica
.connect()
.unwrap()
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.unwrap()
.unwrap()
.get_value(0)
.unwrap()
.as_integer()
.unwrap();
let primary_count = *primary
.connect()
.unwrap()
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.unwrap()
.unwrap()
.get_value(0)
.unwrap()
.as_integer()
.unwrap();
assert_eq!(primary_count, replica_count);

notify.notify_waiters();
notify.notified().await;

// drop the replica here, to make sure not to reuse an open connection.
drop(replica);
let replica = Database::open_with_remote_sync_connector(tmp.path().join("data").display().to_string(), "http://primary:8080", "", TurmoilConnector).await.unwrap();
let replica = Database::open_with_remote_sync_connector(
tmp.path().join("data").display().to_string(),
"http://primary:8080",
"",
TurmoilConnector,
)
.await
.unwrap();
let replica_index = replica.sync().await.unwrap().unwrap();
let primary_index = Client::new()
.get("http://primary:9090/v1/namespaces/default/stats")
Expand All @@ -286,8 +325,32 @@ fn replica_primary_reset() {

assert_eq!(replica_index, primary_index);

let replica_count = *replica.connect().unwrap().query("select count(*) from test", ()).await.unwrap().next().unwrap().unwrap().get_value(0).unwrap().as_integer().unwrap();
let primary_count = *primary.connect().unwrap().query("select count(*) from test", ()).await.unwrap().next().unwrap().unwrap().get_value(0).unwrap().as_integer().unwrap();
let replica_count = *replica
.connect()
.unwrap()
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.unwrap()
.unwrap()
.get_value(0)
.unwrap()
.as_integer()
.unwrap();
let primary_count = *primary
.connect()
.unwrap()
.query("select count(*) from test", ())
.await
.unwrap()
.next()
.unwrap()
.unwrap()
.get_value(0)
.unwrap()
.as_integer()
.unwrap();
assert_eq!(primary_count, replica_count);

Ok(())
Expand Down

0 comments on commit 18e678b

Please sign in to comment.