@@ -7,148 +7,141 @@ import (
77
88 "github.com/apache/arrow/go/v13/arrow"
99 "github.com/cloudquery/plugin-pb-go/specs"
10- "github.com/cloudquery/plugin-sdk/v2 /schema"
10+ "github.com/cloudquery/plugin-sdk/v3 /schema"
1111)
1212
13- func (c * Client ) normalizeSchemas (tables schema.Schemas ) (schema.Schemas , error ) {
14- var normalized schema.Schemas
15- for _ , sc := range tables {
16- tableName := schema .TableName (sc )
17- fields := make ([]arrow.Field , 0 )
18- for _ , f := range sc .Fields () {
19- keys := make ([]string , 0 )
20- values := make ([]string , 0 )
21- origKeys := f .Metadata .Keys ()
22- origValues := f .Metadata .Values ()
23- for k , v := range origKeys {
24- switch v {
25- case schema .MetadataUnique :
26- // we skip as we don't scan the constraints ATM
27- continue
28- case schema .MetadataPrimaryKey :
29- if ! c .pkEnabled () {
30- continue
31- }
32- }
33- keys = append (keys , v )
34- values = append (values , origValues [k ])
35- }
36- normalizedType , err := mySQLTypeToArrowType (tableName , f .Name , arrowTypeToMySqlStr (f .Type ))
37- if err != nil {
38- return nil , err
39- }
40- fields = append (fields , arrow.Field {
41- Name : f .Name ,
42- Type : normalizedType ,
43- Nullable : f .Nullable && ! schema .IsPk (f ),
44- Metadata : arrow .NewMetadata (keys , values ),
45- })
13+ func (c * Client ) normalizeTables (tables schema.Tables ) (schema.Tables , error ) {
14+ flattened := tables .FlattenTables ()
15+ normalized := make (schema.Tables , len (flattened ))
16+ var err error
17+ for i , table := range flattened {
18+ normalized [i ], err = c .normalizeTable (table )
19+ if err != nil {
20+ return nil , err
4621 }
22+ }
23+ return normalized , nil
24+ }
4725
48- md := sc .Metadata ()
49- normalized = append (normalized , arrow .NewSchema (fields , & md ))
26+ func (c * Client ) normalizeTable (table * schema.Table ) (* schema.Table , error ) {
27+ columns := make ([]schema.Column , len (table .Columns ))
28+ for i , col := range table .Columns {
29+ if ! c .pkEnabled () {
30+ col .PrimaryKey = false
31+ }
32+ normalized , err := c .normalizeField (col .ToArrowField ())
33+ if err != nil {
34+ return nil , err
35+ }
36+ columns [i ] = schema .NewColumnFromArrowField (* normalized )
5037 }
38+ return & schema.Table {Name : table .Name , Columns : columns }, nil
39+ }
5140
52- return normalized , nil
41+ func (* Client ) normalizeField (field arrow.Field ) (* arrow.Field , error ) {
42+ normalizedType , err := mySQLTypeToArrowType ("" , "" , arrowTypeToMySqlStr (field .Type ))
43+ if err != nil {
44+ return nil , err
45+ }
46+ return & arrow.Field {
47+ Name : field .Name ,
48+ Type : normalizedType ,
49+ Nullable : field .Nullable ,
50+ Metadata : field .Metadata ,
51+ }, nil
5352}
5453
55- func (c * Client ) nonAutoMigrtableTables (tables schema.Schemas , schemaTables schema.Schemas ) (names []string , changes [][]schema.FieldChange ) {
56- var tableChanges [][]schema.FieldChange
54+ func (c * Client ) nonAutoMigratableTables (tables schema.Tables , mysqlTables schema.Tables ) ([]string , [][]schema.TableColumnChange ) {
55+ var result []string
56+ var tableChanges [][]schema.TableColumnChange
5757 for _ , t := range tables {
58- tableName := schema .TableName (t )
59- schemaTable := schemaTables .SchemaByName (tableName )
60- if schemaTable == nil {
58+ mysqlTable := mysqlTables .Get (t .Name )
59+ if mysqlTable == nil {
6160 continue
6261 }
63- changes := schema . GetSchemaChanges ( t , schemaTable )
62+ changes := mysqlTable . GetChanges ( t )
6463 if ! c .canAutoMigrate (changes ) {
65- names = append (names , tableName )
64+ result = append (result , t . Name )
6665 tableChanges = append (tableChanges , changes )
6766 }
6867 }
69- return names , tableChanges
68+ return result , tableChanges
7069}
7170
72- func (* Client ) canAutoMigrate (changes []schema.FieldChange ) bool {
71+ func (* Client ) canAutoMigrate (changes []schema.TableColumnChange ) bool {
7372 for _ , change := range changes {
74- if change .Type == schema .TableColumnChangeTypeAdd && (schema .IsPk (change .Current ) || ! change .Current .Nullable ) {
75- return false
76- }
77-
78- if change .Type == schema .TableColumnChangeTypeRemove && (schema .IsPk (change .Previous ) || ! change .Previous .Nullable ) {
79- return false
80- }
81-
82- if change .Type == schema .TableColumnChangeTypeUpdate {
73+ switch change .Type {
74+ case schema .TableColumnChangeTypeAdd :
75+ if change .Current .PrimaryKey || change .Current .NotNull {
76+ return false
77+ }
78+ case schema .TableColumnChangeTypeRemove :
79+ if change .Previous .PrimaryKey || change .Previous .NotNull {
80+ return false
81+ }
82+ case schema .TableColumnChangeTypeUpdate :
8383 return false
84+ default :
85+ panic ("unknown change type" )
8486 }
8587 }
8688 return true
8789}
88-
89- func (c * Client ) autoMigrateTable (ctx context.Context , table * arrow.Schema , changes []schema.FieldChange ) error {
90+ func (c * Client ) autoMigrateTable (ctx context.Context , table * schema.Table , changes []schema.TableColumnChange ) error {
9091 for _ , change := range changes {
9192 if change .Type == schema .TableColumnChangeTypeAdd {
92- err := c .addColumn (ctx , table , change .Current )
93- if err != nil {
93+ if err := c .addColumn (ctx , table , table .Columns .Get (change .ColumnName )); err != nil {
9494 return err
9595 }
9696 }
9797 }
98-
9998 return nil
10099}
101100
102101// Migrate relies on the CLI/client to lock before running migration.
103- func (c * Client ) Migrate (ctx context.Context , tables schema.Schemas ) error {
104- schemaTables , err := c .schemaTables (ctx , tables )
102+ func (c * Client ) Migrate (ctx context.Context , tables schema.Tables ) error {
103+ mysqlTables , err := c .schemaTables (ctx , tables )
105104 if err != nil {
106105 return err
107106 }
108107
109- normalizedTables , err := c .normalizeSchemas (tables )
108+ normalizedTables , err := c .normalizeTables (tables )
110109 if err != nil {
111110 return err
112111 }
113112
114113 if c .spec .MigrateMode != specs .MigrateModeForced {
115- nonAutoMigrtableTables , changes := c .nonAutoMigrtableTables (normalizedTables , schemaTables )
114+ nonAutoMigrtableTables , changes := c .nonAutoMigratableTables (normalizedTables , mysqlTables )
116115 if len (nonAutoMigrtableTables ) > 0 {
117116 return fmt .Errorf ("tables %s with changes %v require force migration. use 'migrate_mode: forced'" , strings .Join (nonAutoMigrtableTables , "," ), changes )
118117 }
119118 }
120119
121120 for _ , table := range normalizedTables {
122- tableName := schema .TableName (table )
123- if tableName == "" {
124- return fmt .Errorf ("schema %s has no table name" , table .String ())
125- }
126- c .logger .Info ().Str ("table" , tableName ).Msg ("Migrating table" )
127- if len (table .Fields ()) == 0 {
128- c .logger .Info ().Str ("table" , tableName ).Msg ("Table with no columns, skipping" )
121+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Migrating table" )
122+ if len (table .Columns ) == 0 {
123+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Table with no columns, skipping" )
129124 continue
130125 }
131- schemaTable := schemaTables . SchemaByName ( tableName )
132- if schemaTable == nil {
133- c .logger .Info ().Str ("table" , tableName ).Msg ("Table doesn't exist, creating" )
126+ mysql := mysqlTables . Get ( table . Name )
127+ if mysql == nil {
128+ c .logger .Debug ().Str ("table" , table . Name ).Msg ("Table doesn't exist, creating" )
134129 if err := c .createTable (ctx , table ); err != nil {
135130 return err
136131 }
137- continue
138- }
139-
140- changes := schema .GetSchemaChanges (table , schemaTable )
141- if c .canAutoMigrate (changes ) {
142- c .logger .Info ().Str ("table" , tableName ).Msg ("Table exists, auto-migrating" )
143- if err := c .autoMigrateTable (ctx , table , changes ); err != nil {
144- return err
132+ } else {
133+ changes := table .GetChanges (mysql )
134+ if c .canAutoMigrate (changes ) {
135+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Table exists, auto-migrating" )
136+ if err := c .autoMigrateTable (ctx , table , changes ); err != nil {
137+ return err
138+ }
139+ } else {
140+ c .logger .Info ().Str ("table" , table .Name ).Msg ("Table exists, force migration required" )
141+ if err := c .recreateTable (ctx , table ); err != nil {
142+ return err
143+ }
145144 }
146- continue
147- }
148-
149- c .logger .Info ().Str ("table" , tableName ).Msg ("Table exists, force migration required" )
150- if err := c .recreateTable (ctx , table ); err != nil {
151- return err
152145 }
153146 }
154147
0 commit comments