Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/spock_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ typedef struct SpockSyncStatus
extern void spock_sync_worker_finish(void);

extern void spock_sync_subscription(SpockSubscription *sub);
extern bool upstream_version_supports_structure_sync(PGconn *origin_conn);
extern char spock_sync_table(SpockSubscription *sub, RangeVar *table, XLogRecPtr *status_lsn);

extern void create_local_sync_status(SpockSyncStatus *sync);
Expand Down
21 changes: 21 additions & 0 deletions src/spock_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,27 @@ spock_create_subscription(PG_FUNCTION_ARGS)

/* Now, fetch info about remote node. */
conn = spock_connect(provider_dsn, sub_name, "create");

/*
* If we will dump the provider's schema, our pg_dump/pg_restore must be
* able to read it. Reject an incompatible major version now, at CREATE
* SUBSCRIPTION, rather than failing later in the sync worker.
*/
if (sync_structure && !upstream_version_supports_structure_sync(conn))
{
const char *v = PQparameterStatus(conn, "server_version");
char *provider_version = pstrdup(v ? v : "unknown");

PQfinish(conn);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot synchronize structure from provider running PostgreSQL %s",
provider_version),
errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a newer major version.",
PG_VERSION),
errhint("Synchronize the schema separately and create the subscription with synchronize_structure = false, or match the major versions.")));
}

origin = spock_remote_node_info(conn, NULL, NULL, NULL);
PQfinish(conn);

Expand Down
65 changes: 60 additions & 5 deletions src/spock_sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,31 @@ restore_structure(SpockSubscription *sub, const char *srcfile,
errhint("check the error log to determine the cause of the issue")));
}

/*
* upstream_version_supports_structure_sync
* True if our pg_dump/pg_restore can read this upstream.
*
* The subscriber's pg_dump (its own, per get_pg_executable()) cannot read a
* provider of a newer major version than itself; cf. _check_database_version()
* in src/bin/pg_dump/connectdb.c.
*/
bool
upstream_version_supports_structure_sync(PGconn *origin_conn)
{
int remoteversion;

Assert(origin_conn != NULL && PQstatus(origin_conn) == CONNECTION_OK);

remoteversion = PQserverVersion(origin_conn);

/*
* Treat an unknown version (0 -- a dead connection or a non-PostgreSQL
* endpoint) as unsupported, and reject a newer major version that our
* pg_dump could not read.
*/
return remoteversion != 0 && remoteversion / 100 <= PG_VERSION_NUM / 100;
}
Comment thread
danolivo marked this conversation as resolved.

/*
* Create slot and get the exported snapshot.
*
Expand Down Expand Up @@ -359,9 +384,8 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn,
* sync_replication_slots = on. PG17+ uses parenthesised option syntax:
* CREATE_REPLICATION_SLOT "name" LOGICAL plugin (FAILOVER)
*
* We key off the regular SQL connection (sql_conn) for version detection.
* Replication protocol connections (repl_conn) return 0 from PQserverVersion()
* so they cannot be used for this check.
* Detect the version on the SQL connection (sql_conn), which we hold open
* anyway for the slot-reclaim queries below.
*/
if (PQserverVersion(sql_conn) >= 170000)
appendStringInfo(&query, " (FAILOVER)");
Expand Down Expand Up @@ -616,8 +640,7 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn,
* consistent with the slot's WAL position — the correct snapshot for COPY.
*
* Mark the slot with (FAILOVER) when the remote provider is PG17+.
* Use the regular SQL connection (conn) for version detection — replication
* protocol connections (repl_conn) return 0 from PQserverVersion().
* Detect the version on the SQL connection (conn), held open anyway.
*/
appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s",
slot_name, "spock_output");
Expand Down Expand Up @@ -1450,6 +1473,38 @@ spock_sync_subscription(SpockSubscription *sub)

origin_conn = spock_connect(sub->origin_if->dsn,
sub->name, "snap");

/*
* An incompatible upstream is permanent, so disable the subscription
* rather than let the worker crash-loop on every restart. Checked
* before the replication connection and slot are created. No
* transaction is open (the switch above committed its own), so
* spock_disable_subscription() commits its own and the disable survives
* the FATAL; origin_conn is left open to keep remoteversion_str valid.
*/
if (SyncKindStructure(sync->kind) &&
!upstream_version_supports_structure_sync(origin_conn))
{
const char *remoteversion_str = PQparameterStatus(origin_conn,
"server_version");

Assert(!IsTransactionState());

exception_behaviour = SUB_DISABLE;
spock_disable_subscription(sub, InvalidRepOriginId,
InvalidTransactionId,
InvalidXLogRecPtr, 0);

ereport(FATAL,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("SPOCK %s: cannot synchronize structure from upstream server version %s",
sub->name,
remoteversion_str ? remoteversion_str : "unknown"),
errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a newer major version.",
PG_VERSION),
errhint("Synchronize the schema another way and recreate the subscription with synchronize_structure = false, then re-enable the subscription.")));
}

origin_conn_repl = spock_connect_replica(sub->origin_if->dsn,
sub->name, "snap");

Expand Down
Loading