Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ jobs:
- name: Run Merkle Tree composite pkey tests
run: go test -count=1 -v ./tests/integration -run 'TestMerkleTreeCompositePK'

- name: Run mtree regression tests
run: go test -count=1 -v ./tests/integration -run 'TestBuildMtree|TestUpdateMtree|TestMtreeInit|TestACEConn'

- name: Run CDC regression tests
run: go test -count=1 -v ./tests/integration -run 'CDC'

Expand Down
80 changes: 80 additions & 0 deletions tests/integration/cdc_busy_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pgedge/ace/db/queries"
"github.com/pgedge/ace/internal/infra/cdc"
Expand Down Expand Up @@ -316,6 +317,15 @@ func setupCDCTestTable(t *testing.T, ctx context.Context, tableName string) {
require.NoError(t, err)
_, err = pool.Exec(ctx, fmt.Sprintf("INSERT INTO %s.%s SELECT * FROM %s.customers", testSchema, tableName, testSchema))
require.NoError(t, err)
// Without ANALYZE, GetRowCountEstimate falls back to pg_class.reltuples
// (~0 for a just-INSERTed table), and BuildMtree calculates
// numBlocks = ceil(estimate / blockSize) = 1, producing a degenerate
// 1-leaf tree where root and leaf hashes are identical. ANALYZE
// populates n_live_tup so BuildMtree sees the real row count and
// builds a realistic multi-leaf tree. pgx.Identifier.Sanitize
// avoids raw-string interpolation in SQL.
_, err = pool.Exec(ctx, "ANALYZE "+pgx.Identifier{testSchema, tableName}.Sanitize()) // nosemgrep
require.NoError(t, err)
}
}

Expand All @@ -328,3 +338,73 @@ func dropCDCTestTable(t *testing.T, tableName string) {
}
}
}

// TestGetCDCMetadataLegacySchema simulates a cluster that ran an older ACE
// version: ace_cdc_metadata exists but lacks the pub_commit_lsn column. The
// query must still parse (no SQLSTATE 42703) and return an empty
// pub_commit_lsn via the to_jsonb(m) ->> 'pub_commit_lsn' fallback, and
// cdc.UpdateFromCDC must warn-and-skip rather than fail. Regression guard
// for b5a7bf7.
//
// Flow:
//  1. Create the test table and build a Merkle tree on node 1.
//  2. Confirm pub_commit_lsn is populated on the fresh schema.
//  3. Drop the pub_commit_lsn column to mimic the legacy schema.
//  4. Confirm the metadata query still returns slot, start LSN and tables,
//     with an empty pub_commit_lsn.
//  5. Modify a row, run UpdateFromCDC, and confirm start_lsn advanced to at
//     least the WAL flush position captured after the update.
func TestGetCDCMetadataLegacySchema(t *testing.T) {
	ctx := context.Background()
	tableName := "customers_cdc_legacy"
	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)

	setupCDCTestTable(t, ctx, tableName)
	// Register the table drop as soon as the table exists so a failing
	// require below (which stops the test immediately) cannot leak it.
	t.Cleanup(func() {
		dropCDCTestTable(t, tableName)
	})

	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
	require.NoError(t, mtreeTask.RunChecks(false))
	require.NoError(t, mtreeTask.MtreeInit())
	// Register teardown once init has succeeded so a BuildMtree failure
	// still tears down what init created. Cleanups run last-in-first-out,
	// so teardown executes before the table drop registered above.
	t.Cleanup(func() {
		if err := mtreeTask.MtreeTeardown(); err != nil {
			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
		}
	})
	require.NoError(t, mtreeTask.BuildMtree())

	_, _, _, pubCommitBefore, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool)
	require.NoError(t, err)
	require.NotEmpty(t, pubCommitBefore, "fresh mtree init should populate pub_commit_lsn")

	// Real upgrades reach this state when an operator upgrades the binary
	// against a database initialised by an older ACE that never had the
	// column. pgx.Identifier.Sanitize avoids interpolating a config string
	// into SQL.
	metadataTbl := pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize()
	_, err = pgCluster.Node1Pool.Exec(ctx, "ALTER TABLE "+metadataTbl+" DROP COLUMN pub_commit_lsn") // nosemgrep
	require.NoError(t, err)

	// The metadata read must keep working without the column and must
	// report pub_commit_lsn as empty rather than erroring.
	slot, startLSN, tables, pubCommitAfter, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool)
	require.NoError(t, err, "GetCDCMetadata must tolerate missing pub_commit_lsn column")
	require.Equal(t, "", pubCommitAfter, "pub_commit_lsn must be empty on legacy schema")
	require.NotEmpty(t, slot, "slot_name should still be returned")
	require.NotEmpty(t, startLSN, "start_lsn should still be returned")
	require.NotEmpty(t, tables, "tables should still be returned")

	startBefore := metadataStartLSN(t, ctx)

	// Generate a change on the tracked table so there is WAL for
	// UpdateFromCDC to consume.
	safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize()
	_, err = pgCluster.Node1Pool.Exec(ctx, "UPDATE "+safeTbl+" SET email = email || '.legacy' WHERE index = 1") // nosemgrep
	require.NoError(t, err)

	// Captured after the UPDATE: the position start_lsn is expected to
	// reach once CDC has processed the change.
	targetFlush := walFlushLSN(t, ctx, pgCluster.Node1Pool)

	nodeInfo := pgCluster.ClusterNodes[0]
	require.NoError(t, cdc.UpdateFromCDC(ctx, nodeInfo),
		"UpdateFromCDC must warn-and-skip past the missing pub_commit_lsn column")

	startAfter := metadataStartLSN(t, ctx)
	require.True(t, startAfter > startBefore, "start_lsn should advance even on legacy schema")
	require.True(t, startAfter >= targetFlush, "start_lsn should catch up to wal_flush_lsn")
}

