diff --git a/include/spock.h b/include/spock.h index 2d96941f..8c96de31 100644 --- a/include/spock.h +++ b/include/spock.h @@ -55,6 +55,51 @@ extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; extern int log_origin_change; extern int spock_apply_idle_timeout; +extern int spock_log_verbosity; +extern int spock_apply_change_logging; + +/* + * spock.log_verbosity - controls the verbosity of spock-specific log output. + * + * Spock's own DEBUG1/DEBUG2 messages are silent by default because PostgreSQL's + * log_min_messages defaults to WARNING. Customer Success + * needs to crank up spock detail when debugging an issue without flipping + * log_min_messages globally (which floods the logs with non-spock output). + * + * When spock_log_verbosity = SPOCK_LOG_VERBOSITY_VERBOSE every spock-emitted + * DEBUG message routed through SPOCK_DEBUG1 / SPOCK_DEBUG2 is promoted to + * LOG level so it appears in standard server logs. + */ +typedef enum SpockLogVerbosity +{ + SPOCK_LOG_VERBOSITY_NORMAL = 0, + SPOCK_LOG_VERBOSITY_VERBOSE +} SpockLogVerbosity; + +/* + * spock.apply_change_logging - controls JSON change logging by the apply worker. + * + * none - default; no extra logging. + * key_only - log {action, schema, table, origin, commit_ts, pk} + * for each DML change. DDL is also logged with its SQL text. + * verbose - all of the above, plus old/new row data for DML. + */ +typedef enum SpockApplyChangeLogging +{ + SPOCK_APPLY_CHANGE_LOG_NONE = 0, + SPOCK_APPLY_CHANGE_LOG_KEY_ONLY, + SPOCK_APPLY_CHANGE_LOG_VERBOSE +} SpockApplyChangeLogging; + +/* + * Route every spock-specific DEBUG1/DEBUG2 ereport through these macros so + * spock.log_verbosity = 'verbose' can promote them to LOG without recompiling. + * Used as `ereport(SPOCK_DEBUG1, (errmsg("...")));` + */ +#define SPOCK_LOG_DEBUG_LEVEL(orig_level) \ + ((spock_log_verbosity == SPOCK_LOG_VERBOSITY_VERBOSE) ? 
LOG : (orig_level)) +#define SPOCK_DEBUG1 SPOCK_LOG_DEBUG_LEVEL(DEBUG1) +#define SPOCK_DEBUG2 SPOCK_LOG_DEBUG_LEVEL(DEBUG2) extern char *shorten_hash(const char *str, int maxlen); extern void gen_slot_name(Name slot_name, char *dbname, diff --git a/include/spock_change_log.h b/include/spock_change_log.h new file mode 100644 index 00000000..36a4666e --- /dev/null +++ b/include/spock_change_log.h @@ -0,0 +1,30 @@ +/*------------------------------------------------------------------------- + * + * spock_change_log.h + * Public surface for spock.apply_change_logging output. + * + *------------------------------------------------------------------------- + */ +#ifndef SPOCK_CHANGE_LOG_H +#define SPOCK_CHANGE_LOG_H + +/* + * Forward declarations only. spock_change_log.h is included from + * spock_apply.c right next to other spock_*.h headers, and pulling in + * spock_proto_native.h / spock_relcache.h would also drag in + * spock_compat.h via spock_output_plugin.h - the resulting macro + * redefinitions then collide with commands/trigger.h declarations + * pulled in further down the include chain (spock_common.h). 
+ */ +struct SpockRelation; +struct SpockTupleData; + +extern void spock_log_apply_change(const char *action, + struct SpockRelation *rel, + struct SpockTupleData *oldtup, + struct SpockTupleData *newtup, + const char *origin_name); + +extern void spock_log_apply_ddl(const char *sql, const char *origin_name); + +#endif /* SPOCK_CHANGE_LOG_H */ diff --git a/src/spock.c b/src/spock.c index 9b7907e1..35384e9c 100644 --- a/src/spock.c +++ b/src/spock.c @@ -139,6 +139,19 @@ static const struct config_enum_entry readonly_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry log_verbosity_options[] = { + {"normal", SPOCK_LOG_VERBOSITY_NORMAL, false}, + {"verbose", SPOCK_LOG_VERBOSITY_VERBOSE, false}, + {NULL, 0, false} +}; + +static const struct config_enum_entry apply_change_logging_options[] = { + {"none", SPOCK_APPLY_CHANGE_LOG_NONE, false}, + {"key_only", SPOCK_APPLY_CHANGE_LOG_KEY_ONLY, false}, + {"verbose", SPOCK_APPLY_CHANGE_LOG_VERBOSE, false}, + {NULL, 0, false} +}; + bool spock_synchronous_commit = false; char *spock_temp_directory = ""; static char *spock_temp_directory_config; @@ -156,6 +169,8 @@ bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; int log_origin_change = SPOCK_ORIGIN_NONE; int spock_apply_idle_timeout = 300; +int spock_log_verbosity = SPOCK_LOG_VERBOSITY_NORMAL; +int spock_apply_change_logging = SPOCK_APPLY_CHANGE_LOG_NONE; static emit_log_hook_type prev_emit_log_hook = NULL; @@ -1085,6 +1100,30 @@ _PG_init(void) 0, NULL, NULL, NULL); + DefineCustomEnumVariable("spock.log_verbosity", + gettext_noop("Sets the verbosity of spock-specific log output."), + gettext_noop("When 'verbose', spock's own DEBUG1 and DEBUG2 messages are " + "promoted to LOG level so they appear in standard server logs " + "without changing log_min_messages globally."), + &spock_log_verbosity, + SPOCK_LOG_VERBOSITY_NORMAL, + log_verbosity_options, + PGC_SUSET, 0, + NULL, NULL, NULL); + + 
DefineCustomEnumVariable("spock.apply_change_logging", + gettext_noop("Controls JSON logging of changes applied by the apply worker."), + gettext_noop("'none' (default) disables. 'key_only' logs the action, " + "schema-qualified relation, primary-key values, origin and " + "commit timestamp for each DML change in JSON. 'verbose' " + "additionally logs old/new row data. DDL is logged in both " + "'key_only' and 'verbose' modes."), + &spock_apply_change_logging, + SPOCK_APPLY_CHANGE_LOG_NONE, + apply_change_logging_options, + PGC_SUSET, 0, + NULL, NULL, NULL); + DefineCustomBoolVariable("spock.synchronous_commit", "spock specific synchronous commit value", NULL, diff --git a/src/spock_apply.c b/src/spock_apply.c index 259c2ace..d6b1d748 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -67,6 +67,7 @@ #include "replication/walsender_private.h" #include "spock_autoddl.h" +#include "spock_change_log.h" #include "spock_common.h" #include "spock_conflict.h" #include "spock_executor.h" @@ -1410,6 +1411,10 @@ handle_insert(StringInfo s) spock_apply_heap_insert(rel, &newtup); } + if (!failed) + spock_log_apply_change("INSERT", rel, NULL, &newtup, + remote_origin_name); + /* * DML operation is finished. Be paranoid and check memory context before * switching out and cleaning the per-operation memory context @@ -1576,6 +1581,11 @@ handle_update(StringInfo s) spock_apply_heap_update(rel, hasoldtup ? &oldtup : &newtup, &newtup); } + if (!failed) + spock_log_apply_change("UPDATE", rel, + hasoldtup ? 
&oldtup : NULL, &newtup, + remote_origin_name); + spock_relation_close(rel, NoLock); end_replication_step(); @@ -1703,6 +1713,10 @@ handle_delete(StringInfo s) spock_apply_heap_delete(rel, &oldtup); } + if (!failed) + spock_log_apply_change("DELETE", rel, &oldtup, NULL, + remote_origin_name); + spock_relation_close(rel, NoLock); end_replication_step(); @@ -2337,6 +2351,14 @@ handle_sql(QueuedMessage *queued_message, bool tx_just_started, char **sql) "item type %d expected %d", MySubscription->name, r, WJB_DONE); + /* + * Emit the DDL change-log record (in both 'key_only' and 'verbose' + * modes) BEFORE execution. Logging at this point means the operator + * always sees what SQL was about to run, even if the execution itself + * later errors out. + */ + spock_log_apply_ddl(*sql, remote_origin_name); + /* Run the extracted SQL. */ spock_execute_sql_command(*sql, queued_message->role, tx_just_started); } diff --git a/src/spock_change_log.c b/src/spock_change_log.c new file mode 100644 index 00000000..59a8959d --- /dev/null +++ b/src/spock_change_log.c @@ -0,0 +1,290 @@ +/*------------------------------------------------------------------------- + * + * spock_change_log.c + * JSON change-log output controlled by spock.apply_change_logging GUC. + * + * Called from the apply worker after each successful DML change and + * immediately before each replicated DDL statement is executed. Emits + * a single JSON object per change at LOG level so the operator can + * correlate replayed changes with origin / commit_ts. + * + * Copyright (c) 2022-2025, pgEdge, Inc. 
+ * Copyright (c) 2020-2022, OSCG-Partners
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/relation.h"
+#include "lib/stringinfo.h"
+#include "nodes/bitmapset.h"
+#include "replication/origin.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/rel.h"
+#include "utils/timestamp.h"
+
+#include "spock.h"
+#include "spock_proto_native.h"
+#include "spock_relcache.h"
+#include "spock_change_log.h"
+
+/*
+ * Append str to buf as a JSON string literal: escapes \\, \", and control chars below 0x20; a NULL str yields "".
+ */
+static void
+append_json_string(StringInfo buf, const char *str)
+{
+	appendStringInfoChar(buf, '"');
+	if (str != NULL)
+	{
+		while (*str)
+		{
+			unsigned char c = (unsigned char) *str++;	/* unsigned, so bytes >= 0x80 never test as < 0x20 */
+
+			switch (c)
+			{
+				case '"':  appendStringInfoString(buf, "\\\""); break;
+				case '\\': appendStringInfoString(buf, "\\\\"); break;
+				case '\b': appendStringInfoString(buf, "\\b");  break;
+				case '\f': appendStringInfoString(buf, "\\f");  break;
+				case '\n': appendStringInfoString(buf, "\\n");  break;
+				case '\r': appendStringInfoString(buf, "\\r");  break;
+				case '\t': appendStringInfoString(buf, "\\t");  break;
+				default:
+					if (c < 0x20)
+						appendStringInfo(buf, "\\u%04x", c);	/* remaining control chars: \u00XX form */
+					else
+						appendStringInfoChar(buf, (char) c);	/* every other byte is copied through unchanged */
+			}
+		}
+	}
+	appendStringInfoChar(buf, '"');
+}
+
+/*
+ * Append a Datum as a JSON value.  Always emits text-quoted form (after
+ * type-output conversion); NULL becomes JSON null.  Quoting every value
+ * keeps the output type-stable across PostgreSQL versions and avoids
+ * having to track per-type numeric vs string distinctions.
+ */
+static void
+append_json_value(StringInfo buf, Oid typid, Datum value, bool isnull)
+{
+	Oid			typoutput;
+	bool		typisvarlena;
+	char	   *str;
+
+	if (isnull)
+	{
+		appendStringInfoString(buf, "null");
+		return;
+	}
+
+	getTypeOutputInfo(typid, &typoutput, &typisvarlena);
+	str = OidOutputFunctionCall(typoutput, value);
+	append_json_string(buf, str);
+	pfree(str);
+}
+
+/*
+ * Append a row as a JSON object.  If pk_only is true, only the key columns
+ * of the relation's primary key index (rel->idxoid) are emitted.
+ */
+static void
+append_row_json(StringInfo buf, SpockRelation *rel,
+				SpockTupleData *tup, bool pk_only)
+{
+	Bitmapset  *pk_atts = NULL;
+	int			i;
+	bool		first = true;
+
+	appendStringInfoChar(buf, '{');
+
+	/*
+	 * Collect the local attnums of the primary key index's key columns
+	 * (indnkeyatts, so INCLUDE columns are not reported as key).  Lock the
+	 * index with AccessShareLock: relation_open() accepts NoLock only when
+	 * the caller already holds a lock on that same relation.
+	 */
+	if (pk_only && OidIsValid(rel->idxoid))
+	{
+		Relation	idx;
+
+		idx = relation_open(rel->idxoid, AccessShareLock);
+		if (idx->rd_index != NULL)
+		{
+			int			nkeyatts = idx->rd_index->indnkeyatts;
+			int			j;
+
+			for (j = 0; j < nkeyatts; j++)
+			{
+				AttrNumber	a = idx->rd_index->indkey.values[j];
+
+				if (a > 0)
+					pk_atts = bms_add_member(pk_atts, a);
+			}
+		}
+		relation_close(idx, AccessShareLock);
+	}
+
+	for (i = 0; i < rel->natts; i++)
+	{
+		AttrNumber	local_attno;
+
+		/*
+		 * rel->attmap (when set) maps remote column i to a 0-based local
+		 * attnum; without it the columns are 1:1 and local is i + 1.
+		 * NOTE(review): tup->values[]/nulls[] are read at remote index i;
+		 * confirm the tuple reader does not store them by local attnum.
+		 */
+		local_attno = (rel->attmap != NULL) ? rel->attmap[i] + 1 : i + 1;
+
+		if (pk_only)
+		{
+			if (pk_atts == NULL || !bms_is_member(local_attno, pk_atts))
+				continue;
+		}
+
+		if (!first)
+			appendStringInfoChar(buf, ',');
+		first = false;
+
+		append_json_string(buf, rel->attnames[i]);
+		appendStringInfoChar(buf, ':');
+		append_json_value(buf, rel->attrtypes[i],
+						  tup->values[i], tup->nulls[i]);
+	}
+
+	if (pk_atts != NULL)
+		bms_free(pk_atts);
+
+	appendStringInfoChar(buf, '}');
+}
+
+/*
+ * Append the common header fields (action, schema, table, origin,
+ * commit_ts) shared by DML and DDL records; NULL/unset ones are omitted.
+ */
+static void
+append_change_log_header(StringInfo buf, const char *action,
+						 const char *nspname, const char *relname,
+						 const char *origin_name)
+{
+	appendStringInfoString(buf, "\"action\":");
+	append_json_string(buf, action);
+
+	if (nspname != NULL)
+	{
+		appendStringInfoString(buf, ",\"schema\":");
+		append_json_string(buf, nspname);
+	}
+	if (relname != NULL)
+	{
+		appendStringInfoString(buf, ",\"table\":");
+		append_json_string(buf, relname);
+	}
+	if (origin_name != NULL)
+	{
+		appendStringInfoString(buf, ",\"origin\":");
+		append_json_string(buf, origin_name);
+	}
+	if (replorigin_session_origin_timestamp != 0)
+	{
+		appendStringInfo(buf, ",\"commit_ts\":\"%s\"",
+						 timestamptz_to_str(replorigin_session_origin_timestamp));
+	}
+}
+
+/*
+ * Log a DML change.  Called by the apply worker AFTER a successful
+ * INSERT/UPDATE/DELETE.  No-op when spock.apply_change_logging = 'none'.
+ *
+ *   action      - "INSERT", "UPDATE", or "DELETE"
+ *   rel         - target SpockRelation (must not be NULL)
+ *   oldtup      - old-row tuple data (UPDATE/DELETE), may be NULL
+ *   newtup      - new-row tuple data (INSERT/UPDATE), may be NULL
+ *   origin_name - origin node name, may be NULL
+ */
+void
+spock_log_apply_change(const char *action,
+					   SpockRelation *rel,
+					   SpockTupleData *oldtup,
+					   SpockTupleData *newtup,
+					   const char *origin_name)
+{
+	StringInfoData buf;
+	SpockTupleData *pk_src;
+
+	if (spock_apply_change_logging == SPOCK_APPLY_CHANGE_LOG_NONE)
+		return;					/* logging disabled: build nothing */
+	if (rel == NULL)
+		return;					/* tolerated defensively: a NULL rel is skipped, not an error */
+
+	initStringInfo(&buf);
+	appendStringInfoChar(&buf, '{');
+
+	append_change_log_header(&buf, action, rel->nspname, rel->relname,
+							 origin_name);
+
+	/* "pk" is read from the new tuple when one exists (INSERT/UPDATE), otherwise from the old tuple (DELETE) */
+	pk_src = (newtup != NULL) ? newtup : oldtup;
+	if (pk_src != NULL)
+	{
+		appendStringInfoString(&buf, ",\"pk\":");
+		append_row_json(&buf, rel, pk_src, true /* pk_only */);
+	}
+
+	if (spock_apply_change_logging == SPOCK_APPLY_CHANGE_LOG_VERBOSE)	/* verbose only: add full row images */
+	{
+		if (oldtup != NULL)
+		{
+			appendStringInfoString(&buf, ",\"old\":");
+			append_row_json(&buf, rel, oldtup, false);
+		}
+		if (newtup != NULL)
+		{
+			appendStringInfoString(&buf, ",\"new\":");
+			append_row_json(&buf, rel, newtup, false);
+		}
+	}
+
+	appendStringInfoChar(&buf, '}');
+
+	ereport(LOG,
+			(errmsg_internal("spock apply change: %s", buf.data)));	/* one record per change, always at LOG */
+
+	pfree(buf.data);			/* release the buffer set up by initStringInfo() above */
+}
+
+/*
+ * Log a DDL change.  Called by the apply worker after extracting the SQL
+ * text from a queued DDL message.  No-op when apply_change_logging = 'none'
+ * (DDL is logged in both 'key_only' and 'verbose' modes, per the spec).
+ */
+void
+spock_log_apply_ddl(const char *sql, const char *origin_name)
+{
+	StringInfoData buf;
+
+	if (spock_apply_change_logging == SPOCK_APPLY_CHANGE_LOG_NONE)
+		return;					/* any other setting logs DDL */
+	if (sql == NULL)
+		return;					/* nothing to report without SQL text */
+
+	initStringInfo(&buf);
+	appendStringInfoChar(&buf, '{');
+
+	append_change_log_header(&buf, "DDL", NULL, NULL, origin_name);	/* NULL schema/table: those fields are left out */
+
+	appendStringInfoString(&buf, ",\"sql\":");
+	append_json_string(&buf, sql);
+
+	appendStringInfoChar(&buf, '}');
+
+	ereport(LOG,
+			(errmsg_internal("spock apply change: %s", buf.data)));	/* same message prefix as the DML records */
+
+	pfree(buf.data);
+}
diff --git a/src/spock_common.c b/src/spock_common.c index 3f8abc25..a612328f 100644 --- a/src/spock_common.c +++ b/src/spock_common.c @@ -27,6 +27,7 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "spock.h" #include "spock_common.h" #include "spock_compat.h" @@ -388,17 +389,17 @@ SpockRelationFindReplTupleByIndex(EState *estate, case TM_Updated: /* XXX: Improve handling here */ if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid)) - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying"))); else - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("concurrent update, retrying"))); goto retry; case TM_Deleted: /* XXX: Improve handling here */ - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("concurrent delete, retrying"))); goto retry; diff --git a/src/spock_dependency.c b/src/spock_dependency.c index a53ca168..6b16837b 100644 --- a/src/spock_dependency.c +++ b/src/spock_dependency.c @@ -765,7 +765,7 @@ reportDependentObjects(const ObjectAddresses *targetObjects, * results are too confusing when client_min_messages and * log_min_messages are different. 
*/ - ereport(DEBUG2, + ereport(SPOCK_DEBUG2, (errmsg("drop auto-cascades to %s", objDesc))); } diff --git a/src/spock_sync.c b/src/spock_sync.c index 50d16971..41ce83d9 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -2852,7 +2852,7 @@ exec_cmd_win32(const char *cmd, char *cmdargv[]) { char *winerr = PglGetLastWin32Error(); - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errcode_for_file_access(), errmsg("unexpected WaitForSingleObject() return code %d while waiting for child process \"%s\": %s", ret, cmd, winerr))); @@ -2864,7 +2864,7 @@ exec_cmd_win32(const char *cmd, char *cmdargv[]) { char *winerr = PglGetLastWin32Error(); - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errcode_for_file_access(), errmsg("failed to get exit code from process \"%s\": %s", cmd, winerr))); @@ -2883,7 +2883,7 @@ exec_cmd_win32(const char *cmd, char *cmdargv[]) * Process must've exited, so code is a value from * ExitProcess, TerminateProcess, main or WinMain. */ - ereport(DEBUG1, + ereport(SPOCK_DEBUG1, (errmsg("process \"%s\" exited with code %d", cmd, exitcode))); diff --git a/tests/tap/schedule b/tests/tap/schedule index a7894dea..619c8497 100644 --- a/tests/tap/schedule +++ b/tests/tap/schedule @@ -44,3 +44,4 @@ test: 018_forward_origins test: 018_failover_slots test: 019_stale_fd_epoll_after_conn_death test: 022_rmgr_progress_post_checkpoint_crash +test: 044_apply_change_logging diff --git a/tests/tap/t/044_apply_change_logging.pl b/tests/tap/t/044_apply_change_logging.pl new file mode 100644 index 00000000..f7208c2b --- /dev/null +++ b/tests/tap/t/044_apply_change_logging.pl @@ -0,0 +1,264 @@ +#!/usr/bin/perl +# +# ============================================================================= +# Test: 044_apply_change_logging.pl - spock.log_verbosity / apply_change_logging +# ============================================================================= +# Verifies the two GUCs added for SPOC-551: +# +# spock.log_verbosity = normal | verbose +# When 'verbose', spock's own 
DEBUG1/DEBUG2 messages are promoted to +# LOG so they appear in the standard server log without changing +# log_min_messages globally. +# +# spock.apply_change_logging = none | key_only | verbose +# Controls JSON change logging by the apply worker: +# none - no extra output (default) +# key_only - log {action, schema, table, pk, origin, commit_ts} +# verbose - additionally log old/new row data +# DDL is logged in both 'key_only' and 'verbose' modes. +# +# Test coverage: +# 1. Both GUCs are present and have the expected defaults / accepted values. +# 2. They are PGC_SUSET (settable by superuser via SET, reloaded via SIGHUP). +# 3. apply_change_logging = key_only - an INSERT produces a JSON record on +# the subscriber's server log with action/schema/table/pk/origin. +# 4. apply_change_logging = verbose - an UPDATE produces a JSON record that +# also contains the "new" row data. +# 5. DDL via replicate_ddl() is logged with action=DDL in key_only mode. +# 6. apply_change_logging = none - subsequent inserts produce NO JSON log +# records (verifies the no-op default path). +# + +use strict; +use warnings; +use Test::More; +use lib '.'; +use SpockTest qw( + create_cluster destroy_cluster system_or_bail + get_test_config scalar_query psql_or_bail wait_for_sub_status +); + +# ------------------------------------------------------------------------- +# Cluster setup +# ------------------------------------------------------------------------- +create_cluster(2, 'Create 2-node cluster for apply-change-logging test'); + +my $config = get_test_config(); +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $user = $config->{db_user}; +my $pw = $config->{db_password}; +my $ports = $config->{node_ports}; +my $datadirs = $config->{node_datadirs}; + +# postgresql.conf in SpockTest.pm sets log_directory='logs' (relative), so +# the actual server log on the subscriber lives under its data directory. +my $sub_log = "$datadirs->[1]/logs/00" . $ports->[1] . 
".log"; + +my $conn = "host=$host dbname=$dbname port=$ports->[0] user=$user password=$pw"; +psql_or_bail(2, + "SELECT spock.sub_create('acl_sub', '$conn', " + . "ARRAY['default','default_insert_only','ddl_sql'], true, true)"); +ok(wait_for_sub_status(2, 'acl_sub', 'replicating', 60), + 'subscription replicating'); + +# ------------------------------------------------------------------------- +# 1. GUCs exist with correct default values +# ------------------------------------------------------------------------- +is(scalar_query(2, "SHOW spock.log_verbosity"), 'normal', + 'spock.log_verbosity defaults to normal'); +is(scalar_query(2, "SHOW spock.apply_change_logging"), 'none', + 'spock.apply_change_logging defaults to none'); + +# ------------------------------------------------------------------------- +# 2. Both GUCs are PGC_SUSET - settable by superuser via SET +# ------------------------------------------------------------------------- +psql_or_bail(2, "SET spock.log_verbosity = 'verbose'"); +psql_or_bail(2, "SET spock.apply_change_logging = 'key_only'"); +psql_or_bail(2, "RESET spock.log_verbosity"); +psql_or_bail(2, "RESET spock.apply_change_logging"); + +# ------------------------------------------------------------------------- +# 3. apply_change_logging = key_only: INSERT generates a JSON record +# ------------------------------------------------------------------------- +# With spock.enable_ddl_replication+include_ddl_repset on (the SpockTest +# default), CREATE TABLE is auto-added to the 'default' repset and the +# DDL is replicated to n2, so no explicit repset_add_table is needed. +psql_or_bail(1, "CREATE TABLE acl_tbl (id int PRIMARY KEY, name text)"); +system_or_bail 'sleep', '4'; + +# Flip the GUC on the subscriber via ALTER SYSTEM (so the apply worker +# picks it up after pg_reload_conf). PGC_SUSET also accepts SET, but the +# apply worker is a separate backend - ALTER SYSTEM + reload is the way +# the GUC reaches it. 
+psql_or_bail(2, "ALTER SYSTEM SET spock.apply_change_logging = 'key_only'"); +psql_or_bail(2, "SELECT pg_reload_conf()"); +system_or_bail 'sleep', '2'; + +# Mark the current end of the log so we can scan only what came after. +my $log_size_before_insert = -s $sub_log; +$log_size_before_insert //= 0; + +psql_or_bail(1, "INSERT INTO acl_tbl VALUES (42, 'alice')"); + +# Wait for the row to land on n2 (and for the apply worker to emit the log). +my $cnt; +for (1 .. 30) { + $cnt = scalar_query(2, "SELECT count(*) FROM acl_tbl WHERE id = 42"); + last if defined $cnt && $cnt eq '1'; + sleep(1); +} +is($cnt, '1', 'INSERT replicated to n2'); + +# Allow the apply worker's logger a moment to flush. +sleep(2); + +# Slurp everything after the marker and look for the JSON record. +my $after_insert = read_tail($sub_log, $log_size_before_insert); +my $key_only_line = find_apply_change_line($after_insert, '"action":"INSERT"'); +ok($key_only_line, + 'key_only: server log contains a "spock apply change" INSERT JSON record'); + +SKIP: { + skip 'no INSERT JSON line found', 4 unless $key_only_line; + like($key_only_line, qr/"schema":"public"/, '... has schema=public'); + like($key_only_line, qr/"table":"acl_tbl"/, '... has table=acl_tbl'); + like($key_only_line, qr/"pk":\{[^}]*"id":"42"/, '... has pk.id=42'); + # key_only mode must NOT contain "new":{} row data + unlike($key_only_line, qr/"new":\{/, + '... key_only mode omits the "new" row payload'); +} + +# ------------------------------------------------------------------------- +# 4. 
apply_change_logging = verbose: UPDATE produces a record with "new" +# ------------------------------------------------------------------------- +psql_or_bail(2, "ALTER SYSTEM SET spock.apply_change_logging = 'verbose'"); +psql_or_bail(2, "SELECT pg_reload_conf()"); +system_or_bail 'sleep', '2'; + +my $log_size_before_update = -s $sub_log; + +psql_or_bail(1, "UPDATE acl_tbl SET name = 'alice_v2' WHERE id = 42"); + +# Wait for the new value to land on n2. +my $name; +for (1 .. 30) { + $name = scalar_query(2, + "SELECT name FROM acl_tbl WHERE id = 42"); + last if defined $name && $name eq 'alice_v2'; + sleep(1); +} +is($name, 'alice_v2', 'UPDATE replicated to n2'); +sleep(2); + +my $after_update = read_tail($sub_log, $log_size_before_update); +my $verbose_line = find_apply_change_line($after_update, '"action":"UPDATE"'); +ok($verbose_line, + 'verbose: server log contains a "spock apply change" UPDATE JSON record'); + +SKIP: { + skip 'no UPDATE JSON line found', 2 unless $verbose_line; + like($verbose_line, qr/"new":\{[^}]*"name":"alice_v2"/, + '... verbose mode includes the "new" row payload'); + like($verbose_line, qr/"pk":\{[^}]*"id":"42"/, + '... verbose mode still includes pk'); +} + +# ------------------------------------------------------------------------- +# 5. DDL is logged in key_only mode (replicated via replicate_ddl path) +# ------------------------------------------------------------------------- +psql_or_bail(2, "ALTER SYSTEM SET spock.apply_change_logging = 'key_only'"); +psql_or_bail(2, "SELECT pg_reload_conf()"); +system_or_bail 'sleep', '2'; + +my $log_size_before_ddl = -s $sub_log; + +# Use spock.replicate_ddl so the DDL flows through the queue / handle_sql path. +psql_or_bail(1, + "SELECT spock.replicate_ddl(" + . " \$ddl\$ ALTER TABLE public.acl_tbl ADD COLUMN created_at timestamptz \$ddl\$, " + . " ARRAY['ddl_sql'])"); + +# Wait for the DDL to apply on n2 (column visible). +my $col; +for (1 .. 
30) { + $col = scalar_query(2, + "SELECT count(*) FROM information_schema.columns " + . "WHERE table_name='acl_tbl' AND column_name='created_at'"); + last if defined $col && $col eq '1'; + sleep(1); +} +is($col, '1', 'DDL replicated to n2'); +sleep(2); + +my $after_ddl = read_tail($sub_log, $log_size_before_ddl); +my $ddl_line = find_apply_change_line($after_ddl, '"action":"DDL"'); +ok($ddl_line, + 'key_only: server log contains a "spock apply change" DDL JSON record'); + +SKIP: { + skip 'no DDL JSON line found', 1 unless $ddl_line; + # The SQL value is JSON-escaped (search_path setup contains \"$user\"), + # so allow anything between "sql": and the ALTER TABLE / created_at + # tokens. + like($ddl_line, qr/"sql":".*ALTER TABLE.*created_at/, + '... DDL record contains the ALTER TABLE SQL text'); +} + +# ------------------------------------------------------------------------- +# 6. apply_change_logging = none: no further "spock apply change" lines +# ------------------------------------------------------------------------- +psql_or_bail(2, "ALTER SYSTEM SET spock.apply_change_logging = 'none'"); +psql_or_bail(2, "SELECT pg_reload_conf()"); +system_or_bail 'sleep', '2'; + +my $log_size_before_none = -s $sub_log; + +psql_or_bail(1, "INSERT INTO acl_tbl VALUES (43, 'bob')"); +for (1 .. 
30) { + $cnt = scalar_query(2, "SELECT count(*) FROM acl_tbl WHERE id = 43"); + last if defined $cnt && $cnt eq '1'; + sleep(1); +} +is($cnt, '1', 'second INSERT replicated to n2'); +sleep(2); + +my $after_none = read_tail($sub_log, $log_size_before_none); +unlike($after_none, qr/spock apply change/, + 'none: no JSON change-log records emitted'); + +# ------------------------------------------------------------------------- +# Cleanup +# ------------------------------------------------------------------------- +psql_or_bail(2, "ALTER SYSTEM RESET spock.apply_change_logging"); +psql_or_bail(2, "ALTER SYSTEM RESET spock.log_verbosity"); +psql_or_bail(2, "SELECT pg_reload_conf()"); + +psql_or_bail(2, "SELECT spock.sub_drop('acl_sub')"); +psql_or_bail(1, "DROP TABLE IF EXISTS acl_tbl CASCADE"); +destroy_cluster('Destroy apply-change-logging cluster'); + +done_testing(); + +# ------------------------------------------------------------------------- +# Helpers +# ------------------------------------------------------------------------- +sub read_tail { + my ($path, $offset) = @_; + return '' unless -f $path; + open(my $fh, '<', $path) or return ''; + seek($fh, $offset // 0, 0); + local $/; + my $data = <$fh>; + close($fh); + return defined $data ? $data : ''; +} + +sub find_apply_change_line { + my ($haystack, $needle) = @_; + for my $line (split /\n/, $haystack) { + next unless $line =~ /spock apply change:/; + return $line if index($line, $needle) >= 0; + } + return undef; +}