Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,8 @@ void initServer(void) {
server.gtid_offset_at_multi = -1;
server.gtid_pending_multi_dbid = -1;
server.gtid_pending_multi_offset = -1;
server.gtid_embedded_gno = 0;
server.gtid_embedded_dbid = -1;
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
server.busy_module_yield_reply = NULL;
server.client_pause_in_transaction = 0;
Expand Down Expand Up @@ -3810,8 +3812,10 @@ static void gtidPreparePropagateState(int transaction) {
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
static void propagatePendingCommands(void) {
if (server.also_propagate.numops == 0)
if (server.also_propagate.numops == 0) {
server.gtid_embedded_gno = 0;
return;
}

int j;
redisOp *rop;
Expand Down Expand Up @@ -3865,6 +3869,7 @@ static void propagatePendingCommands(void) {
}

redisOpArrayFree(&server.also_propagate);
server.gtid_embedded_gno = 0;
}

/* Performs operations that should be performed after an execution unit ends.
Expand Down Expand Up @@ -4114,7 +4119,7 @@ void call(client *c, int flags) {
* Also, module commands take care of themselves */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP &&
c->cmd->proc != execCommand &&
c->cmd->proc != execCommand && c->cmd->proc != gtidCommand &&
!(c->cmd->flags & CMD_MODULE))
{
int propagate_flags = PROPAGATE_NONE;
Expand Down
8 changes: 8 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2412,6 +2412,14 @@ struct redisServer {
long long gtid_offset_at_multi;
int gtid_pending_multi_dbid;
long long gtid_pending_multi_offset;
/* Caller-supplied GTID identity for the current GTID-wrapped command.
* Set by gtidCommand(), consumed and cleared by propagatePendingCommands()
* so that propagateArgsPrepareToFeed() rewrites ops
* with the caller's uuid/gno instead of auto-allocating a new one. */
char gtid_embedded_uuid[CONFIG_RUN_ID_SIZE+1];
size_t gtid_embedded_uuid_len;
gno_t gtid_embedded_gno; /* 0 means "not set" (gno starts from 1) */
int gtid_embedded_dbid;
gtidSeq *gtid_seq;
long long gtid_xsync_fullresync_indicator;
gtidInitialInfo gtid_initial[1];
Expand Down
2 changes: 1 addition & 1 deletion tests/gtid/aof.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ test "aof" {
{gtid * 9 PEXPIREAT k2 *}
{gtid * 9 SET k3 y PXAT *}
{gtid * 9 set k4 y}
{gtid * 9 expire k4 *}
{gtid A:1 9 PEXPIREAT k4 *}
{gtid * 9 set k5 y}
}
}
Expand Down
31 changes: 31 additions & 0 deletions tests/modules/propagate.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,27 @@ int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, in
return REDISMODULE_OK;
}

int propagateTestSingleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

/* Replicate a single command - no MULTI/EXEC wrapping. */
RedisModule_Replicate(ctx,"INCR","c","single-counter");
RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestNoreplicateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

/* Do not call RM_Replicate - no propagation expected. */
RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
Expand Down Expand Up @@ -370,6 +391,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.single",
propagateTestSingleCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.noreplicate",
propagateTestNoreplicateCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.mixed",
propagateTestMixedCommand,
"write",1,1,1) == REDISMODULE_ERR)
Expand Down
Loading