Skip to content

Commit

Permalink
replica redirect read&write to primary in standalone mode (valkey-io#325
Browse files Browse the repository at this point in the history
)

To implement valkey-io#319 

1. replica is able to redirect read and write commands to it's primary
in standalone mode
    * reply with "-REDIRECT primary-ip:port"
2. add a subcommand `CLIENT CAPA redirect`, a client can announce the
capability to handle redirection
    * if a client can handle redirection, the data access commands (read and
write) will be redirected
3. allow `readonly` and `readwrite` command in standalone mode, may be a
breaking change
    * a client with redirect capability cannot process read commands on a
replica by default
    * use READONLY command can allow read commands on a replica

---------

Signed-off-by: zhaozhao.zz <[email protected]>
  • Loading branch information
soloestoy authored Jun 27, 2024
1 parent ab38730 commit 28c5a17
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 8 deletions.
8 changes: 0 additions & 8 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1449,20 +1449,12 @@ void askingCommand(client *c) {
* In this mode replica will not redirect clients as long as clients access
* with read-only commands to keys that are served by the replica's primary. */
void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_READONLY;
addReply(c, shared.ok);
}

/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
c->flags &= ~CLIENT_READONLY;
addReply(c, shared.ok);
}
23 changes: 23 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,28 @@ struct COMMAND_ARG CLIENT_CACHING_Args[] = {
{MAKE_ARG("mode",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLIENT_CACHING_mode_Subargs},
};

/********** CLIENT CAPA ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLIENT CAPA history */
#define CLIENT_CAPA_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLIENT CAPA tips */
#define CLIENT_CAPA_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLIENT CAPA key specs */
#define CLIENT_CAPA_Keyspecs NULL
#endif

/* CLIENT CAPA argument table */
struct COMMAND_ARG CLIENT_CAPA_Args[] = {
{MAKE_ARG("capability",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,0,NULL)},
};

/********** CLIENT GETNAME ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -1552,6 +1574,7 @@ struct COMMAND_ARG CLIENT_UNBLOCK_Args[] = {
/* CLIENT command table */
struct COMMAND_STRUCT CLIENT_Subcommands[] = {
{MAKE_CMD("caching","Instructs the server whether to track the keys in the next request.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CACHING_History,0,CLIENT_CACHING_Tips,0,clientCommand,3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_CACHING_Keyspecs,0,NULL,1),.args=CLIENT_CACHING_Args},
{MAKE_CMD("capa","A client claims its capability.","O(1)","8.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_CAPA_History,0,CLIENT_CAPA_Tips,0,clientCommand,-3,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,ACL_CATEGORY_CONNECTION,CLIENT_CAPA_Keyspecs,0,NULL,1),.args=CLIENT_CAPA_Args},
{MAKE_CMD("getname","Returns the name of the connection.","O(1)","2.6.9",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETNAME_History,0,CLIENT_GETNAME_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETNAME_Keyspecs,0,NULL,0)},
{MAKE_CMD("getredir","Returns the client ID to which the connection's tracking notifications are redirected.","O(1)","6.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_GETREDIR_History,0,CLIENT_GETREDIR_Tips,0,clientCommand,2,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_GETREDIR_Keyspecs,0,NULL,0)},
{MAKE_CMD("help","Returns helpful text about the different subcommands.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,"connection",COMMAND_GROUP_CONNECTION,CLIENT_HELP_History,0,CLIENT_HELP_Tips,0,clientCommand,2,CMD_LOADING|CMD_STALE|CMD_SENTINEL,ACL_CATEGORY_CONNECTION,CLIENT_HELP_Keyspecs,0,NULL,0)},
Expand Down
29 changes: 29 additions & 0 deletions src/commands/client-capa.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"CAPA": {
"summary": "A client claims its capability.",
"complexity": "O(1)",
"group": "connection",
"since": "8.0.0",
"arity": -3,
"container": "CLIENT",
"function": "clientCommand",
"command_flags": [
"NOSCRIPT",
"LOADING",
"STALE"
],
"acl_categories": [
"CONNECTION"
],
"reply_schema": {
"const": "OK"
},
"arguments": [
{
"multiple": "true",
"name": "capability",
"type": "string"
}
]
}
}
8 changes: 8 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ client *createClient(connection *conn) {
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->capa = 0;
c->slot = -1;
c->ctime = c->last_interaction = server.unixtime;
c->duration = 0;
Expand Down Expand Up @@ -3589,6 +3590,13 @@ NULL
} else {
addReplyErrorObject(c, shared.syntaxerr);
}
} else if (!strcasecmp(c->argv[1]->ptr, "capa") && c->argc >= 3) {
for (int i = 2; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr, "redirect")) {
c->capa |= CLIENT_CAPA_REDIRECT;
}
}
addReply(c, shared.ok);
} else {
addReplySubcommandSyntaxError(c);
}
Expand Down
6 changes: 6 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3867,6 +3867,12 @@ int processCommand(client *c) {
}
}

if (!server.cluster_enabled && c->capa & CLIENT_CAPA_REDIRECT && server.primary_host && !mustObeyClient(c) &&
(is_write_command || (is_read_command && !(c->flags & CLIENT_READONLY)))) {
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
return C_OK;
}

/* Disconnect some clients if total clients memory is too high. We do this
* before key eviction, after the last command was executed and consumed
* some client output buffer memory. */
Expand Down
4 changes: 4 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */
#define CLIENT_AUTHENTICATED (1ULL << 52) /* Indicate a client has successfully authenticated */

/* Client capabilities */
#define CLIENT_CAPA_REDIRECT (1 << 0) /* Indicate that the client can handle redirection */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
Expand Down Expand Up @@ -1205,6 +1208,7 @@ typedef struct client {
uint64_t flags; /* Client flags: CLIENT_* macros. */
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
uint32_t capa; /* Client capabilities: CLIENT_CAPA* macros. */
serverDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
robj *lib_name; /* The client library name as set by CLIENT SETINFO. */
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/replica-redirect.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
start_server {tags {needs:repl external:skip}} {
start_server {} {
set primary_host [srv -1 host]
set primary_port [srv -1 port]

r replicaof $primary_host $primary_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replicas not replicating from primary"
}

test {replica allow read command by default} {
r get foo
} {}

test {replica reply READONLY error for write command by default} {
assert_error {READONLY*} {r set foo bar}
}

test {replica redirect read and write command after CLIENT CAPA REDIRECT} {
r client capa redirect
assert_error "REDIRECT $primary_host:$primary_port" {r set foo bar}
assert_error "REDIRECT $primary_host:$primary_port" {r get foo}
}

test {non-data access commands are not redirected} {
r ping
} {PONG}

test {replica allow read command in READONLY mode} {
r readonly
r get foo
} {}
}
}

0 comments on commit 28c5a17

Please sign in to comment.