Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Nov 21, 2023
1 parent 163775b commit e6300ac
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 47 deletions.
81 changes: 39 additions & 42 deletions libsql-replication/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use parking_lot::Mutex;
use tokio::task::spawn_blocking;
use tokio::time::Duration;
use tokio_stream::{Stream, StreamExt};
use tonic::{Status, Code};
use tonic::{Code, Status};

use crate::frame::{Frame, FrameNo};
use crate::injector::Injector;
use crate::rpc::replication::{Frame as RpcFrame, NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, NAMESPACE_DOESNT_EXIST};
use crate::rpc::replication::{
Frame as RpcFrame, NAMESPACE_DOESNT_EXIST, NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG,
};

pub use tokio_util::either::Either;

Expand Down Expand Up @@ -41,15 +43,19 @@ pub enum Error {

impl From<Status> for Error {
fn from(status: Status) -> Self {
if status.code() == Code::FailedPrecondition && status.message() == NEED_SNAPSHOT_ERROR_MSG {
if status.code() == Code::FailedPrecondition && status.message() == NEED_SNAPSHOT_ERROR_MSG
{
Error::NeedSnapshot
} else if status.code() == Code::FailedPrecondition && status.message() == NO_HELLO_ERROR_MSG {
} else if status.code() == Code::FailedPrecondition
&& status.message() == NO_HELLO_ERROR_MSG
{
Error::NoHandshake
} else if status.code() == Code::FailedPrecondition
&& status.message() == NAMESPACE_DOESNT_EXIST {
Error::NamespaceDoesntExist
} else {
Error::Client(status.into())
&& status.message() == NAMESPACE_DOESNT_EXIST
{
Error::NamespaceDoesntExist
} else {
Error::Client(status.into())
}
}
}
Expand Down Expand Up @@ -330,13 +336,12 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();

assert!(matches!(
replicator
.try_replicate_step()
.await
.unwrap_err(),
replicator.try_replicate_step().await.unwrap_err(),
Error::NamespaceDoesntExist
));
}
Expand Down Expand Up @@ -373,7 +378,9 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator.try_replicate_step().await.unwrap();
Expand Down Expand Up @@ -414,13 +421,12 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator
.try_replicate_step()
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator.try_replicate_step().await.unwrap();
assert_eq!(replicator.state, ReplicatorState::NeedHandshake);
}

Expand Down Expand Up @@ -458,13 +464,12 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator
.try_replicate_step()
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator.try_replicate_step().await.unwrap();
assert_eq!(replicator.state, ReplicatorState::NeedSnapshot);
}

Expand Down Expand Up @@ -500,13 +505,12 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator
.try_replicate_step()
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedFrames;
replicator.try_replicate_step().await.unwrap();
assert_eq!(replicator.state, ReplicatorState::NeedSnapshot);
}

Expand Down Expand Up @@ -546,10 +550,7 @@ mod test {
.await
.unwrap();
replicator.state = ReplicatorState::NeedSnapshot;
replicator
.try_replicate_step()
.await
.unwrap();
replicator.try_replicate_step().await.unwrap();
assert_eq!(replicator.state, ReplicatorState::NeedHandshake);
}

Expand Down Expand Up @@ -587,13 +588,12 @@ mod test {
}
}

let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedSnapshot;
replicator
.try_replicate_step()
let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000)
.await
.unwrap();
// we assume that we already received the handshake and the handshake is not valid anymore
replicator.state = ReplicatorState::NeedSnapshot;
replicator.try_replicate_step().await.unwrap();

assert_eq!(replicator.state, ReplicatorState::NeedHandshake);
}
Expand Down Expand Up @@ -635,10 +635,7 @@ mod test {
.unwrap();
replicator.state = ReplicatorState::NeedHandshake;
assert!(matches!(
replicator
.try_replicate_step()
.await
.unwrap_err(),
replicator.try_replicate_step().await.unwrap_err(),
Error::Fatal(_)
));
}
Expand Down
1 change: 0 additions & 1 deletion libsql-replication/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub mod replication {
/// A tonic error code to signify that a namespace doesn't exist.
pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST";


pub const SESSION_TOKEN_KEY: &str = "x-session-token";
pub const NAMESPACE_METADATA_KEY: &str = "x-namespace-bin";

Expand Down
3 changes: 1 addition & 2 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use libsql_replication::meta::WalIndexMeta;
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient;
use libsql_replication::rpc::replication::{
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY,
SESSION_TOKEN_KEY,
verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, SESSION_TOKEN_KEY,
};
use tokio::sync::watch;
use tokio_stream::{Stream, StreamExt};
Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/rpc/replication_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::stream::BoxStream;
pub use libsql_replication::rpc::replication as rpc;
use libsql_replication::rpc::replication::replication_log_server::ReplicationLog;
use libsql_replication::rpc::replication::{
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NEED_SNAPSHOT_ERROR_MSG,
NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, NAMESPACE_DOESNT_EXIST,
Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST,
NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
};
use tokio_stream::StreamExt;
use tonic::transport::server::TcpConnectInfo;
Expand Down

0 comments on commit e6300ac

Please sign in to comment.