Skip to content

Commit f28fc65

Browse files
authored
feat(mysql): Migrate to SDK V3 native arrow (#10867)
#### Summary <!-- Explain what problem this PR addresses --> <!--
1 parent 4cc0817 commit f28fc65

12 files changed

Lines changed: 249 additions & 211 deletions

File tree

plugins/destination/mysql/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"time"
88

99
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
10+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1111
"github.com/rs/zerolog"
1212

1313
mysql "github.com/go-sql-driver/mysql"

plugins/destination/mysql/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66

77
"github.com/cloudquery/cloudquery/plugins/destination/mysql/resources/plugin"
88
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
9+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1010
)
1111

1212
var migrateStrategy = destination.MigrateStrategy{

plugins/destination/mysql/client/deletestale.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
99
)
1010

11-
func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error {
11+
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
1212
for _, table := range tables {
13-
name := schema.TableName(table)
14-
query := fmt.Sprintf(`delete from %s where %s = ? and %s < ?`, identifier(name), identifier(schema.CqSourceNameColumn.Name), identifier(schema.CqSyncTimeColumn.Name))
13+
query := fmt.Sprintf(`delete from %s where %s = ? and %s < ?`, identifier(table.Name), identifier(schema.CqSourceNameColumn.Name), identifier(schema.CqSyncTimeColumn.Name))
1514
if _, err := c.db.ExecContext(ctx, query, source, syncTime); err != nil {
1615
return err
1716
}

plugins/destination/mysql/client/migrate.go

Lines changed: 81 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -7,148 +7,141 @@ import (
77

88
"github.com/apache/arrow/go/v13/arrow"
99
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v2/schema"
10+
"github.com/cloudquery/plugin-sdk/v3/schema"
1111
)
1212

13-
func (c *Client) normalizeSchemas(tables schema.Schemas) (schema.Schemas, error) {
14-
var normalized schema.Schemas
15-
for _, sc := range tables {
16-
tableName := schema.TableName(sc)
17-
fields := make([]arrow.Field, 0)
18-
for _, f := range sc.Fields() {
19-
keys := make([]string, 0)
20-
values := make([]string, 0)
21-
origKeys := f.Metadata.Keys()
22-
origValues := f.Metadata.Values()
23-
for k, v := range origKeys {
24-
switch v {
25-
case schema.MetadataUnique:
26-
// we skip as we don't scan the constraints ATM
27-
continue
28-
case schema.MetadataPrimaryKey:
29-
if !c.pkEnabled() {
30-
continue
31-
}
32-
}
33-
keys = append(keys, v)
34-
values = append(values, origValues[k])
35-
}
36-
normalizedType, err := mySQLTypeToArrowType(tableName, f.Name, arrowTypeToMySqlStr(f.Type))
37-
if err != nil {
38-
return nil, err
39-
}
40-
fields = append(fields, arrow.Field{
41-
Name: f.Name,
42-
Type: normalizedType,
43-
Nullable: f.Nullable && !schema.IsPk(f),
44-
Metadata: arrow.NewMetadata(keys, values),
45-
})
13+
func (c *Client) normalizeTables(tables schema.Tables) (schema.Tables, error) {
14+
flattened := tables.FlattenTables()
15+
normalized := make(schema.Tables, len(flattened))
16+
var err error
17+
for i, table := range flattened {
18+
normalized[i], err = c.normalizeTable(table)
19+
if err != nil {
20+
return nil, err
4621
}
22+
}
23+
return normalized, nil
24+
}
4725

48-
md := sc.Metadata()
49-
normalized = append(normalized, arrow.NewSchema(fields, &md))
26+
func (c *Client) normalizeTable(table *schema.Table) (*schema.Table, error) {
27+
columns := make([]schema.Column, len(table.Columns))
28+
for i, col := range table.Columns {
29+
if !c.pkEnabled() {
30+
col.PrimaryKey = false
31+
}
32+
normalized, err := c.normalizeField(col.ToArrowField())
33+
if err != nil {
34+
return nil, err
35+
}
36+
columns[i] = schema.NewColumnFromArrowField(*normalized)
5037
}
38+
return &schema.Table{Name: table.Name, Columns: columns}, nil
39+
}
5140

52-
return normalized, nil
41+
func (*Client) normalizeField(field arrow.Field) (*arrow.Field, error) {
42+
normalizedType, err := mySQLTypeToArrowType("", "", arrowTypeToMySqlStr(field.Type))
43+
if err != nil {
44+
return nil, err
45+
}
46+
return &arrow.Field{
47+
Name: field.Name,
48+
Type: normalizedType,
49+
Nullable: field.Nullable,
50+
Metadata: field.Metadata,
51+
}, nil
5352
}
5453

55-
func (c *Client) nonAutoMigrtableTables(tables schema.Schemas, schemaTables schema.Schemas) (names []string, changes [][]schema.FieldChange) {
56-
var tableChanges [][]schema.FieldChange
54+
func (c *Client) nonAutoMigratableTables(tables schema.Tables, mysqlTables schema.Tables) ([]string, [][]schema.TableColumnChange) {
55+
var result []string
56+
var tableChanges [][]schema.TableColumnChange
5757
for _, t := range tables {
58-
tableName := schema.TableName(t)
59-
schemaTable := schemaTables.SchemaByName(tableName)
60-
if schemaTable == nil {
58+
mysqlTable := mysqlTables.Get(t.Name)
59+
if mysqlTable == nil {
6160
continue
6261
}
63-
changes := schema.GetSchemaChanges(t, schemaTable)
62+
changes := mysqlTable.GetChanges(t)
6463
if !c.canAutoMigrate(changes) {
65-
names = append(names, tableName)
64+
result = append(result, t.Name)
6665
tableChanges = append(tableChanges, changes)
6766
}
6867
}
69-
return names, tableChanges
68+
return result, tableChanges
7069
}
7170

72-
func (*Client) canAutoMigrate(changes []schema.FieldChange) bool {
71+
func (*Client) canAutoMigrate(changes []schema.TableColumnChange) bool {
7372
for _, change := range changes {
74-
if change.Type == schema.TableColumnChangeTypeAdd && (schema.IsPk(change.Current) || !change.Current.Nullable) {
75-
return false
76-
}
77-
78-
if change.Type == schema.TableColumnChangeTypeRemove && (schema.IsPk(change.Previous) || !change.Previous.Nullable) {
79-
return false
80-
}
81-
82-
if change.Type == schema.TableColumnChangeTypeUpdate {
73+
switch change.Type {
74+
case schema.TableColumnChangeTypeAdd:
75+
if change.Current.PrimaryKey || change.Current.NotNull {
76+
return false
77+
}
78+
case schema.TableColumnChangeTypeRemove:
79+
if change.Previous.PrimaryKey || change.Previous.NotNull {
80+
return false
81+
}
82+
case schema.TableColumnChangeTypeUpdate:
8383
return false
84+
default:
85+
panic("unknown change type")
8486
}
8587
}
8688
return true
8789
}
88-
89-
func (c *Client) autoMigrateTable(ctx context.Context, table *arrow.Schema, changes []schema.FieldChange) error {
90+
func (c *Client) autoMigrateTable(ctx context.Context, table *schema.Table, changes []schema.TableColumnChange) error {
9091
for _, change := range changes {
9192
if change.Type == schema.TableColumnChangeTypeAdd {
92-
err := c.addColumn(ctx, table, change.Current)
93-
if err != nil {
93+
if err := c.addColumn(ctx, table, table.Columns.Get(change.ColumnName)); err != nil {
9494
return err
9595
}
9696
}
9797
}
98-
9998
return nil
10099
}
101100

102101
// Migrate relies on the CLI/client to lock before running migration.
103-
func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
104-
schemaTables, err := c.schemaTables(ctx, tables)
102+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
103+
mysqlTables, err := c.schemaTables(ctx, tables)
105104
if err != nil {
106105
return err
107106
}
108107

109-
normalizedTables, err := c.normalizeSchemas(tables)
108+
normalizedTables, err := c.normalizeTables(tables)
110109
if err != nil {
111110
return err
112111
}
113112

114113
if c.spec.MigrateMode != specs.MigrateModeForced {
115-
nonAutoMigrtableTables, changes := c.nonAutoMigrtableTables(normalizedTables, schemaTables)
114+
nonAutoMigrtableTables, changes := c.nonAutoMigratableTables(normalizedTables, mysqlTables)
116115
if len(nonAutoMigrtableTables) > 0 {
117116
return fmt.Errorf("tables %s with changes %v require force migration. use 'migrate_mode: forced'", strings.Join(nonAutoMigrtableTables, ","), changes)
118117
}
119118
}
120119

121120
for _, table := range normalizedTables {
122-
tableName := schema.TableName(table)
123-
if tableName == "" {
124-
return fmt.Errorf("schema %s has no table name", table.String())
125-
}
126-
c.logger.Info().Str("table", tableName).Msg("Migrating table")
127-
if len(table.Fields()) == 0 {
128-
c.logger.Info().Str("table", tableName).Msg("Table with no columns, skipping")
121+
c.logger.Info().Str("table", table.Name).Msg("Migrating table")
122+
if len(table.Columns) == 0 {
123+
c.logger.Info().Str("table", table.Name).Msg("Table with no columns, skipping")
129124
continue
130125
}
131-
schemaTable := schemaTables.SchemaByName(tableName)
132-
if schemaTable == nil {
133-
c.logger.Info().Str("table", tableName).Msg("Table doesn't exist, creating")
126+
mysql := mysqlTables.Get(table.Name)
127+
if mysql == nil {
128+
c.logger.Debug().Str("table", table.Name).Msg("Table doesn't exist, creating")
134129
if err := c.createTable(ctx, table); err != nil {
135130
return err
136131
}
137-
continue
138-
}
139-
140-
changes := schema.GetSchemaChanges(table, schemaTable)
141-
if c.canAutoMigrate(changes) {
142-
c.logger.Info().Str("table", tableName).Msg("Table exists, auto-migrating")
143-
if err := c.autoMigrateTable(ctx, table, changes); err != nil {
144-
return err
132+
} else {
133+
changes := table.GetChanges(mysql)
134+
if c.canAutoMigrate(changes) {
135+
c.logger.Info().Str("table", table.Name).Msg("Table exists, auto-migrating")
136+
if err := c.autoMigrateTable(ctx, table, changes); err != nil {
137+
return err
138+
}
139+
} else {
140+
c.logger.Info().Str("table", table.Name).Msg("Table exists, force migration required")
141+
if err := c.recreateTable(ctx, table); err != nil {
142+
return err
143+
}
145144
}
146-
continue
147-
}
148-
149-
c.logger.Info().Str("table", tableName).Msg("Table exists, force migration required")
150-
if err := c.recreateTable(ctx, table); err != nil {
151-
return err
152145
}
153146
}
154147

plugins/destination/mysql/client/read.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@ import (
1010
"github.com/apache/arrow/go/v13/arrow"
1111
"github.com/apache/arrow/go/v13/arrow/array"
1212
"github.com/apache/arrow/go/v13/arrow/memory"
13-
"github.com/cloudquery/plugin-sdk/v2/schema"
14-
"github.com/cloudquery/plugin-sdk/v2/types"
13+
"github.com/cloudquery/plugin-sdk/v3/schema"
14+
"github.com/cloudquery/plugin-sdk/v3/types"
1515
"github.com/google/uuid"
1616
)
1717

18+
const (
19+
readSQL = `SELECT %s FROM %s WHERE _cq_source_name = ? order by _cq_sync_time asc`
20+
)
21+
1822
func (*Client) createResultsArray(table *arrow.Schema) []any {
1923
results := make([]any, 0, len(table.Fields()))
2024
for _, col := range table.Fields() {
@@ -75,7 +79,7 @@ func (*Client) createResultsArray(table *arrow.Schema) []any {
7579
func reverseTransform(table *arrow.Schema, values []any) (arrow.Record, error) {
7680
recordBuilder := array.NewRecordBuilder(memory.DefaultAllocator, table)
7781
for i, val := range values {
78-
switch table.Field(i).Type.(type) {
82+
switch fType := table.Field(i).Type.(type) {
7983
case *arrow.BooleanType:
8084
if val.(*sql.NullBool).Valid {
8185
recordBuilder.Field(i).(*array.BooleanBuilder).Append(val.(*sql.NullBool).Bool)
@@ -166,7 +170,7 @@ func reverseTransform(table *arrow.Schema, values []any) (arrow.Record, error) {
166170
} else {
167171
recordBuilder.Field(i).(*array.LargeStringBuilder).Append(val.(*sql.NullString).String)
168172
}
169-
case *arrow.BinaryType:
173+
case *arrow.BinaryType, *arrow.LargeBinaryType:
170174
if *val.(*[]byte) == nil {
171175
recordBuilder.Field(i).AppendNull()
172176
} else {
@@ -177,7 +181,22 @@ func reverseTransform(table *arrow.Schema, values []any) (arrow.Record, error) {
177181
if *asTime == nil {
178182
recordBuilder.Field(i).AppendNull()
179183
} else {
180-
recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp((*asTime).UnixMicro()))
184+
switch recordBuilder.Field(i).Type().(*arrow.TimestampType).Unit {
185+
case arrow.Second:
186+
ts := (*asTime).Unix()
187+
recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp((ts)))
188+
case arrow.Millisecond:
189+
ts := (*asTime).UnixMilli()
190+
recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp((ts)))
191+
case arrow.Microsecond:
192+
ts := (*asTime).UnixMicro()
193+
recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp((ts)))
194+
case arrow.Nanosecond:
195+
ts := (*asTime).UnixNano()
196+
recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp((ts)))
197+
default:
198+
return nil, fmt.Errorf("unsupported timestamp unit %s", fType.Unit)
199+
}
181200
}
182201
case *types.UUIDType:
183202
if *val.(*[]byte) == nil {
@@ -204,29 +223,25 @@ func reverseTransform(table *arrow.Schema, values []any) (arrow.Record, error) {
204223
return rec, nil
205224
}
206225

207-
func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
208-
builder := strings.Builder{}
209-
builder.WriteString("SELECT")
210-
fields := table.Fields()
211-
for i, col := range fields {
212-
builder.WriteString(" " + identifier(col.Name))
213-
if i != len(fields)-1 {
214-
builder.WriteString(", ")
215-
}
226+
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
227+
colNames := make([]string, len(table.Columns))
228+
for i, col := range table.Columns {
229+
colNames[i] = identifier(col.Name)
216230
}
217-
tableName := schema.TableName(table)
218-
builder.WriteString("FROM " + identifier(tableName) + " WHERE _cq_source_name = ? ORDER BY _cq_sync_time ASC")
219-
rows, err := c.db.QueryContext(ctx, builder.String(), sourceName)
231+
cols := strings.Join(colNames, ", ")
232+
read := fmt.Sprintf(readSQL, cols, table.Name)
233+
rows, err := c.db.QueryContext(ctx, read, sourceName)
220234
if err != nil {
221235
return err
222236
}
223237
defer rows.Close()
238+
arrowSchemaTable := table.ToArrowSchema()
224239
for rows.Next() {
225-
values := c.createResultsArray(table)
240+
values := c.createResultsArray(arrowSchemaTable)
226241
if err := rows.Scan(values...); err != nil {
227-
return fmt.Errorf("failed to read from table %s: %w", tableName, err)
242+
return fmt.Errorf("failed to read from table %s: %w", table.Name, err)
228243
}
229-
record, err := reverseTransform(table, values)
244+
record, err := reverseTransform(arrowSchemaTable, values)
230245
if err != nil {
231246
return err
232247
}

0 commit comments

Comments
 (0)