diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index fed475aba4..730786019f 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -5,11 +5,13 @@ use parking_lot::Mutex; use tokio::task::spawn_blocking; use tokio::time::Duration; use tokio_stream::{Stream, StreamExt}; -use tonic::{Status, Code}; +use tonic::{Code, Status}; use crate::frame::{Frame, FrameNo}; use crate::injector::Injector; -use crate::rpc::replication::{Frame as RpcFrame, NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, NAMESPACE_DOESNT_EXIST}; +use crate::rpc::replication::{ + Frame as RpcFrame, NAMESPACE_DOESNT_EXIST, NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, +}; pub use tokio_util::either::Either; @@ -41,15 +43,19 @@ pub enum Error { impl From for Error { fn from(status: Status) -> Self { - if status.code() == Code::FailedPrecondition && status.message() == NEED_SNAPSHOT_ERROR_MSG { + if status.code() == Code::FailedPrecondition && status.message() == NEED_SNAPSHOT_ERROR_MSG + { Error::NeedSnapshot - } else if status.code() == Code::FailedPrecondition && status.message() == NO_HELLO_ERROR_MSG { + } else if status.code() == Code::FailedPrecondition + && status.message() == NO_HELLO_ERROR_MSG + { Error::NoHandshake } else if status.code() == Code::FailedPrecondition - && status.message() == NAMESPACE_DOESNT_EXIST { - Error::NamespaceDoesntExist - } else { - Error::Client(status.into()) + && status.message() == NAMESPACE_DOESNT_EXIST + { + Error::NamespaceDoesntExist + } else { + Error::Client(status.into()) } } } @@ -330,13 +336,12 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); assert!(matches!( - replicator - .try_replicate_step() - .await - .unwrap_err(), + replicator.try_replicate_step().await.unwrap_err(), Error::NamespaceDoesntExist )); } @@ -373,7 +378,9 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) + .await + .unwrap(); // we assume that we already received the handshake and the handshake is not valid anymore replicator.state = ReplicatorState::NeedFrames; replicator.try_replicate_step().await.unwrap(); @@ -414,13 +421,12 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); - // we assume that we already received the handshake and the handshake is not valid anymore - replicator.state = ReplicatorState::NeedFrames; - replicator - .try_replicate_step() + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) .await .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); assert_eq!(replicator.state, ReplicatorState::NeedHandshake); } @@ -458,13 +464,12 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); - // we assume that we already received the handshake and the handshake is not valid anymore - replicator.state = ReplicatorState::NeedFrames; - replicator - .try_replicate_step() + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) .await .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); assert_eq!(replicator.state, ReplicatorState::NeedSnapshot); } @@ -500,13 +505,12 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); - // we assume that we already received the handshake and the handshake is not valid anymore - replicator.state = ReplicatorState::NeedFrames; - replicator - .try_replicate_step() + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) .await .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedFrames; + replicator.try_replicate_step().await.unwrap(); assert_eq!(replicator.state, ReplicatorState::NeedSnapshot); } @@ -546,10 +550,7 @@ mod test { .await .unwrap(); replicator.state = ReplicatorState::NeedSnapshot; - replicator - .try_replicate_step() - .await - .unwrap(); + replicator.try_replicate_step().await.unwrap(); assert_eq!(replicator.state, ReplicatorState::NeedHandshake); } @@ -587,13 +588,12 @@ mod test { } } - let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000).await.unwrap(); - // we assume that we already received the handshake and the handshake is not valid anymore - replicator.state = ReplicatorState::NeedSnapshot; - replicator - .try_replicate_step() + let mut replicator = Replicator::new(Client, tmp.path().to_path_buf(), 10000) .await .unwrap(); + // we assume that we already received the handshake and the handshake is not valid anymore + replicator.state = ReplicatorState::NeedSnapshot; + replicator.try_replicate_step().await.unwrap(); assert_eq!(replicator.state, ReplicatorState::NeedHandshake); } @@ -635,10 +635,7 @@ mod test { .unwrap(); replicator.state = ReplicatorState::NeedHandshake; assert!(matches!( - replicator - .try_replicate_step() - .await - .unwrap_err(), + replicator.try_replicate_step().await.unwrap_err(), Error::Fatal(_) )); } diff --git a/libsql-replication/src/rpc.rs b/libsql-replication/src/rpc.rs index ada65c2a9d..c2021be8a9 100644 --- a/libsql-replication/src/rpc.rs +++ b/libsql-replication/src/rpc.rs @@ -32,7 +32,6 @@ pub mod replication { /// A tonic error code to signify that a namespace doesn't exist. pub const NAMESPACE_DOESNT_EXIST: &str = "NAMESPACE_DOESNT_EXIST"; - pub const SESSION_TOKEN_KEY: &str = "x-session-token"; pub const NAMESPACE_METADATA_KEY: &str = "x-namespace-bin"; diff --git a/libsql-server/src/replication/replicator_client.rs b/libsql-server/src/replication/replicator_client.rs index 580c365d85..25e55c37e4 100644 --- a/libsql-server/src/replication/replicator_client.rs +++ b/libsql-server/src/replication/replicator_client.rs @@ -7,8 +7,7 @@ use libsql_replication::meta::WalIndexMeta; use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient}; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use libsql_replication::rpc::replication::{ - verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, - SESSION_TOKEN_KEY, + verify_session_token, HelloRequest, LogOffset, NAMESPACE_METADATA_KEY, SESSION_TOKEN_KEY, }; use tokio::sync::watch; use tokio_stream::{Stream, StreamExt}; diff --git a/libsql-server/src/rpc/replication_log.rs b/libsql-server/src/rpc/replication_log.rs index db02d13210..b3919a630e 100644 --- a/libsql-server/src/rpc/replication_log.rs +++ b/libsql-server/src/rpc/replication_log.rs @@ -8,8 +8,8 @@ use futures::stream::BoxStream; pub use libsql_replication::rpc::replication as rpc; use libsql_replication::rpc::replication::replication_log_server::ReplicationLog; use libsql_replication::rpc::replication::{ - Frame, Frames, HelloRequest, HelloResponse, LogOffset, NEED_SNAPSHOT_ERROR_MSG, - NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, NAMESPACE_DOESNT_EXIST, + Frame, Frames, HelloRequest, HelloResponse, LogOffset, NAMESPACE_DOESNT_EXIST, + NEED_SNAPSHOT_ERROR_MSG, NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY, }; use tokio_stream::StreamExt; use tonic::transport::server::TcpConnectInfo;