Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/spock.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern int restart_delay_on_exception;
extern int spock_replay_queue_size;
extern int spock_pause_timeout;
extern int spock_read_retry_count;
extern int spock_read_retry_wait_ms;
extern bool check_all_uc_indexes;
extern bool spock_enable_quiet_mode;
extern int log_origin_change;
Expand Down
1 change: 1 addition & 0 deletions include/spock_apply.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ typedef void (*spock_apply_mi_finish_fn) (SpockRelation *rel);
extern int my_exception_log_index;

extern void wait_for_previous_transaction(void);
extern bool wait_for_previous_transaction_timeout(int timeout_ms);
extern void awake_transaction_waiters(void);

#endif /* SPOCK_APPLY_H */
38 changes: 30 additions & 8 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ int restart_delay_default;
int restart_delay_on_exception;
int spock_replay_queue_size;
int spock_pause_timeout = 10; /* seconds to wait for apply workers to pause */
int spock_read_retry_count = 5; /* heap update/delete: retries when local tuple is missing */
int spock_read_retry_count = 5; /* heap update/delete phase-1 tight retries */
int spock_read_retry_wait_ms = 100; /* heap update/delete phase-2 predecessor wait cap (ms) */
bool check_all_uc_indexes = false;
bool spock_enable_quiet_mode = false;
int log_origin_change = SPOCK_ORIGIN_NONE;
Expand Down Expand Up @@ -1204,13 +1205,13 @@ _PG_init(void)
NULL);

DefineCustomIntVariable("spock.read_retry_count",
"Number of times the apply worker re-reads the local "
"relation when a row targeted by a remote UPDATE or "
"DELETE is not yet visible",
"On each retry the apply worker waits for any "
"concurrently-applying transaction to finish, then "
"searches the local relation again. Set to 0 to disable "
"retries (the row-missing path runs immediately).",
"Phase-1 tight retry count for the heap apply path",
"When a row targeted by a remote UPDATE or DELETE is "
"not yet visible locally, the apply worker first "
"re-reads the local relation up to this many times "
"back-to-back, with no waits. Addresses a brief "
"local visibility window under extreme load. "
"Set to 0 to skip phase 1.",
&spock_read_retry_count,
5,
0,
Expand All @@ -1221,6 +1222,27 @@ _PG_init(void)
NULL,
NULL);

DefineCustomIntVariable("spock.read_retry_wait_ms",
"Phase-2 bounded predecessor wait for the heap apply path",
"If the local row is still missing after the phase-1 "
"tight retries, the apply worker waits up to this many "
"milliseconds for the immediate predecessor transaction "
"to commit (event-driven via condition variable), then "
"re-checks the relation once. Addresses the case where "
"the row's insert is still in a predecessor transaction "
"that has not yet committed. "
"Set to 0 to skip phase 2 (the loop returns to the "
"row-missing conflict path immediately after phase 1).",
&spock_read_retry_wait_ms,
100,
0,
60000,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL,
NULL,
NULL);

