Skip to content

Commit 52d3e7f

Browse files
authored
fix: Add more verbose error for when partition and sorting key changes (#21265)
This improves the error message when migrating between plugin versions where sorting key or partition keys have changed. Before, the user was left guessing what changes were needed for certain tables. ### Before ```bash cloudquery sync aws_to_clickhouse.yaml Loading spec(s) from aws_to_clickhouse.yaml Starting sync for: aws (cloudquery/aws@v32.37.1) -> [clickhouse (cloudquery/clickhouse@v7.3.1)] Error: failed to sync v3 source aws: write client returned error (insert): plugin returned error: Can't migrate tables automatically, migrate manually or consider using 'migrate_mode: forced'. Non auto migratable tables changes: aws_ec2_instance_types: - Column "placement_group_info" added with type "utf8" - Column "test_column" with type "utf8" removed - Not null constraint removed from column "v_cpu_info" aws_eks_cluster_node_groups: aws_elasticache_replication_groups: aws_elbv1_load_balancers: aws_elbv2_load_balancers: aws_rds_clusters: aws_rds_instances: ``` ### After: ```bash cloudquery sync aws_to_clickhouse.yaml Loading spec(s) from aws_to_clickhouse.yaml Starting sync for: aws (cloudquery/aws@v32.37.1) -> [clickhouse (local@/Users/herman/code/cloudquery/cloudquery/plugins/destination/clickhouse/clickhouse)] Error: failed to sync v3 source aws: write client returned error (insert): plugin returned error: Can't migrate tables automatically, migrate manually or consider using 'migrate_mode: forced'. Non auto-migratable tables: aws_ec2_instance_types: - Column "placement_group_info" added with type "utf8" - Column "test_column" with type "utf8" removed - Not null constraint removed from column "v_cpu_info" - Sorting key changed (was [supported_boot_modes,supported_root_device_types,supported_usage_classes,supported_virtualization_types,_cq_id] and would become [_cq_id]) aws_eks_cluster_node_groups: - Sorting key changed (was [instance_types,subnets,_cq_id] and would become [_cq_id]) aws_elasticache_replication_groups: - Sorting key changed (was [member_clusters,member_clusters_outpost_arns,user_group_ids,_cq_id] and would become [_cq_id]) aws_elbv1_load_balancers: - Sorting key changed (was [availability_zones,security_groups,subnets,_cq_id] and would become [_cq_id]) aws_elbv2_load_balancers: - Sorting key changed (was [security_groups,_cq_id] and would become [_cq_id]) aws_rds_clusters: - Sorting key changed (was [availability_zones,custom_endpoints,enabled_cloudwatch_logs_exports,read_replica_identifiers,_cq_id] and would become [_cq_id]) aws_rds_instances: - Sorting key changed (was [enabled_cloudwatch_logs_exports,read_replica_db_cluster_identifiers,read_replica_db_instance_identifiers,_cq_id] and would become [_cq_id]) ```
1 parent c993d03 commit 52d3e7f

1 file changed

Lines changed: 78 additions & 17 deletions

File tree

plugins/destination/clickhouse/client/migrate.go

Lines changed: 78 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,22 @@ type tableChanges struct {
2020
forcedMigrationNeeded bool
2121
oldTTL string
2222
newTTL string
23+
tableSchemaChanges []tableSchemaChange
2324
changes []schema.TableColumnChange
2425
}
2526

27+
type tableSchemaChangeKind string
28+
29+
const (
30+
partitionKeyChange tableSchemaChangeKind = "partition_key_change"
31+
sortKeyChange tableSchemaChangeKind = "sort_key_change"
32+
)
33+
34+
type tableSchemaChange struct {
35+
kind tableSchemaChangeKind
36+
change string
37+
}
38+
2639
// MigrateTables relies on the CLI/client to lock before running migration.
2740
func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrateTables) error {
2841
have, err := retryGetTableDefinitions(ctx, c.logger, c.database, c.conn, messages)
@@ -51,13 +64,13 @@ func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrat
5164
return allTablesChanges[table].forcedMigrationNeeded && !tablesWeCanForceMigrate[table]
5265
})
5366
if len(nonAutoMigratableTables) > 0 {
54-
changes := lo.FromEntries(lo.Map(nonAutoMigratableTables, func(table string, _ int) lo.Entry[string, []schema.TableColumnChange] {
55-
return lo.Entry[string, []schema.TableColumnChange]{
67+
changes := lo.FromEntries(lo.Map(nonAutoMigratableTables, func(table string, _ int) lo.Entry[string, tableChanges] {
68+
return lo.Entry[string, tableChanges]{
5669
Key: table,
57-
Value: allTablesChanges[table].changes,
70+
Value: allTablesChanges[table],
5871
}
5972
}))
60-
return fmt.Errorf("\nCan't migrate tables automatically, migrate manually or consider using 'migrate_mode: forced'. Non auto migratable tables changes:\n\n%s", schema.GetChangesSummary(changes))
73+
return fmt.Errorf("\n\nCan't migrate tables automatically, migrate manually or consider using 'migrate_mode: forced'.\n\nNon auto-migratable tables:\n\n%s", summarizeTableChanges(changes))
6174
}
6275

6376
const maxConcurrentMigrate = 10
@@ -97,25 +110,76 @@ func (c *Client) MigrateTables(ctx context.Context, messages message.WriteMigrat
97110
return eg.Wait()
98111
}
99112

113+
func summarizeTableChanges(tablesChanges map[string]tableChanges) string {
114+
tables := lo.Keys(tablesChanges)
115+
slices.Sort(tables)
116+
summary := strings.Builder{}
117+
for i, table := range tables {
118+
summary.WriteString(fmt.Sprintf("%s:", table))
119+
changes := tablesChanges[table]
120+
changesString := lo.Map(changes.changes, func(change schema.TableColumnChange, _ int) string {
121+
return fmt.Sprintf(" - %s", schema.GetColumnChangeSummary(change))
122+
})
123+
slices.Sort(changesString)
124+
if len(changesString) > 0 {
125+
summary.WriteString("\n" + strings.Join(changesString, "\n"))
126+
}
127+
128+
// write sorting and partition key changes
129+
if len(changes.tableSchemaChanges) > 0 {
130+
summary.WriteString("\n")
131+
schemaChanges := lo.Map(changes.tableSchemaChanges, func(change tableSchemaChange, _ int) string {
132+
return fmt.Sprintf(" - %s", strings.ToUpper(change.change[0:1])+change.change[1:])
133+
})
134+
slices.Sort(schemaChanges)
135+
summary.WriteString(strings.Join(schemaChanges, "\n"))
136+
}
137+
138+
if i < len(tables)-1 {
139+
summary.WriteString("\n\n")
140+
}
141+
}
142+
143+
return summary.String()
144+
}
145+
100146
func (c *Client) allTablesChanges(ctx context.Context, want schema.Tables, have schema.Tables) (map[string]tableChanges, error) {
101147
result := make(map[string]tableChanges)
102148
for _, t := range want {
103149
chTable := have.Get(t.Name)
104150
if chTable == nil {
105151
result[t.Name] = tableChanges{
106152
alreadyExists: false,
107-
changes: nil,
153+
forcedMigrationNeeded: false,
108154
oldTTL: "",
109155
newTTL: "",
110-
forcedMigrationNeeded: false,
156+
tableSchemaChanges: nil,
157+
changes: nil,
111158
}
112159
continue
113160
}
114161
changes := t.GetChanges(chTable)
115-
forcedMigrationNeeded, err := c.forceMigrationNeeded(ctx, t, changes)
162+
163+
partitionChange, sortingChange, err := c.checkPartitionOrOrderByChanged(ctx, t, c.spec.Partition, c.spec.OrderBy)
116164
if err != nil {
117-
return nil, err
165+
return nil, fmt.Errorf("failed to check partition or order by changes for table %s: %w", t.Name, err)
166+
}
167+
tableSchemaChanges := make([]tableSchemaChange, 0, 2)
168+
if partitionChange != "" {
169+
tableSchemaChanges = append(tableSchemaChanges, tableSchemaChange{
170+
kind: partitionKeyChange,
171+
change: partitionChange,
172+
})
173+
}
174+
if sortingChange != "" {
175+
tableSchemaChanges = append(tableSchemaChanges, tableSchemaChange{
176+
kind: sortKeyChange,
177+
change: sortingChange,
178+
})
118179
}
180+
tableSchemaChanges = slices.Clip(tableSchemaChanges)
181+
182+
forcedMigrationNeeded := c.forceMigrationNeeded(changes, tableSchemaChanges)
119183
oldTTL, newTTL, err := c.checkTTLChanged(ctx, t)
120184
if err != nil {
121185
return nil, fmt.Errorf("failed to check TTL changes for table %s: %w", t.Name, err)
@@ -125,26 +189,23 @@ func (c *Client) allTablesChanges(ctx context.Context, want schema.Tables, have
125189
changes: changes,
126190
oldTTL: oldTTL,
127191
newTTL: newTTL,
192+
tableSchemaChanges: tableSchemaChanges,
128193
forcedMigrationNeeded: forcedMigrationNeeded,
129194
}
130195
}
131196
return result, nil
132197
}
133198

134-
func (c *Client) forceMigrationNeeded(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) (bool, error) {
199+
func (*Client) forceMigrationNeeded(changes []schema.TableColumnChange, tableChanges []tableSchemaChange) bool {
135200
if unsafe := unsafeChanges(changes); len(unsafe) > 0 {
136-
return true, nil
201+
return true
137202
}
138203

139-
partitionKeyChange, sortingKeyChange, err := c.checkPartitionOrOrderByChanged(ctx, table, c.spec.Partition, c.spec.OrderBy)
140-
if err != nil {
141-
return false, fmt.Errorf("failed to check partition or order by changed: %w", err)
142-
}
143-
if partitionKeyChange != "" || sortingKeyChange != "" {
144-
return true, nil
204+
if len(tableChanges) > 0 {
205+
return true
145206
}
146207

147-
return false, nil
208+
return false
148209
}
149210

150211
func unsafeChanges(changes []schema.TableColumnChange) []schema.TableColumnChange {

0 commit comments

Comments
 (0)