diff --git a/internal/hammer/diff.go b/internal/hammer/diff.go index ea855a4..28ca866 100644 --- a/internal/hammer/diff.go +++ b/internal/hammer/diff.go @@ -25,6 +25,7 @@ func Diff(ddl1, ddl2 DDL) (DDL, error) { from: database1, to: database2, willCreateOrAlterChangeStreamIDs: map[string]*ChangeStream{}, + alteredChangeStreamStates: map[string]*ChangeStream{}, } return generator.GenerateDDL(), nil } @@ -257,6 +258,7 @@ type Generator struct { droppedConstraints []*ast.TableConstraint droppedGrant []*Grant willCreateOrAlterChangeStreamIDs map[string]*ChangeStream + alteredChangeStreamStates map[string]*ChangeStream } func (g *Generator) GenerateDDL() DDL { @@ -324,6 +326,9 @@ func (g *Generator) GenerateDDL() DDL { ddl.Append(cs) continue } + if alteredState, hasAlteredState := g.alteredChangeStreamStates[identsToComparable(cs.Name)]; hasAlteredState { + fromChangeStream = alteredState + } ddl.AppendDDL(g.generateDDLForAlterChangeStream(fromChangeStream, cs)) } @@ -474,6 +479,38 @@ func (g *Generator) generateDDLForDropConstraintIndexAndTable(table *Table) DDL } for _, cs := range table.changeStreams { if !g.isDropedChangeStream(identsToComparable(cs.Name)) { + if csFor, ok := cs.For.(*ast.ChangeStreamForTables); ok && len(csFor.Tables) > 1 { + var remainingTables []*ast.ChangeStreamForTable + for _, t := range csFor.Tables { + if identsToComparable(t.TableName) != identsToComparable(table.Name.Idents...) { + remainingTables = append(remainingTables, t) + } + } + hasRemainingTableInTarget := false + for _, t := range remainingTables { + if _, exists := g.findTableByName(g.to.tables, identsToComparable(t.TableName)); exists { + hasRemainingTableInTarget = true + break + } + } + if len(remainingTables) > 0 && hasRemainingTableInTarget { + ddl.Append(&ast.AlterChangeStream{ + Name: cs.Name, + ChangeStreamAlteration: &ast.ChangeStreamSetFor{ + For: &ast.ChangeStreamForTables{Tables: remainingTables}, + }, + }) + alteredCS := &ChangeStream{ + CreateChangeStream: &ast.CreateChangeStream{ + Name: cs.Name, + For: &ast.ChangeStreamForTables{Tables: remainingTables}, + Options: cs.Options, + }, + } + g.alteredChangeStreamStates[identsToComparable(cs.Name)] = alteredCS + continue + } + } ddl.Append(&ast.DropChangeStream{Name: cs.Name}) g.dropedChangeStream = append(g.dropedChangeStream, identsToComparable(cs.Name)) } diff --git a/internal/hammer/diff_test.go b/internal/hammer/diff_test.go index f7a2610..2f9481f 100644 --- a/internal/hammer/diff_test.go +++ b/internal/hammer/diff_test.go @@ -2211,7 +2211,43 @@ CREATE TABLE T1 ( expected: []string{ `DROP TABLE T2`, `REVOKE SELECT ON TABLE T1 FROM ROLE role1`, - `DROP ROLE role1`, + `DROP ROLE role1`, + }, + }, + { + name: "recreating table and change stream watches multiple tables", + from: ` + CREATE TABLE T1 (id INT64, name STRING(100)) PRIMARY KEY(id, name); + CREATE TABLE T2 (id INT64 PRIMARY KEY); + CREATE CHANGE STREAM CS1 FOR T1, T2; + `, + to: ` + CREATE TABLE T1 (id INT64, name STRING(100) NOT NULL) PRIMARY KEY(id, name); + CREATE TABLE T2 (id INT64 PRIMARY KEY); + CREATE CHANGE STREAM CS1 FOR T1, T2; + `, + expected: []string{ + `ALTER CHANGE STREAM CS1 SET FOR T2`, + `DROP TABLE T1`, + "CREATE TABLE T1 (\n id INT64,\n name STRING(100) NOT NULL\n) PRIMARY KEY (id, name)", + `ALTER CHANGE STREAM CS1 SET FOR T1, T2`, + }, + }, + { + name: "recreating table and only change stream is for that table", + from: ` + CREATE TABLE T1 (id INT64, name STRING(100)) PRIMARY KEY(id, name); + CREATE CHANGE STREAM CS1 FOR T1; + `, + to: ` + CREATE TABLE T1 (id INT64, name STRING(100) NOT NULL) PRIMARY KEY(id, name); + CREATE CHANGE STREAM CS1 FOR T1; + `, + expected: []string{ + `DROP CHANGE STREAM CS1`, + `DROP TABLE T1`, + "CREATE TABLE T1 (\n id INT64,\n name STRING(100) NOT NULL\n) PRIMARY KEY (id, name)", + `CREATE CHANGE STREAM CS1 FOR T1`, }, }, }