Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/spock_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct SpockSyncStatus
extern void spock_sync_worker_finish(void);

extern void spock_sync_subscription(SpockSubscription *sub);
extern bool upstream_version_supports_structure_sync(PGconn *origin_conn);
extern char spock_sync_table(SpockSubscription *sub, RangeVar *table, XLogRecPtr *status_lsn);

extern void create_local_sync_status(SpockSyncStatus *sync);
Expand Down
21 changes: 21 additions & 0 deletions src/spock_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,27 @@ spock_create_subscription(PG_FUNCTION_ARGS)

/* Now, fetch info about remote node. */
conn = spock_connect(provider_dsn, sub_name, "create");

/*
* If we will dump the provider's schema, our pg_dump/pg_restore must be
* able to read it. Reject an incompatible major version now, at CREATE
* SUBSCRIPTION, rather than failing later in the sync worker.
*/
if (sync_structure && !upstream_version_supports_structure_sync(conn))
{
const char *v = PQparameterStatus(conn, "server_version");
char *provider_version = pstrdup(v ? v : "unknown");

PQfinish(conn);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot synchronize structure from provider running PostgreSQL %s",
provider_version),
errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a newer major version.",
PG_VERSION),
errhint("Synchronize the schema separately and create the subscription with synchronize_structure = false, or match the major versions.")));
}

origin = spock_remote_node_info(conn, NULL, NULL, NULL);
PQfinish(conn);

Expand Down
71 changes: 66 additions & 5 deletions src/spock_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,37 @@ restore_structure(SpockSubscription *sub, const char *srcfile,
errhint("check the error log to determine the cause of the issue")));
}

/*
* upstream_version_supports_structure_sync
* True if our pg_dump/pg_restore can read this upstream.
*
* The subscriber's pg_dump (its own, per get_pg_executable()) cannot read a
* provider whose server version falls outside the inclusive
* [minRemoteVersion, maxRemoteVersion] window that pg_dump itself enforces in
* _check_database_version() (src/bin/pg_dump/pg_backup_db.c). The bounds
* mirror those set in main() of src/bin/pg_dump/pg_dump.c.
*/
bool
upstream_version_supports_structure_sync(PGconn *origin_conn)
{
int remoteversion;
const int minRemoteVersion = 90200;
const int maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;

Assert(origin_conn != NULL && PQstatus(origin_conn) == CONNECTION_OK);

remoteversion = PQserverVersion(origin_conn);

/*
* Treat an unknown version (0 -- a dead connection or a non-PostgreSQL
* endpoint) as unsupported, and reject upstreams below minRemoteVersion
* or above maxRemoteVersion, matching pg_dump's own bounds check.
*/
return remoteversion != 0
&& remoteversion >= minRemoteVersion
&& remoteversion <= maxRemoteVersion;
}
Comment thread
danolivo marked this conversation as resolved.

/*
* Create slot and get the exported snapshot.
*
Expand Down Expand Up @@ -359,9 +390,8 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn,
* sync_replication_slots = on. PG17+ uses parenthesised option syntax:
* CREATE_REPLICATION_SLOT "name" LOGICAL plugin (FAILOVER)
*
* We key off the regular SQL connection (sql_conn) for version detection.
* Replication protocol connections (repl_conn) return 0 from PQserverVersion()
* so they cannot be used for this check.
* Detect the version on the SQL connection (sql_conn), which we hold open
* anyway for the slot-reclaim queries below.
*/
if (PQserverVersion(sql_conn) >= 170000)
appendStringInfo(&query, " (FAILOVER)");
Expand Down Expand Up @@ -616,8 +646,7 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn,
* consistent with the slot's WAL position — the correct snapshot for COPY.
*
* Mark the slot with (FAILOVER) when the remote provider is PG17+.
* Use the regular SQL connection (conn) for version detection — replication
* protocol connections (repl_conn) return 0 from PQserverVersion().
* Detect the version on the SQL connection (conn), held open anyway.
*/
appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s",
slot_name, "spock_output");
Expand Down Expand Up @@ -1450,6 +1479,38 @@ spock_sync_subscription(SpockSubscription *sub)

origin_conn = spock_connect(sub->origin_if->dsn,
sub->name, "snap");

/*
* An incompatible upstream is permanent, so disable the subscription
* rather than let the worker crash-loop on every restart. Checked
* before the replication connection and slot are created. No
* transaction is open (the switch above committed its own), so
* spock_disable_subscription() commits its own and the disable survives
* the FATAL; origin_conn is left open to keep remoteversion_str valid.
*/
if (SyncKindStructure(sync->kind) &&
!upstream_version_supports_structure_sync(origin_conn))
{
const char *remoteversion_str = PQparameterStatus(origin_conn,
"server_version");

Assert(!IsTransactionState());

exception_behaviour = SUB_DISABLE;
spock_disable_subscription(sub, InvalidRepOriginId,
InvalidTransactionId,
InvalidXLogRecPtr, 0);

ereport(FATAL,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SPOCK %s: cannot synchronize structure from upstream server version %s",
sub->name,
remoteversion_str ? remoteversion_str : "unknown"),
errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a newer major version.",
PG_VERSION),
errhint("Synchronize the schema another way and recreate the subscription with synchronize_structure = false, then re-enable the subscription.")));
}

origin_conn_repl = spock_connect_replica(sub->origin_if->dsn,
sub->name, "snap");

Expand Down
Loading