// getCDCMetadataInTx opens a transaction on pool, runs queries.GetCDCMetadata
// for the configured CDC publication name inside it, and returns that call's
// results unchanged. Callers in this file bind the results as slot name,
// start LSN, table list, pub_commit_lsn and error, in that order. The
// transaction is always rolled back when the function returns; if it cannot
// be started, zero values are returned together with the Begin error.
func getCDCMetadataInTx(ctx context.Context, pool *pgxpool.Pool) (string, string, []string, string, error) {
	tx, beginErr := pool.Begin(ctx)
	if beginErr != nil {
		return "", "", nil, "", beginErr
	}
	// The rollback result is intentionally discarded: nothing is committed
	// here, and there is nothing useful to do with a rollback failure.
	defer func() {
		_ = tx.Rollback(ctx)
	}()

	publication := config.Cfg.MTree.CDC.PublicationName
	slot, startLSN, tables, pubCommitLSN, err := queries.GetCDCMetadata(ctx, tx, publication)
	return slot, startLSN, tables, pubCommitLSN, err
}
12 changes: 12 additions & 0 deletions tests/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/docker/go-connections/nat"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/pgedge/ace/pkg/config"
"github.com/pgedge/ace/pkg/types"
Expand Down Expand Up @@ -466,6 +467,17 @@ func setupSharedCustomersTable(tableName string) error {
err,
)
}
// ANALYZE so pg_class.reltuples / pg_stat_user_tables.n_live_tup
// reflect the real row count before any test reads
// GetRowCountEstimate. Without it, BuildMtree sees ~0 rows on a
// just-loaded table and builds a degenerate 1-leaf tree.
// pgx.Identifier.Sanitize quotes and escapes the schema/table so
// names like customers_1M survive and no SQL is interpolated from
// raw strings.
tbl := pgx.Identifier{testSchema, tableName}.Sanitize()
if _, err := pool.Exec(ctx, "ANALYZE "+tbl); err != nil { // nosemgrep
return fmt.Errorf("failed to ANALYZE %s on node %s: %w", qualifiedTableName, nodeName, err)
}
}
return nil
}
Expand Down
Loading
Loading