Skip to content

Commit

Permalink
Fix schema migration proxy (#1572)
Browse files Browse the repository at this point in the history
fix shared schema request proxy
  • Loading branch information
MarinPostma authored Jul 18, 2024
1 parent dfeb764 commit 188644b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
27 changes: 24 additions & 3 deletions libsql-server/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::fmt;
use std::sync::Arc;

use bottomless::replicator::Replicator;
use tokio::sync::watch;

use crate::connection::{MakeConnection, RequestContext};
use crate::replication::ReplicationLogger;
use crate::replication::{FrameNo, ReplicationLogger};

pub use self::primary::{PrimaryConnection, PrimaryConnectionMaker, PrimaryDatabase};
pub use self::replica::{ReplicaConnection, ReplicaDatabase};
Expand Down Expand Up @@ -74,7 +75,7 @@ impl crate::connection::Connection for Connection {
pgm: crate::connection::program::Program,
ctx: RequestContext,
response_builder: B,
replication_index: Option<crate::replication::FrameNo>,
replication_index: Option<FrameNo>,
) -> crate::Result<B> {
match self {
Connection::Primary(conn) => {
Expand All @@ -96,7 +97,7 @@ impl crate::connection::Connection for Connection {
&self,
sql: String,
ctx: RequestContext,
replication_index: Option<crate::replication::FrameNo>,
replication_index: Option<FrameNo>,
) -> crate::Result<crate::Result<crate::connection::program::DescribeResponse>> {
match self {
Connection::Primary(conn) => conn.describe(sql, ctx, replication_index).await,
Expand Down Expand Up @@ -187,6 +188,26 @@ impl Database {
}
}

pub fn notifier(&self) -> Option<watch::Receiver<Option<FrameNo>>> {
match self {
Database::Primary(p) => Some(
p.wal_wrapper
.wrapper()
.logger()
.new_frame_notifier
.subscribe(),
),
Database::Replica(_) => None,
Database::Schema(s) => Some(
s.wal_wrapper
.wrapper()
.logger()
.new_frame_notifier
.subscribe(),
),
}
}

pub fn as_primary(&self) -> Option<&PrimaryDatabase> {
if let Self::Primary(v) = self {
Some(v)
Expand Down
10 changes: 1 addition & 9 deletions libsql-server/src/rpc/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,15 +581,7 @@ impl Proxy for ProxyService {
.namespaces
.with(ctx.namespace().clone(), |ns| {
let connection_maker = ns.db.connection_maker();
let notifier = ns
.db
.as_primary()
.expect("invalid call to stream_exec: not a primary")
.wal_wrapper
.wrapper()
.logger()
.new_frame_notifier
.subscribe();
let notifier = ns.db.notifier();
(connection_maker, notifier)
})
.await
Expand Down

0 comments on commit 188644b

Please sign in to comment.