Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/hammer/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func Diff(ddl1, ddl2 DDL) (DDL, error) {
from: database1,
to: database2,
willCreateOrAlterChangeStreamIDs: map[string]*ChangeStream{},
alteredChangeStreamStates: map[string]*ChangeStream{},
}
return generator.GenerateDDL(), nil
}
Expand Down Expand Up @@ -257,6 +258,7 @@ type Generator struct {
droppedConstraints []*ast.TableConstraint
droppedGrant []*Grant
willCreateOrAlterChangeStreamIDs map[string]*ChangeStream
alteredChangeStreamStates map[string]*ChangeStream
}

func (g *Generator) GenerateDDL() DDL {
Expand Down Expand Up @@ -324,6 +326,9 @@ func (g *Generator) GenerateDDL() DDL {
ddl.Append(cs)
continue
}
if alteredState, hasAlteredState := g.alteredChangeStreamStates[identsToComparable(cs.Name)]; hasAlteredState {
fromChangeStream = alteredState
}

ddl.AppendDDL(g.generateDDLForAlterChangeStream(fromChangeStream, cs))
}
Expand Down Expand Up @@ -474,6 +479,38 @@ func (g *Generator) generateDDLForDropConstraintIndexAndTable(table *Table) DDL
}
for _, cs := range table.changeStreams {
if !g.isDropedChangeStream(identsToComparable(cs.Name)) {
if csFor, ok := cs.For.(*ast.ChangeStreamForTables); ok && len(csFor.Tables) > 1 {
var remainingTables []*ast.ChangeStreamForTable
for _, t := range csFor.Tables {
if identsToComparable(t.TableName) != identsToComparable(table.Name.Idents...) {
remainingTables = append(remainingTables, t)
}
}
hasRemainingTableInTarget := false
for _, t := range remainingTables {
if _, exists := g.findTableByName(g.to.tables, identsToComparable(t.TableName)); exists {
hasRemainingTableInTarget = true
break
}
}
if len(remainingTables) > 0 && hasRemainingTableInTarget {
ddl.Append(&ast.AlterChangeStream{
Name: cs.Name,
ChangeStreamAlteration: &ast.ChangeStreamSetFor{
For: &ast.ChangeStreamForTables{Tables: remainingTables},
},
})
alteredCS := &ChangeStream{
CreateChangeStream: &ast.CreateChangeStream{
Name: cs.Name,
For: &ast.ChangeStreamForTables{Tables: remainingTables},
Options: cs.Options,
},
}
g.alteredChangeStreamStates[identsToComparable(cs.Name)] = alteredCS
continue
}
}
ddl.Append(&ast.DropChangeStream{Name: cs.Name})
g.dropedChangeStream = append(g.dropedChangeStream, identsToComparable(cs.Name))
}
Expand Down
38 changes: 37 additions & 1 deletion internal/hammer/diff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2211,7 +2211,43 @@ CREATE TABLE T1 (
expected: []string{
`DROP TABLE T2`,
`REVOKE SELECT ON TABLE T1 FROM ROLE role1`,
`DROP ROLE role1`,
`DROP ROLE role1`,
},
},
{
name: "recreating table and change stream watches multiple tables",
from: `
CREATE TABLE T1 (id INT64, name STRING(100)) PRIMARY KEY(id, name);
CREATE TABLE T2 (id INT64 PRIMARY KEY);
CREATE CHANGE STREAM CS1 FOR T1, T2;
`,
to: `
CREATE TABLE T1 (id INT64, name STRING(100) NOT NULL) PRIMARY KEY(id, name);
CREATE TABLE T2 (id INT64 PRIMARY KEY);
CREATE CHANGE STREAM CS1 FOR T1, T2;
`,
expected: []string{
`ALTER CHANGE STREAM CS1 SET FOR T2`,
`DROP TABLE T1`,
"CREATE TABLE T1 (\n id INT64,\n name STRING(100) NOT NULL\n) PRIMARY KEY (id, name)",
`ALTER CHANGE STREAM CS1 SET FOR T1, T2`,
},
},
{
name: "recreating table and only change stream is for that table",
from: `
CREATE TABLE T1 (id INT64, name STRING(100)) PRIMARY KEY(id, name);
CREATE CHANGE STREAM CS1 FOR T1;
`,
to: `
CREATE TABLE T1 (id INT64, name STRING(100) NOT NULL) PRIMARY KEY(id, name);
CREATE CHANGE STREAM CS1 FOR T1;
`,
expected: []string{
`DROP CHANGE STREAM CS1`,
`DROP TABLE T1`,
"CREATE TABLE T1 (\n id INT64,\n name STRING(100) NOT NULL\n) PRIMARY KEY (id, name)",
`CREATE CHANGE STREAM CS1 FOR T1`,
},
},
}
Expand Down