Skip to content

Commit

Permalink
Merge pull request #557 from tursodatabase/retrocompatible-proxy-proto
Browse files Browse the repository at this point in the history
retrocompatible proxy proto
  • Loading branch information
LucioFranco authored Nov 2, 2023
2 parents 60e4fb8 + 3dccc8d commit 33f74a3
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 12 deletions.
8 changes: 7 additions & 1 deletion libsql-replication/proto/replication_log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ message LogOffset {
uint64 next_offset = 1;
}

message HelloRequest {}
message HelloRequest {
optional uint64 handshake_version = 1;
}

message HelloResponse {
/// Uuid of the current generation
string generation_id = 1;
/// First frame_no in the current generation
uint64 generation_start_index = 2;
/// id of the replicated log
string log_id = 3;
/// string-encoded Uuid v4 token for the current session, changes on each restart, and must be passed in subsequent requests header.string
Expand Down
11 changes: 10 additions & 1 deletion libsql-replication/src/generated/wal_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@ pub struct LogOffset {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloRequest {}
pub struct HelloRequest {
#[prost(uint64, optional, tag = "1")]
pub handshake_version: ::core::option::Option<u64>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloResponse {
/// / Uuid of the current generation
#[prost(string, tag = "1")]
pub generation_id: ::prost::alloc::string::String,
/// / First frame_no in the current generation
#[prost(uint64, tag = "2")]
pub generation_start_index: u64,
/// / id of the replicated log
#[prost(string, tag = "3")]
pub log_id: ::prost::alloc::string::String,
Expand Down
8 changes: 8 additions & 0 deletions libsql-replication/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ pub mod replication {

Ok(())
}

impl HelloRequest {
pub fn new() -> Self {
Self {
handshake_version: Some(1),
}
}
}
}
2 changes: 1 addition & 1 deletion libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ReplicatorClient for Client {

async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest::default());
let req = self.make_request(HelloRequest::new());
match self.client.hello(req).await {
Ok(resp) => {
let hello = resp.into_inner();
Expand Down
55 changes: 47 additions & 8 deletions libsql-server/src/rpc/replication_log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashSet;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use bytes::Bytes;
use futures::stream::BoxStream;
Expand All @@ -10,23 +12,28 @@ use libsql_replication::rpc::replication::{
NO_HELLO_ERROR_MSG, SESSION_TOKEN_KEY,
};
use tokio_stream::StreamExt;
use tonic::transport::server::TcpConnectInfo;
use tonic::Status;
use uuid::Uuid;

use crate::auth::Auth;
use crate::namespace::{NamespaceStore, PrimaryNamespaceMaker};
use crate::namespace::{NamespaceName, NamespaceStore, PrimaryNamespaceMaker};
use crate::replication::primary::frame_stream::FrameStream;
use crate::replication::LogReadError;
use crate::utils::services::idle_shutdown::IdleShutdownKicker;

use super::NAMESPACE_DOESNT_EXIST;
use super::{extract_namespace, NAMESPACE_DOESNT_EXIST};

pub struct ReplicationLogService {
namespaces: NamespaceStore<PrimaryNamespaceMaker>,
idle_shutdown_layer: Option<IdleShutdownKicker>,
auth: Option<Arc<Auth>>,
disable_namespaces: bool,
session_token: Bytes,

//deprecated:
generation_id: Uuid,
replicas_with_hello: RwLock<HashSet<(SocketAddr, NamespaceName)>>,
}

pub const MAX_FRAMES_PER_BATCH: usize = 1024;
Expand All @@ -45,6 +52,8 @@ impl ReplicationLogService {
idle_shutdown_layer,
auth,
disable_namespaces,
generation_id: Uuid::new_v4(),
replicas_with_hello: Default::default(),
}
}

Expand All @@ -58,11 +67,25 @@ impl ReplicationLogService {

fn verify_session_token<R>(&self, req: &tonic::Request<R>) -> Result<(), Status> {
let no_hello = || Err(Status::failed_precondition(NO_HELLO_ERROR_MSG));
let Some(token) = req.metadata().get(SESSION_TOKEN_KEY) else {
return no_hello();
};
if token.as_bytes() != self.session_token {
return no_hello();
match req.metadata().get(SESSION_TOKEN_KEY) {
Some(token) => {
if token.as_bytes() != self.session_token {
return no_hello();
}
}
None => {
// legacy: old replicas used stateful session management
let replica_addr = req
.remote_addr()
.ok_or(Status::internal("No remote RPC address"))?;
{
let namespace = extract_namespace(self.disable_namespaces, req)?;
let guard = self.replicas_with_hello.read().unwrap();
if !guard.contains(&(replica_addr, namespace)) {
return no_hello();
}
}
}
}

Ok(())
Expand Down Expand Up @@ -203,6 +226,19 @@ impl ReplicationLog for ReplicationLogService {
self.authenticate(&req)?;
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;

// legacy support
if req.get_ref().handshake_version.is_none() {
req.extensions().get::<TcpConnectInfo>().unwrap();
let replica_addr = req
.remote_addr()
.ok_or(Status::internal("No remote RPC address"))?;

{
let mut guard = self.replicas_with_hello.write().unwrap();
guard.insert((replica_addr, namespace.clone()));
}
}

let logger = self
.namespaces
.with(namespace, |ns| ns.db.logger.clone())
Expand All @@ -218,6 +254,8 @@ impl ReplicationLog for ReplicationLogService {
let response = HelloResponse {
log_id: logger.log_id().to_string(),
session_token: self.session_token.clone(),
generation_id: self.generation_id.to_string(),
generation_start_index: 0,
};

Ok(tonic::Response::new(response))
Expand All @@ -228,6 +266,7 @@ impl ReplicationLog for ReplicationLogService {
req: tonic::Request<LogOffset>,
) -> Result<tonic::Response<Self::SnapshotStream>, Status> {
self.authenticate(&req)?;
self.verify_session_token(&req)?;
let namespace = super::extract_namespace(self.disable_namespaces, &req)?;
let req = req.into_inner();
let logger = self
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl ReplicatorClient for RemoteClient {
/// Perform handshake with remote
async fn handshake(&mut self) -> Result<(), Error> {
tracing::info!("Attempting to perform handshake with primary.");
let req = self.make_request(HelloRequest::default());
let req = self.make_request(HelloRequest::new());
match self.remote.replication.hello(req).await {
Ok(resp) => {
let hello = resp.into_inner();
Expand Down

0 comments on commit 33f74a3

Please sign in to comment.