diff --git a/include/spock_sync.h b/include/spock_sync.h
index b1cf40e2..5b28eaf3 100644
--- a/include/spock_sync.h
+++ b/include/spock_sync.h
@@ -66,6 +66,7 @@ typedef struct SpockSyncStatus
 extern void spock_sync_worker_finish(void);
 
 extern void spock_sync_subscription(SpockSubscription *sub);
+extern bool upstream_version_supports_structure_sync(PGconn *origin_conn);
 extern char spock_sync_table(SpockSubscription *sub, RangeVar *table, XLogRecPtr *status_lsn);
 
 extern void create_local_sync_status(SpockSyncStatus *sync);
diff --git a/src/spock_functions.c b/src/spock_functions.c
index b4dea760..58fdd044 100644
--- a/src/spock_functions.c
+++ b/src/spock_functions.c
@@ -494,6 +494,27 @@ spock_create_subscription(PG_FUNCTION_ARGS)
 
 	/* Now, fetch info about remote node. */
 	conn = spock_connect(provider_dsn, sub_name, "create");
+
+	/*
+	 * If we will dump the provider's schema, our pg_dump/pg_restore must be
+	 * able to read it. Reject an incompatible major version now, at CREATE
+	 * SUBSCRIPTION, rather than failing later in the sync worker.
+	 */
+	if (sync_structure && !upstream_version_supports_structure_sync(conn))
+	{
+		const char *v = PQparameterStatus(conn, "server_version");
+		char	   *provider_version = pstrdup(v ? v : "unknown");
+
+		PQfinish(conn);
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("cannot synchronize structure from provider running PostgreSQL %s",
+						provider_version),
+				 errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a provider outside its supported server version range.",
+						   PG_VERSION),
+				 errhint("Synchronize the schema separately and create the subscription with synchronize_structure = false, or match the major versions.")));
+	}
+
 	origin = spock_remote_node_info(conn, NULL, NULL, NULL);
 	PQfinish(conn);
 
diff --git a/src/spock_sync.c b/src/spock_sync.c
index 990bb5a1..36e24803 100644
--- a/src/spock_sync.c
+++ b/src/spock_sync.c
@@ -330,6 +330,37 @@ restore_structure(SpockSubscription *sub, const char *srcfile,
 				 errhint("check the error log to determine the cause of the issue")));
 }
 
+/*
+ * upstream_version_supports_structure_sync
+ *		True if our pg_dump/pg_restore can read this upstream.
+ *
+ * The subscriber's pg_dump (its own, per get_pg_executable()) cannot read a
+ * provider whose server version falls outside the inclusive
+ * [minRemoteVersion, maxRemoteVersion] window that pg_dump itself enforces in
+ * _check_database_version() (src/bin/pg_dump/pg_backup_db.c). The bounds
+ * mirror those set in main() of src/bin/pg_dump/pg_dump.c.
+ */
+bool
+upstream_version_supports_structure_sync(PGconn *origin_conn)
+{
+	int			remoteversion;
+	const int	minRemoteVersion = 90200;
+	const int	maxRemoteVersion = (PG_VERSION_NUM / 100) * 100 + 99;
+
+	Assert(origin_conn != NULL && PQstatus(origin_conn) == CONNECTION_OK);
+
+	remoteversion = PQserverVersion(origin_conn);
+
+	/*
+	 * Treat an unknown version (0 -- a dead connection or a non-PostgreSQL
+	 * endpoint) as unsupported, and reject upstreams below minRemoteVersion
+	 * or above maxRemoteVersion, matching pg_dump's own bounds check.
+	 */
+	return remoteversion != 0
+		&& remoteversion >= minRemoteVersion
+		&& remoteversion <= maxRemoteVersion;
+}
+
 /*
  * Create slot and get the exported snapshot.
  *
@@ -359,9 +390,8 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn,
 	 * sync_replication_slots = on. PG17+ uses parenthesised option syntax:
 	 *   CREATE_REPLICATION_SLOT "name" LOGICAL plugin (FAILOVER)
 	 *
-	 * We key off the regular SQL connection (sql_conn) for version detection.
-	 * Replication protocol connections (repl_conn) return 0 from PQserverVersion()
-	 * so they cannot be used for this check.
+	 * Detect the version on the SQL connection (sql_conn), which we hold open
+	 * anyway for the slot-reclaim queries below.
 	 */
 	if (PQserverVersion(sql_conn) >= 170000)
 		appendStringInfo(&query, " (FAILOVER)");
@@ -616,8 +646,7 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn,
 	 * consistent with the slot's WAL position — the correct snapshot for COPY.
 	 *
 	 * Mark the slot with (FAILOVER) when the remote provider is PG17+.
-	 * Use the regular SQL connection (conn) for version detection — replication
-	 * protocol connections (repl_conn) return 0 from PQserverVersion().
+	 * Detect the version on the SQL connection (conn), held open anyway.
 	 */
 	appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s",
 					 slot_name, "spock_output");
@@ -1450,6 +1479,38 @@ spock_sync_subscription(SpockSubscription *sub)
 
 	origin_conn = spock_connect(sub->origin_if->dsn,
 								sub->name, "snap");
+
+	/*
+	 * An incompatible upstream is permanent, so disable the subscription
+	 * rather than let the worker crash-loop on every restart. Checked
+	 * before the replication connection and slot are created. No
+	 * transaction is open (the switch above committed its own), so
+	 * spock_disable_subscription() commits its own and the disable survives
+	 * the FATAL; origin_conn is left open to keep remoteversion_str valid.
+	 */
+	if (SyncKindStructure(sync->kind) &&
+		!upstream_version_supports_structure_sync(origin_conn))
+	{
+		const char *remoteversion_str = PQparameterStatus(origin_conn,
+														  "server_version");
+
+		Assert(!IsTransactionState());
+
+		exception_behaviour = SUB_DISABLE;
+		spock_disable_subscription(sub, InvalidRepOriginId,
+								   InvalidTransactionId,
+								   InvalidXLogRecPtr, 0);
+
+		ereport(FATAL,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("SPOCK %s: cannot synchronize structure from upstream server version %s",
+						sub->name,
+						remoteversion_str ? remoteversion_str : "unknown"),
+				 errdetail("The subscriber's pg_dump/pg_restore (version %s) cannot dump from a provider outside its supported server version range.",
+						   PG_VERSION),
+				 errhint("Synchronize the schema another way and recreate the subscription with synchronize_structure = false, then re-enable the subscription.")));
+	}
+
 	origin_conn_repl = spock_connect_replica(sub->origin_if->dsn,
 											 sub->name, "snap");
 