Skip to content

Commit

Permalink
Only add to last_frames_synced if frames did change.
Browse files Browse the repository at this point in the history
Fixes an issue where a returned Replicated#frames_synced will
continuously increment based off the replicator.frames_synced() even if
no new frames have synced.
  • Loading branch information
jameswritescode committed Sep 10, 2024
1 parent d9f7a46 commit 379fe7a
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 6 deletions.
121 changes: 120 additions & 1 deletion libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ fn replicated_return() {

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(4));
assert_eq!(rep.frames_synced(), 3);
assert_eq!(rep.frames_synced(), 5);

let mut row = conn.query("select count(*) from user", ()).await.unwrap();
let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap();
Expand Down Expand Up @@ -1518,3 +1518,122 @@ fn replicate_auth() {

sim.run().unwrap();
}

#[test]
fn replicated_synced_frames_zero_when_no_data_synced() {
let tmp_embedded = tempdir().unwrap();
let tmp_embedded_path = tmp_embedded.path().to_owned();

let mut sim = Builder::new()
.simulation_duration(Duration::from_secs(1000))
.build();
let tmp = tempdir().unwrap();

let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();

init_tracing();
sim.host("primary", move || {
let notify = notify_clone.clone();
let path = tmp.path().to_path_buf();
async move {
let make_server = || async {
TestServer {
path: path.clone().into(),
user_api_config: UserApiConfig {
..Default::default()
},
admin_api_config: Some(AdminApiConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
connector: TurmoilConnector,
disable_metrics: true,
auth_key: None,
}),
rpc_server_config: Some(RpcServerConfig {
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
tls_config: None,
}),
..Default::default()
}
};

let server = make_server().await;
let shutdown = server.shutdown.clone();

let fut = async move { server.start_sim(8080).await };

tokio::pin!(fut);

loop {
tokio::select! {
res = &mut fut => {
res.unwrap();
break
}
_ = notify.notified() => {
shutdown.notify_waiters();
},
}
}

drop(fut);

tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel"))
.await
.unwrap();

notify.notify_waiters();
let server = make_server().await;
server.start_sim(8080).await.unwrap();

Ok(())
}
});

sim.client("client", async move {
let path = tmp_embedded_path.join("embedded");
let db = Database::open_with_remote_sync_connector(
path.to_str().unwrap(),
"http://primary:8080",
"",
TurmoilConnector,
false,
None,
)
.await?;

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), None);
assert_eq!(rep.frames_synced(), 0);

let conn = db.connect()?;

conn.execute("CREATE TABLE user (id INTEGER)", ())
.await
.unwrap();

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(1));
assert_eq!(rep.frames_synced(), 2);

conn.execute("INSERT into user(id) values (randomblob(4096));", ())
.await
.unwrap();

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(4));
assert_eq!(rep.frames_synced(), 3);

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(4));
assert_eq!(rep.frames_synced(), 0);

let rep = db.sync().await.unwrap();
assert_eq!(rep.frame_no(), Some(4));
assert_eq!(rep.frames_synced(), 0);

Ok(())
});

sim.run().unwrap();
}
23 changes: 18 additions & 5 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,26 @@ impl EmbeddedReplicator {
}
}

let last_frames_synced = self.last_frames_synced.fetch_add(
replicator.frames_synced(),
std::sync::atomic::Ordering::Relaxed,
);
let current_frames_synced = replicator.frames_synced();

let mut last_frames_synced = self
.last_frames_synced
.load(std::sync::atomic::Ordering::Relaxed);

while current_frames_synced > last_frames_synced {
match self.last_frames_synced.compare_exchange(
last_frames_synced,
current_frames_synced,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
) {
Ok(_) => break,
Err(current_value) => last_frames_synced = current_value,
}
}

let frames_synced =
((replicator.frames_synced() as i64 - last_frames_synced as i64).abs()) as usize;
((current_frames_synced as i64 - last_frames_synced as i64).abs()) as usize;

let replicated = Replicated {
frame_no: replicator.client_mut().committed_frame_no(),
Expand Down

0 comments on commit 379fe7a

Please sign in to comment.