DefineCustomIntVariable("spock.exception_replay_queue_size",
"Maximum in-memory size for the apply replay queue",
"When the replay queue exceeds this size, subsequent "
Expand Down
39 changes: 30 additions & 9 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,22 @@ static void apply_replay_spill_write_entry(int len, char *data);
static ApplyReplayEntry *apply_replay_spill_read_entry(void);
static void request_initial_status_update(PGconn *conn, XLogRecPtr startpos);

/* Wrapper for latch for waiting for previous transaction to commit */
void
wait_for_previous_transaction(void)
/*
* Wait for the immediate predecessor transaction to commit, up to
* timeout_ms milliseconds. Pass 0 to wait forever.
*
* Returns true if the predecessor commit-ts was observed, false if the
* timeout elapsed first.
*/
bool
wait_for_previous_transaction_timeout(int timeout_ms)
{
/*
* Sleep on a cv to be woken up once the required predecessor has
* committed.
*/
TimestampTz deadline = 0;

if (timeout_ms > 0)
deadline = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
timeout_ms);

for (;;)
{
/*
Expand All @@ -274,7 +282,14 @@ wait_for_previous_transaction(void)
if (apply_worker_get_prev_remote_ts() == required_commit_ts ||
required_commit_ts == 0)
{
break;
ConditionVariableCancelSleep();
return true;
}

if (deadline != 0 && GetCurrentTimestamp() >= deadline)
{
ConditionVariableCancelSleep();
return false;
}

CHECK_FOR_INTERRUPTS();
Expand All @@ -300,7 +315,13 @@ wait_for_previous_transaction(void)
1,
WAIT_EVENT_LOGICAL_APPLY_MAIN);
}
ConditionVariableCancelSleep();
}

/*
 * Unbounded variant of wait_for_previous_transaction_timeout(): passes a
 * timeout of 0, which that function documents as "wait forever", and
 * discards its boolean result.
 */
void
wait_for_previous_transaction(void)
{
	const int	no_timeout_ms = 0;

	(void) wait_for_previous_transaction_timeout(no_timeout_ms);
}

/* Wrapper to wake up all waiters for previous transaction to commit */
Expand Down
49 changes: 37 additions & 12 deletions src/spock_apply_heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,19 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
relinfo = edata->targetRelInfo;
idxused = edata->targetRel->idxoid;

/*
* Two races can cause FindReplTupleInLocalRel to miss a row that should
* be present. Address each with its own primitive:
*
* Phase 1: a brief local visibility window under extreme load
* occasionally hides a row that has already been inserted. Resolve
* with a tight retry, no wait.
*
* Phase 2: the row's insert may still be in a predecessor
* transaction that has not yet committed. Wait once, bounded by
* spock.read_retry_wait_ms, on the apply-group condition variable;
* re-check once after the wake-up.
*/
retry = 0;
while (retry < spock_read_retry_count)
{
Expand All @@ -1000,17 +1013,22 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
if (found)
break;

/*
* We didn't find the local tuple. Let's wait here so that any
* impending insert can be processed.
*/
wait_for_previous_transaction();
retry++;
}

if (!found && spock_read_retry_wait_ms > 0)
{
(void) wait_for_previous_transaction_timeout(spock_read_retry_wait_ms);
found = FindReplTupleInLocalRel(edata, relinfo->ri_RelationDesc,
edata->targetRel->idxoid,
remoteslot, &localslot,
false);
retry++;
}

if (retry > 0)
elog(LOG, "spock_apply_heap_update() retried %d times", retry);
elog(LOG, "spock_apply_heap_update() retried %d times (found=%s)",
retry, found ? "true" : "false");

/*
* Perform the UPDATE if Tuple found.
Expand Down Expand Up @@ -1107,6 +1125,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)

relinfo = edata->targetRelInfo;

/* See spock_apply_heap_update() above for the two-phase rationale. */
retry = 0;
while (retry < spock_read_retry_count)
{
Expand All @@ -1117,18 +1136,24 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
if (found)
break;

/*
* We didn't find the local tuple. Let's wait here so that any
* impending insert can be processed.
*/
wait_for_previous_transaction();
retry++;
}

if (!found && spock_read_retry_wait_ms > 0)
{
(void) wait_for_previous_transaction_timeout(spock_read_retry_wait_ms);
found = FindReplTupleInLocalRel(edata, relinfo->ri_RelationDesc,
edata->targetRel->idxoid,
remoteslot, &localslot,
false);
retry++;
}

ExecClearTuple(remoteslot);

if (retry > 0)
elog(LOG, "spock_apply_heap_delete() retried %d times", retry);
elog(LOG, "spock_apply_heap_delete() retried %d times (found=%s)",
retry, found ? "true" : "false");

/*
* Perform the DELETE if Tuple found.
Expand Down
115 changes: 106 additions & 9 deletions tests/tap/t/030_read_retry_count_guc.pl
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@
# =============================================================================
# Test: 030_read_retry_count_guc.pl
#
# Verifies the spock.read_retry_count GUC:
# 1. is registered with the expected default (5)
# 2. is read by the apply path on each iteration via SHOW
# 3. accepts ALTER SYSTEM SET + pg_reload_conf() updates at runtime
# 4. rejects values outside the documented [0, 100] range
# Verifies the two GUCs that control the phased heap-apply retry loop in
# spock_apply_heap_update() and spock_apply_heap_delete():
#
# The GUC controls the retry loop in spock_apply_heap_update() and
# spock_apply_heap_delete() — the apply worker re-reads the local
# relation up to spock.read_retry_count times when a row targeted by a
# remote UPDATE/DELETE is not yet visible locally.
# spock.read_retry_count — phase 1: tight retry count, no wait
# (handles the brief visibility window race
# from commit 093b797)
# spock.read_retry_wait_ms — phase 2: bounded wait for predecessor
# transaction commit (handles the
# predecessor-insert race from commit 12653ca)
#
# For each GUC the test confirms:
# 1. correct default value
# 2. expected pg_settings metadata (context, unit, min, max)
# 3. ALTER SYSTEM SET + pg_reload_conf() updates the runtime value
# 4. out-of-range values are rejected; boundary values are accepted
# =============================================================================

create_cluster(2, 'Create 2-node Spock cluster for read_retry_count GUC');
Expand Down Expand Up @@ -123,8 +128,100 @@
. ">/dev/null 2>&1");
is($rc_max, 0, "spock.read_retry_count accepts the upper boundary (100)");

# =============================================================================
# spock.read_retry_wait_ms — phase 2 bounded predecessor wait
# =============================================================================

# Default value is 100 ms.  SHOW renders the value together with its unit,
# while pg_settings.setting holds the raw integer.  Assert on both so the
# SHOW query is actually checked instead of being fetched and discarded.
my $wait_default = scalar_query(1, "SHOW spock.read_retry_wait_ms");
$wait_default =~ s/\s+//g;
is($wait_default, '100ms',
    "SHOW spock.read_retry_wait_ms renders the default with its unit (100ms)");

my $wait_raw = scalar_query(1,
    "SELECT setting FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'");
$wait_raw =~ s/\s+//g;
is($wait_raw, '100',
    "spock.read_retry_wait_ms default is 100 ms");

# pg_settings metadata for spock.read_retry_wait_ms.  Each row is
# [ select-list expression, expected value, test description ].  Rows are
# checked in order; whitespace is stripped from each result before comparing.
my @wait_ms_metadata_checks = (
    [ "context", 'sighup',
      "spock.read_retry_wait_ms GUC context is PGC_SIGHUP" ],
    [ "coalesce(unit::text, '')", 'ms',
      "spock.read_retry_wait_ms is reported with ms unit" ],
    [ "min_val", '0',
      "spock.read_retry_wait_ms min_val is 0 (0 disables phase 2)" ],
    [ "max_val", '60000',
      "spock.read_retry_wait_ms max_val is 60000 (60s)" ],
);

for my $metadata_check (@wait_ms_metadata_checks) {
    my ($select_expr, $expected, $description) = @{$metadata_check};

    my $actual = scalar_query(1,
        "SELECT $select_expr FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'");
    $actual =~ s/\s+//g;

    is($actual, $expected, $description);
}

# ALTER SYSTEM SET / RESET followed by pg_reload_conf() must be reflected in
# pg_settings.  The closure below reads the raw setting on node 1 and strips
# whitespace, so both halves of the round trip share one reader.
my $read_wait_ms_setting = sub {
    my $setting = scalar_query(1,
        "SELECT setting FROM pg_settings WHERE name = 'spock.read_retry_wait_ms'");
    $setting =~ s/\s+//g;
    return $setting;
};

# Set a non-default value, request a reload, pause, then re-read.
psql_or_bail(1, "ALTER SYSTEM SET spock.read_retry_wait_ms = 250");
psql_or_bail(1, "SELECT pg_reload_conf()");
sleep(1);
is($read_wait_ms_setting->(), '250',
    "spock.read_retry_wait_ms picks up new value (250) after ALTER SYSTEM + reload");

# Drop the override again and confirm the default comes back.
psql_or_bail(1, "ALTER SYSTEM RESET spock.read_retry_wait_ms");
psql_or_bail(1, "SELECT pg_reload_conf()");
sleep(1);
is($read_wait_ms_setting->(), '100',
    "spock.read_retry_wait_ms returns to default (100) after ALTER SYSTEM RESET");

# Runs "ALTER SYSTEM SET spock.read_retry_wait_ms = <value>" through psql with
# ON_ERROR_STOP and all output discarded, and returns system()'s status
# (0 means psql succeeded).
my $alter_wait_ms_status = sub {
    my ($value) = @_;

    my $command =
        "$pg_bin/psql -X -h $host -p $primary_port -d $dbname -U $db_user "
      . "-v ON_ERROR_STOP=1 "
      . "-c \"ALTER SYSTEM SET spock.read_retry_wait_ms = $value\" "
      . ">/dev/null 2>&1";

    return system($command);
};

# Out-of-range rejected
isnt($alter_wait_ms_status->(-1), 0,
    "spock.read_retry_wait_ms rejects value below 0 (-1)");
isnt($alter_wait_ms_status->(60001), 0,
    "spock.read_retry_wait_ms rejects value above 60000 (60001)");

# Boundary values must succeed
is($alter_wait_ms_status->(0), 0,
    "spock.read_retry_wait_ms accepts the lower boundary (0 = phase 2 disabled)");
is($alter_wait_ms_status->(60000), 0,
    "spock.read_retry_wait_ms accepts the upper boundary (60000)");

# Cleanup so the destroy_cluster restart leaves no residue: drop the
# ALTER SYSTEM overrides for both GUCs exercised by this test, in the same
# order as before.
for my $guc_name (qw(spock.read_retry_count spock.read_retry_wait_ms)) {
    psql_or_bail(1, "ALTER SYSTEM RESET $guc_name");
}

destroy_cluster('Destroy test cluster');
done_testing();
Loading