Skip to content

Commit 19bba77

Browse files
authored
feat(sqlite): Support force migration (#6763)
#### Summary To be merged after #6759, and needs cloudquery/plugin-sdk#604 <!--
1 parent 215ddb3 commit 19bba77

2 files changed

Lines changed: 171 additions & 91 deletions

File tree

plugins/destination/sqlite/client/client_test.go

Lines changed: 92 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -52,81 +52,101 @@ func TestPluginMigrateMultiplePKs(t *testing.T) {
5252
}
5353

5454
func TestMigrateErrors(t *testing.T) {
55-
table := schema.Table{
56-
Name: "table_1",
57-
Columns: []schema.Column{
58-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
59-
{Name: "name", Type: schema.TypeString},
60-
{Name: "age", Type: schema.TypeInt},
61-
{Name: "created_at", Type: schema.TypeString},
62-
},
63-
}
64-
p := destination.NewPlugin("sqlite", "development", New)
65-
ctx := context.Background()
66-
67-
spec := Spec{
68-
ConnectionString: ":memory:",
69-
}
70-
71-
// Init the plugin so we can call migrate
72-
if err := p.Init(ctx, zerolog.Logger{}, specs.Destination{Name: "cq_test_migrate", Spec: spec}); err != nil {
73-
t.Fatal(err)
74-
}
75-
76-
if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil {
77-
t.Fatal(err)
78-
}
79-
80-
tableWithMigratableChange := schema.Table{
81-
Name: "table_1",
82-
Columns: []schema.Column{
83-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
84-
{Name: "name", Type: schema.TypeString},
85-
{Name: "age", Type: schema.TypeInt},
86-
{Name: "created_at", Type: schema.TypeString},
87-
{Name: "new_column", Type: schema.TypeString},
55+
tests := []struct {
56+
name string
57+
spec specs.Destination
58+
wantErr string
59+
}{
60+
{
61+
name: "should fail on migrate mode safe",
62+
spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}},
63+
wantErr: `failed to migrate schema:
64+
can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table
65+
can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table
66+
can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table
67+
68+
To force a migration add "migrate_mode: forced" to your destination spec`,
8869
},
89-
}
90-
newTable := schema.Table{
91-
Name: "table_2",
92-
Columns: []schema.Column{
93-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
94-
{Name: "name", Type: schema.TypeString},
95-
{Name: "age", Type: schema.TypeInt},
96-
{Name: "created_at", Type: schema.TypeString},
97-
},
98-
}
99-
100-
if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil {
101-
t.Fatal(err)
102-
}
103-
104-
tableWithNonTableDropNeeded := schema.Table{
105-
Name: "table_1",
106-
Columns: []schema.Column{
107-
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
108-
{Name: "name", Type: schema.TypeString},
109-
{Name: "age", Type: schema.TypeInt},
110-
{Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
111-
{Name: "created_at", Type: schema.TypeTimestamp},
70+
{
71+
name: "should succeed on migrate mode force",
72+
spec: specs.Destination{Name: "cq_test_migrate", Spec: Spec{ConnectionString: ":memory:"}, MigrateMode: specs.MigrateModeForced},
11273
},
11374
}
11475

115-
tableWithColumnsDropNeeded := schema.Table{
116-
Name: "table_2",
117-
Columns: []schema.Column{
118-
{Name: "id", Type: schema.TypeUUID},
119-
{Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
120-
{Name: "age", Type: schema.TypeString},
121-
{Name: "created_at", Type: schema.TypeTimestamp},
122-
},
76+
for _, tt := range tests {
77+
t.Run(tt.name, func(t *testing.T) {
78+
table := schema.Table{
79+
Name: "table_1",
80+
Columns: []schema.Column{
81+
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
82+
{Name: "name", Type: schema.TypeString},
83+
{Name: "age", Type: schema.TypeInt},
84+
{Name: "created_at", Type: schema.TypeString},
85+
},
86+
}
87+
p := destination.NewPlugin("sqlite", "development", New)
88+
ctx := context.Background()
89+
90+
// Init the plugin so we can call migrate
91+
if err := p.Init(ctx, zerolog.Logger{}, tt.spec); err != nil {
92+
t.Fatal(err)
93+
}
94+
95+
if err := p.Migrate(ctx, []*schema.Table{&table}); err != nil {
96+
t.Fatal(err)
97+
}
98+
99+
tableWithMigratableChange := schema.Table{
100+
Name: "table_1",
101+
Columns: []schema.Column{
102+
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
103+
{Name: "name", Type: schema.TypeString},
104+
{Name: "age", Type: schema.TypeInt},
105+
{Name: "created_at", Type: schema.TypeString},
106+
{Name: "new_column", Type: schema.TypeString},
107+
},
108+
}
109+
newTable := schema.Table{
110+
Name: "table_2",
111+
Columns: []schema.Column{
112+
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
113+
{Name: "name", Type: schema.TypeString},
114+
{Name: "age", Type: schema.TypeInt},
115+
{Name: "created_at", Type: schema.TypeString},
116+
},
117+
}
118+
119+
if err := p.Migrate(ctx, []*schema.Table{&tableWithMigratableChange, &newTable}); err != nil {
120+
t.Fatal(err)
121+
}
122+
123+
tableWithNonTableDropNeeded := schema.Table{
124+
Name: "table_1",
125+
Columns: []schema.Column{
126+
{Name: "id", Type: schema.TypeUUID, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
127+
{Name: "name", Type: schema.TypeString},
128+
{Name: "age", Type: schema.TypeInt},
129+
{Name: "new_pk_column", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
130+
{Name: "created_at", Type: schema.TypeTimestamp},
131+
},
132+
}
133+
134+
tableWithColumnsDropNeeded := schema.Table{
135+
Name: "table_2",
136+
Columns: []schema.Column{
137+
{Name: "id", Type: schema.TypeUUID},
138+
{Name: "name", Type: schema.TypeString, CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true}},
139+
{Name: "age", Type: schema.TypeString},
140+
{Name: "created_at", Type: schema.TypeTimestamp},
141+
},
142+
}
143+
144+
err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded})
145+
if tt.wantErr != "" {
146+
require.ErrorContains(t, err, tt.wantErr)
147+
} else {
148+
require.NoError(t, err)
149+
}
150+
})
123151
}
124-
125-
err := p.Migrate(ctx, []*schema.Table{&tableWithNonTableDropNeeded, &tableWithColumnsDropNeeded})
126-
expectedError := `failed to migrate schema:
127-
can't migrate table "table_1" since adding the new PK column "new_pk_column" is not supported. Try dropping this table
128-
can't migrate table "table_2" since changing the type of column "age" from "integer" to "text" is not supported. Try dropping this column for this table
129-
can't migrate table "table_2" since changing the type of column "created_at" from "text" to "timestamp" is not supported. Try dropping this column for this table
130-
`
131-
require.Errorf(t, err, expectedError)
132152
}

plugins/destination/sqlite/client/migrate.go

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,29 @@ type tableChange struct {
5252
columnChanges []*columnChange
5353
}
5454

55+
type migrationMessage struct {
56+
err string
57+
info string
58+
}
59+
60+
type migrationsMessages []migrationMessage
61+
62+
func (m migrationsMessages) Errors() []string {
63+
errs := make([]string, 0, len(m))
64+
for _, msg := range m {
65+
errs = append(errs, msg.err)
66+
}
67+
return errs
68+
}
69+
70+
func (m migrationsMessages) Infos() []string {
71+
infos := make([]string, 0, len(m))
72+
for _, msg := range m {
73+
infos = append(infos, msg.info)
74+
}
75+
return infos
76+
}
77+
5578
func (c *Client) getColumnChange(col schema.Column, sqliteColumn *columnInfo) *columnChange {
5679
columnName := col.Name
5780
columnType := c.SchemaTypeToSqlite(col.Type)
@@ -111,42 +134,46 @@ func (c *Client) getTableChange(ctx context.Context, table *schema.Table) (*tabl
111134
}
112135

113136
func (c *Client) getSchemaChanges(ctx context.Context, tables schema.Tables) ([]*tableChange, error) {
114-
changes := make([]*tableChange, len(tables))
115-
for i, table := range tables {
137+
changes := make([]*tableChange, 0, len(tables))
138+
for _, table := range tables {
116139
tableChange, err := c.getTableChange(ctx, table)
117140
if err != nil {
118141
return nil, err
119142
}
120-
changes[i] = tableChange
121-
for _, relation := range table.Relations {
122-
relationChanges, err := c.getTableChange(ctx, relation)
123-
if err != nil {
124-
return nil, err
125-
}
126-
changes = append(changes, relationChanges)
143+
changes = append(changes, tableChange)
144+
relationChanges, err := c.getSchemaChanges(ctx, table.Relations)
145+
if err != nil {
146+
return nil, err
127147
}
148+
changes = append(changes, relationChanges...)
128149
}
129150
return changes, nil
130151
}
131152

132-
func getMigrationErrors(changes []*tableChange) []string {
133-
var errors []string
153+
func getMigrationMessages(changes []*tableChange) migrationsMessages {
154+
var messages migrationsMessages
134155
for _, tableChange := range changes {
135156
if tableChange.new {
136157
continue
137158
}
138159
for _, colChange := range tableChange.columnChanges {
139160
if colChange.new && colChange.newPk {
140-
errors = append(errors, fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name))
161+
messages = append(messages, migrationMessage{
162+
err: fmt.Sprintf("can't migrate table %q since adding the new PK column %q is not supported. Try dropping this table", tableChange.table.Name, colChange.name),
163+
info: fmt.Sprintf("table %q will be dropped and recreated", tableChange.table.Name),
164+
})
141165
// no need to report other errors as the user needs to drop the table altogether
142166
break
143167
}
144168
if !colChange.new && colChange.oldType != colChange.newType {
145-
errors = append(errors, fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType))
169+
messages = append(messages, migrationMessage{
170+
err: fmt.Sprintf("can't migrate table %q since changing the type of column %q from %q to %q is not supported. Try dropping this column for this table", tableChange.table.Name, colChange.name, colChange.oldType, colChange.newType),
171+
info: fmt.Sprintf("column %q of table %q will be dropped and recreated", colChange.name, tableChange.table.Name),
172+
})
146173
}
147174
}
148175
}
149-
return errors
176+
return messages
150177
}
151178

152179
// This is the responsibility of the CLI of the client to lock before running migration
@@ -156,9 +183,14 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
156183
return err
157184
}
158185

159-
migrationErrors := getMigrationErrors(schemaChanges)
160-
if len(migrationErrors) > 0 {
161-
return fmt.Errorf("failed to migrate schema:\n%s", strings.Join(migrationErrors, "\n"))
186+
migrationMessages := getMigrationMessages(schemaChanges)
187+
if len(migrationMessages) > 0 {
188+
if c.spec.MigrateMode == specs.MigrateModeSafe {
189+
return fmt.Errorf("failed to migrate schema:\n%s\n\nTo force a migration add \"migrate_mode: %s\" to your destination spec", strings.Join(migrationMessages.Errors(), "\n"), specs.MigrateModeForced.String())
190+
}
191+
for _, msg := range migrationMessages.Infos() {
192+
c.logger.Info().Msg(msg)
193+
}
162194
}
163195

164196
for _, tableChange := range schemaChanges {
@@ -173,11 +205,39 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
173205
} else {
174206
c.logger.Debug().Str("table", table.Name).Msg("Table exists, auto-migrating")
175207
for _, colChange := range tableChange.columnChanges {
208+
tableName := tableChange.table.Name
209+
columnName := colChange.name
210+
columnType := colChange.newType
211+
// If this is a new PK column we need to drop the table
212+
if colChange.new && colChange.newPk {
213+
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("New column is a primary key, dropping and adding table since in forced migrate mode")
214+
sql := "drop table if exists \"" + tableName + "\""
215+
if _, err := c.db.Exec(sql); err != nil {
216+
return fmt.Errorf("failed to drop table %s: %w", tableName, err)
217+
}
218+
err := c.createTableIfNotExist(ctx, tableChange.table)
219+
if err != nil {
220+
return err
221+
}
222+
break
223+
}
176224
if colChange.new {
177225
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Column doesn't exist, creating")
178-
sql := "alter table \"" + table.Name + "\" add column \"" + colChange.name + "\" \"" + colChange.newType + `"`
226+
sql := "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"`
227+
if _, err := c.db.Exec(sql); err != nil {
228+
return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err)
229+
}
230+
}
231+
// if this is an existing column with type change we need to drop and add it
232+
if !colChange.new && colChange.oldType != colChange.newType {
233+
c.logger.Debug().Str("table", table.Name).Str("column", colChange.name).Msg("Existing column type changed, dropping and adding column since in forced migrate mode")
234+
sql := "alter table " + tableName + " drop column " + columnName
235+
if _, err := c.db.Exec(sql); err != nil {
236+
return fmt.Errorf("failed to drop column %s on table %s: %w", columnName, tableName, err)
237+
}
238+
sql = "alter table \"" + tableName + "\" add column \"" + columnName + "\" \"" + columnType + `"`
179239
if _, err := c.db.Exec(sql); err != nil {
180-
return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, table.Name, err)
240+
return fmt.Errorf("failed to add column %s on table %s: %w", colChange.name, tableName, err)
181241
}
182242
}
183243
}

0 commit comments

Comments
 (0)