Skip to content

Commit

Permalink
Merge pull request #507 from tursodatabase/streaming-proxy
Browse files Browse the repository at this point in the history
streaming proxy
  • Loading branch information
MarinPostma authored Nov 3, 2023
2 parents 88c3a2a + 01a2a5f commit f92feb0
Show file tree
Hide file tree
Showing 37 changed files with 2,383 additions and 623 deletions.
132 changes: 119 additions & 13 deletions libsql-replication/proto/proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package proxy;
message Queries {
repeated Query queries = 1;
// Uuid
string clientId = 2;
string client_id = 2;
}

message Query {
Expand Down Expand Up @@ -34,10 +34,10 @@ message QueryResult {

message Error {
enum ErrorCode {
SQLError = 0;
TxBusy = 1;
TxTimeout = 2;
Internal = 3;
SQL_ERROR = 0;
TX_BUSY = 1;
TX_TIMEOUT = 2;
INTERNAL = 3;
}

ErrorCode code = 1;
Expand Down Expand Up @@ -72,7 +72,7 @@ message Description {

message Value {
/// bincode encoded Value
bytes data = 1;
bytes data = 1;
}

message Row {
Expand All @@ -85,18 +85,19 @@ message Column {
}

message DisconnectMessage {
string clientId = 1;
string client_id = 1;
}

message Ack { }

enum State {
INIT = 0;
INVALID = 1;
TXN = 2;
}

message ExecuteResults {
repeated QueryResult results = 1;
enum State {
Init = 0;
Invalid = 1;
Txn = 2;
}
/// State after executing the queries
State state = 2;
/// Primary frame_no after executing the request.
Expand All @@ -111,7 +112,6 @@ message Step {
optional Cond cond = 1;
Query query = 2;
}

message Cond {
oneof cond {
OkCond ok = 1;
Expand Down Expand Up @@ -151,7 +151,113 @@ message ProgramReq {
Program pgm = 2;
}

/// Streaming exec request
message ExecReq {
/// id of the request. The response will contain this id.
uint32 request_id = 1;
oneof request {
StreamProgramReq execute = 2;
StreamDescribeReq describe = 3;
}
}

/// Describe request for the streaming protocol
message StreamProgramReq {
Program pgm = 1;
}

/// descibre request for the streaming protocol
message StreamDescribeReq {
string stmt = 1;
}

/// Response message for the streaming proto

/// Request response types
message Init { }
message BeginStep { }
message FinishStep {
uint64 affected_row_count = 1;
optional int64 last_insert_rowid = 2;
}
message StepError {
Error error = 1;
}
message ColsDescription {
repeated Column columns = 1;
}
message RowValue {
oneof value {
string text = 1;
int64 integer = 2;
double real = 3;
bytes blob = 4;
// null if present
bool null = 5;
}
}
message BeginRows { }
message BeginRow { }
message AddRowValue {
RowValue val = 1;
}
message FinishRow { }
message FinishRows { }
message Finish {
optional uint64 last_frame_no = 1;
State state = 2;
}

/// Stream execx dexcribe response messages
message DescribeParam {
optional string name = 1;
}

message DescribeCol {
string name = 1;
optional string decltype = 2;
}

message DescribeResp {
repeated DescribeParam params = 1;
repeated DescribeCol cols = 2;
bool is_explain = 3;
bool is_readonly = 4;
}

message RespStep {
oneof step {
Init init = 1;
BeginStep begin_step = 2;
FinishStep finish_step = 3;
StepError step_error = 4;
ColsDescription cols_description = 5;
BeginRows begin_rows = 6;
BeginRow begin_row = 7;
AddRowValue add_row_value = 8;
FinishRow finish_row = 9;
FinishRows finish_rows = 10;
Finish finish = 11;
}
}

message ProgramResp {
repeated RespStep steps = 1;
}

message ExecResp {
uint32 request_id = 1;
oneof response {
ProgramResp program_resp = 2;
DescribeResp describe_resp = 3;
Error error = 4;
}
}

service Proxy {
rpc StreamExec(stream ExecReq) returns (stream ExecResp) {}

// Deprecated:
rpc Execute(ProgramReq) returns (ExecuteResults) {}
rpc Describe(DescribeRequest) returns (DescribeResult) {}
rpc Disconnect(DisconnectMessage) returns (Ack) {}
Expand Down
Loading

0 comments on commit f92feb0

Please sign in to comment.