diff --git a/deps/xredis-gtid b/deps/xredis-gtid index c000e050d63..d7e95e1121e 160000 --- a/deps/xredis-gtid +++ b/deps/xredis-gtid @@ -1 +1 @@ -Subproject commit c000e050d635f0a261a90c1420ba5ece71b47115 +Subproject commit d7e95e1121efdf38f35a9aebdaa8a34a62b498ee diff --git a/src/server.c b/src/server.c index 53c4a103b6e..7db1a29dc84 100644 --- a/src/server.c +++ b/src/server.c @@ -3074,6 +3074,8 @@ void initServer(void) { server.gtid_offset_at_multi = -1; server.gtid_pending_multi_dbid = -1; server.gtid_pending_multi_offset = -1; + server.gtid_embedded_gno = 0; + server.gtid_embedded_dbid = -1; server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; server.busy_module_yield_reply = NULL; server.client_pause_in_transaction = 0; @@ -3810,8 +3812,10 @@ static void gtidPreparePropagateState(int transaction) { * multiple separated commands. Note that alsoPropagate() is not affected * by CLIENT_PREVENT_PROP flag. */ static void propagatePendingCommands(void) { - if (server.also_propagate.numops == 0) + if (server.also_propagate.numops == 0) { + server.gtid_embedded_gno = 0; return; + } int j; redisOp *rop; @@ -3865,6 +3869,7 @@ static void propagatePendingCommands(void) { } redisOpArrayFree(&server.also_propagate); + server.gtid_embedded_gno = 0; } /* Performs operations that should be performed after an execution unit ends. @@ -4114,7 +4119,7 @@ void call(client *c, int flags) { * Also, module commands take care of themselves */ if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP && - c->cmd->proc != execCommand && + c->cmd->proc != execCommand && c->cmd->proc != gtidCommand && !(c->cmd->flags & CMD_MODULE)) { int propagate_flags = PROPAGATE_NONE; diff --git a/src/server.h b/src/server.h index c9051fca084..169343170de 100644 --- a/src/server.h +++ b/src/server.h @@ -2412,6 +2412,14 @@ struct redisServer { long long gtid_offset_at_multi; int gtid_pending_multi_dbid; long long gtid_pending_multi_offset; + /* Caller-supplied GTID identity for the current GTID-wrapped command. + * Set by gtidCommand(), consumed and cleared by propagatePendingCommands() + * so that propagateArgsPrepareToFeed() rewrites ops + * with the caller's uuid/gno instead of auto-allocating a new one. */ + char gtid_embedded_uuid[CONFIG_RUN_ID_SIZE+1]; + size_t gtid_embedded_uuid_len; + gno_t gtid_embedded_gno; /* 0 means "not set" (gno starts from 1) */ + int gtid_embedded_dbid; gtidSeq *gtid_seq; long long gtid_xsync_fullresync_indicator; gtidInitialInfo gtid_initial[1]; diff --git a/tests/gtid/aof.tcl b/tests/gtid/aof.tcl index 4c20faa877f..d2c6bb0769e 100644 --- a/tests/gtid/aof.tcl +++ b/tests/gtid/aof.tcl @@ -155,7 +155,7 @@ test "aof" { {gtid * 9 PEXPIREAT k2 *} {gtid * 9 SET k3 y PXAT *} {gtid * 9 set k4 y} - {gtid * 9 expire k4 *} + {gtid A:1 9 PEXPIREAT k4 *} {gtid * 9 set k5 y} } } diff --git a/tests/modules/propagate.c b/tests/modules/propagate.c index 689d6881e13..88f322094b1 100644 --- a/tests/modules/propagate.c +++ b/tests/modules/propagate.c @@ -235,6 +235,27 @@ int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, in return REDISMODULE_OK; } +int propagateTestSingleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + /* Replicate a single command - no MULTI/EXEC wrapping. */ + RedisModule_Replicate(ctx,"INCR","c","single-counter"); + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + +int propagateTestNoreplicateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + /* Do not call RM_Replicate - no propagation expected. */ + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); @@ -370,6 +391,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"propagate-test.single", + propagateTestSingleCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"propagate-test.noreplicate", + propagateTestNoreplicateCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"propagate-test.mixed", propagateTestMixedCommand, "write",1,1,1) == REDISMODULE_ERR)