diff --git a/include/spock.h b/include/spock.h index 98997341..1d6462f5 100644 --- a/include/spock.h +++ b/include/spock.h @@ -51,6 +51,7 @@ extern int restart_delay_on_exception; extern int spock_replay_queue_size; extern int spock_pause_timeout; extern int spock_read_retry_count; +extern int spock_read_retry_wait_ms; extern bool check_all_uc_indexes; extern bool spock_enable_quiet_mode; extern int log_origin_change; diff --git a/include/spock_apply.h b/include/spock_apply.h index 913ae065..43af6ce6 100644 --- a/include/spock_apply.h +++ b/include/spock_apply.h @@ -37,6 +37,7 @@ typedef void (*spock_apply_mi_finish_fn) (SpockRelation *rel); extern int my_exception_log_index; extern void wait_for_previous_transaction(void); +extern bool wait_for_previous_transaction_timeout(int timeout_ms); extern void awake_transaction_waiters(void); #endif /* SPOCK_APPLY_H */ diff --git a/src/spock.c b/src/spock.c index d1696d8f..60352689 100644 --- a/src/spock.c +++ b/src/spock.c @@ -152,7 +152,8 @@ int restart_delay_default; int restart_delay_on_exception; int spock_replay_queue_size; int spock_pause_timeout = 10; /* seconds to wait for apply workers to pause */ -int spock_read_retry_count = 5; /* heap update/delete: retries when local tuple is missing */ +int spock_read_retry_count = 5; /* heap update/delete phase-1 tight retries */ +int spock_read_retry_wait_ms = 100; /* heap update/delete phase-2 predecessor wait cap (ms) */ bool check_all_uc_indexes = false; bool spock_enable_quiet_mode = false; int log_origin_change = SPOCK_ORIGIN_NONE; @@ -1204,13 +1205,13 @@ _PG_init(void) NULL); DefineCustomIntVariable("spock.read_retry_count", - "Number of times the apply worker re-reads the local " - "relation when a row targeted by a remote UPDATE or " - "DELETE is not yet visible", - "On each retry the apply worker waits for any " - "concurrently-applying transaction to finish, then " - "searches the local relation again. Set to 0 to disable " - "retries (the row-missing path runs immediately).", + "Phase-1 tight retry count for the heap apply path", + "When a row targeted by a remote UPDATE or DELETE is " + "not yet visible locally, the apply worker first " + "re-reads the local relation up to this many times " + "back-to-back, with no waits. Addresses a brief " + "local visibility window under extreme load. " + "Set to 0 to skip phase 1.", &spock_read_retry_count, 5, 0, @@ -1221,6 +1222,27 @@ _PG_init(void) NULL, NULL); + DefineCustomIntVariable("spock.read_retry_wait_ms", + "Phase-2 bounded predecessor wait for the heap apply path", + "If the local row is still missing after the phase-1 " + "tight retries, the apply worker waits up to this many " + "milliseconds for the immediate predecessor transaction " + "to commit (event-driven via condition variable), then " + "re-checks the relation once. Addresses the case where " + "the row's insert is still in a predecessor transaction " + "that has not yet committed. " + "Set to 0 to skip phase 2 (the loop returns to the " + "row-missing conflict path immediately after phase 1).", + &spock_read_retry_wait_ms, + 100, + 0, + 60000, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, + NULL, + NULL); + DefineCustomIntVariable("spock.exception_replay_queue_size", "Maximum in-memory size for the apply replay queue", "When the replay queue exceeds this size, subsequent " diff --git a/src/spock_apply.c b/src/spock_apply.c index 9e41b3d7..367f2c76 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -256,14 +256,22 @@ static void apply_replay_spill_write_entry(int len, char *data); static ApplyReplayEntry *apply_replay_spill_read_entry(void); static void request_initial_status_update(PGconn *conn, XLogRecPtr startpos); -/* Wrapper for latch for waiting for previous transaction to commit */ -void -wait_for_previous_transaction(void) +/* + * Wait for the immediate predecessor transaction to commit, up to + * timeout_ms milliseconds. Pass 0 to wait forever. + * + * Returns true if the predecessor commit-ts was observed, false if the + * timeout elapsed first. + */ +bool +wait_for_previous_transaction_timeout(int timeout_ms) { - /* - * Sleep on a cv to be woken up once our the required predecessor has - * commited. - */ + TimestampTz deadline = 0; + + if (timeout_ms > 0) + deadline = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + timeout_ms); + for (;;) { /* @@ -274,7 +282,14 @@ wait_for_previous_transaction(void) if (apply_worker_get_prev_remote_ts() == required_commit_ts || required_commit_ts == 0) { - break; + ConditionVariableCancelSleep(); + return true; + } + + if (deadline != 0 && GetCurrentTimestamp() >= deadline) + { + ConditionVariableCancelSleep(); + return false; } CHECK_FOR_INTERRUPTS(); @@ -300,7 +315,13 @@ wait_for_previous_transaction(void) 1, WAIT_EVENT_LOGICAL_APPLY_MAIN); } - ConditionVariableCancelSleep(); +} + +/* Wrapper for latch for waiting for previous transaction to commit */ +void +wait_for_previous_transaction(void) +{ + (void) wait_for_previous_transaction_timeout(0); } /* Wrapper to wake up all waiters for previous transaction to commit */ diff --git a/src/spock_apply_heap.c b/src/spock_apply_heap.c index 1b492e50..24dcef1a 100644 --- a/src/spock_apply_heap.c +++ b/src/spock_apply_heap.c @@ -990,6 +990,19 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, relinfo = edata->targetRelInfo; idxused = edata->targetRel->idxoid; + /* + * Two races can cause FindReplTupleInLocalRel to miss a row that should + * be present. Address each with its own primitive: + * + * Phase 1: a brief local visibility window under extreme load + * occasionally hides a row that has already been inserted. Resolve + * with a tight retry, no wait. + * + * Phase 2: the row's insert may still be in a predecessor + * transaction that has not yet committed. Wait once, bounded by + * spock.read_retry_wait_ms, on the apply-group condition variable; + * re-check once after the wake-up. + */ retry = 0; while (retry < spock_read_retry_count) { @@ -1000,17 +1013,22 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup, if (found) break; - /* - * We didn't find the local tuple. Let's wait here so that any - * impending insert can be processed. - */ - wait_for_previous_transaction(); + retry++; + } + if (!found && spock_read_retry_wait_ms > 0) + { + (void) wait_for_previous_transaction_timeout(spock_read_retry_wait_ms); + found = FindReplTupleInLocalRel(edata, relinfo->ri_RelationDesc, + edata->targetRel->idxoid, + remoteslot, &localslot, + false); retry++; } if (retry > 0) - elog(LOG, "spock_apply_heap_update() retried %d times", retry); + elog(LOG, "spock_apply_heap_update() retried %d times (found=%s)", + retry, found ? "true" : "false"); /* * Perform the UPDATE if Tuple found. @@ -1107,6 +1125,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup) relinfo = edata->targetRelInfo; + /* See spock_apply_heap_update() above for the two-phase rationale. */ retry = 0; while (retry < spock_read_retry_count) { @@ -1117,18 +1136,24 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup) if (found) break; - /* - * We didn't find the local tuple. Let's wait here so that any - * impending insert can be processed. - */ - wait_for_previous_transaction(); + retry++; + } + if (!found && spock_read_retry_wait_ms > 0) + { + (void) wait_for_previous_transaction_timeout(spock_read_retry_wait_ms); + found = FindReplTupleInLocalRel(edata, relinfo->ri_RelationDesc, + edata->targetRel->idxoid, + remoteslot, &localslot, + false); retry++; } + ExecClearTuple(remoteslot); if (retry > 0) - elog(LOG, "spock_apply_heap_delete() retried %d times", retry); + elog(LOG, "spock_apply_heap_delete() retried %d times (found=%s)", + retry, found ? "true" : "false"); /* * Perform the DELETE if Tuple found. diff --git a/tests/tap/t/030_read_retry_count_guc.pl b/tests/tap/t/030_read_retry_count_guc.pl index 5e0123a2..c42b985a 100644 --- a/tests/tap/t/030_read_retry_count_guc.pl +++ b/tests/tap/t/030_read_retry_count_guc.pl @@ -12,16 +12,21 @@ # ============================================================================= # Test: 030_read_retry_count_guc.pl # -# Verifies the spock.read_retry_count GUC: -# 1. is registered with the expected default (5) -# 2. is read by the apply path on each iteration via SHOW -# 3. accepts ALTER SYSTEM SET + pg_reload_conf() updates at runtime -# 4. rejects values outside the documented [0, 100] range +# Verifies the two GUCs that control the phased heap-apply retry loop in +# spock_apply_heap_update() and spock_apply_heap_delete(): # -# The GUC controls the retry loop in spock_apply_heap_update() and -# spock_apply_heap_delete() — the apply worker re-reads the local -# relation up to spock.read_retry_count times when a row targeted by a -# remote UPDATE/DELETE is not yet visible locally. +# spock.read_retry_count — phase 1: tight retry count, no wait +# (handles the brief visibility window race +# from commit 093b797) +# spock.read_retry_wait_ms — phase 2: bounded wait for predecessor +# transaction commit (handles the +# predecessor-insert race from commit 12653ca) +# +# For each GUC the test confirms: +# 1. correct default value +# 2. expected pg_settings metadata (context, unit, min, max) +# 3. ALTER SYSTEM SET + pg_reload_conf() updates the runtime value +# 4. out-of-range values are rejected; boundary values are accepted # ============================================================================= create_cluster(2, 'Create 2-node Spock cluster for read_retry_count GUC'); @@ -123,8 +128,100 @@ . ">/dev/null 2>&1"); is($rc_max, 0, "spock.read_retry_count accepts the upper boundary (100)"); +# ============================================================================= +# spock.read_retry_wait_ms — phase 2 bounded predecessor wait +# ============================================================================= + +# Default value is 100 ms +my $wait_default = scalar_query(1, "SHOW spock.read_retry_wait_ms"); +$wait_default =~ s/\s+//g; +# SHOW renders the unit; pg_settings.setting is raw. Compare against raw. +my $wait_raw = scalar_query(1, + "SELECT setting FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_raw =~ s/\s+//g; +is($wait_raw, '100', + "spock.read_retry_wait_ms default is 100 ms"); + +my $wait_context = scalar_query(1, + "SELECT context FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_context =~ s/\s+//g; +is($wait_context, 'sighup', + "spock.read_retry_wait_ms GUC context is PGC_SIGHUP"); + +my $wait_unit = scalar_query(1, + "SELECT coalesce(unit::text, '') FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_unit =~ s/\s+//g; +is($wait_unit, 'ms', + "spock.read_retry_wait_ms is reported with ms unit"); + +my $wait_min = scalar_query(1, + "SELECT min_val FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_min =~ s/\s+//g; +is($wait_min, '0', + "spock.read_retry_wait_ms min_val is 0 (0 disables phase 2)"); + +my $wait_max = scalar_query(1, + "SELECT max_val FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_max =~ s/\s+//g; +is($wait_max, '60000', + "spock.read_retry_wait_ms max_val is 60000 (60s)"); + +# ALTER SYSTEM SET + reload takes effect +psql_or_bail(1, "ALTER SYSTEM SET spock.read_retry_wait_ms = 250"); +psql_or_bail(1, "SELECT pg_reload_conf()"); +sleep(1); +my $wait_after_set = scalar_query(1, + "SELECT setting FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_after_set =~ s/\s+//g; +is($wait_after_set, '250', + "spock.read_retry_wait_ms picks up new value (250) after ALTER SYSTEM + reload"); + +psql_or_bail(1, "ALTER SYSTEM RESET spock.read_retry_wait_ms"); +psql_or_bail(1, "SELECT pg_reload_conf()"); +sleep(1); +my $wait_after_reset = scalar_query(1, + "SELECT setting FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'"); +$wait_after_reset =~ s/\s+//g; +is($wait_after_reset, '100', + "spock.read_retry_wait_ms returns to default (100) after ALTER SYSTEM RESET"); + +# Out-of-range rejected +my $rc_neg_ms = system( + "$pg_bin/psql -X -h $host -p $primary_port -d $dbname -U $db_user " + . "-v ON_ERROR_STOP=1 " + . "-c \"ALTER SYSTEM SET spock.read_retry_wait_ms = -1\" " + . ">/dev/null 2>&1"); +isnt($rc_neg_ms, 0, + "spock.read_retry_wait_ms rejects value below 0 (-1)"); + +my $rc_hi_ms = system( + "$pg_bin/psql -X -h $host -p $primary_port -d $dbname -U $db_user " + . "-v ON_ERROR_STOP=1 " + . "-c \"ALTER SYSTEM SET spock.read_retry_wait_ms = 60001\" " + . ">/dev/null 2>&1"); +isnt($rc_hi_ms, 0, + "spock.read_retry_wait_ms rejects value above 60000 (60001)"); + +# Boundary values must succeed +my $rc_zero_ms = system( + "$pg_bin/psql -X -h $host -p $primary_port -d $dbname -U $db_user " + . "-v ON_ERROR_STOP=1 " + . "-c \"ALTER SYSTEM SET spock.read_retry_wait_ms = 0\" " + . ">/dev/null 2>&1"); +is($rc_zero_ms, 0, + "spock.read_retry_wait_ms accepts the lower boundary (0 = phase 2 disabled)"); + +my $rc_max_ms = system( + "$pg_bin/psql -X -h $host -p $primary_port -d $dbname -U $db_user " + . "-v ON_ERROR_STOP=1 " + . "-c \"ALTER SYSTEM SET spock.read_retry_wait_ms = 60000\" " + . ">/dev/null 2>&1"); +is($rc_max_ms, 0, + "spock.read_retry_wait_ms accepts the upper boundary (60000)"); + # Cleanup so the destroy_cluster restart leaves no residue psql_or_bail(1, "ALTER SYSTEM RESET spock.read_retry_count"); +psql_or_bail(1, "ALTER SYSTEM RESET spock.read_retry_wait_ms"); destroy_cluster('Destroy test cluster'); done_testing();