Skip to content

Commit 9b5dfe0

Browse files
feat(snowflake): Migrate to SDK V3 native arrow (#10822)
Closes #10729 --------- Co-authored-by: Alex Shcherbakov <candiduslynx@users.noreply.github.com>
1 parent d2d9015 commit 9b5dfe0

13 files changed

Lines changed: 103 additions & 52 deletions

File tree

plugins/destination/snowflake/CHANGELOG.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,18 @@
1414
### Bug Fixes
1515

1616
* **deps:** Update github.com/apache/arrow/go/v12 digest to 0ea1a10 ([#10461](https://github.com/cloudquery/cloudquery/issues/10461)) ([022709f](https://github.com/cloudquery/cloudquery/commit/022709f710cc6d95aee60260d6f58991698bbf42))
17-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.5.0 ([#10390](https://github.com/cloudquery/cloudquery/issues/10390)) ([f706688](https://github.com/cloudquery/cloudquery/commit/f706688b2f5b8393d09d57020d31fb1d280f0dbd))
18-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.5.1 ([#10448](https://github.com/cloudquery/cloudquery/issues/10448)) ([cc85b93](https://github.com/cloudquery/cloudquery/commit/cc85b939fe945939caf72f8c08095e1e744b9ee8))
17+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.5.0 ([#10390](https://github.com/cloudquery/cloudquery/issues/10390)) ([f706688](https://github.com/cloudquery/cloudquery/commit/f706688b2f5b8393d09d57020d31fb1d280f0dbd))
18+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.5.1 ([#10448](https://github.com/cloudquery/cloudquery/issues/10448)) ([cc85b93](https://github.com/cloudquery/cloudquery/commit/cc85b939fe945939caf72f8c08095e1e744b9ee8))
1919

2020
## [2.0.1](https://github.com/cloudquery/cloudquery/compare/plugins-destination-snowflake-v2.0.0...plugins-destination-snowflake-v2.0.1) (2023-04-25)
2121

2222

2323
### Bug Fixes
2424

2525
* **deps:** Update module github.com/aws/aws-sdk-go-v2/feature/s3/manager to v1.11.63 ([#10267](https://github.com/cloudquery/cloudquery/issues/10267)) ([7a8a4c7](https://github.com/cloudquery/cloudquery/commit/7a8a4c787bf2849b799014f51d32bec42942d16d))
26-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.5 ([#10200](https://github.com/cloudquery/cloudquery/issues/10200)) ([5a33693](https://github.com/cloudquery/cloudquery/commit/5a33693fe29f7068b03d80be1859d6e479c42c0d))
27-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.6 ([#10208](https://github.com/cloudquery/cloudquery/issues/10208)) ([91c80a7](https://github.com/cloudquery/cloudquery/commit/91c80a795b46480447cfaef67c4db721a31e3206))
28-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.4.0 ([#10278](https://github.com/cloudquery/cloudquery/issues/10278)) ([a0a713e](https://github.com/cloudquery/cloudquery/commit/a0a713e8490b970b9d8bfaa1b50e01f43ff51c36))
26+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.5 ([#10200](https://github.com/cloudquery/cloudquery/issues/10200)) ([5a33693](https://github.com/cloudquery/cloudquery/commit/5a33693fe29f7068b03d80be1859d6e479c42c0d))
27+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.6 ([#10208](https://github.com/cloudquery/cloudquery/issues/10208)) ([91c80a7](https://github.com/cloudquery/cloudquery/commit/91c80a795b46480447cfaef67c4db721a31e3206))
28+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.4.0 ([#10278](https://github.com/cloudquery/cloudquery/issues/10278)) ([a0a713e](https://github.com/cloudquery/cloudquery/commit/a0a713e8490b970b9d8bfaa1b50e01f43ff51c36))
2929
* Update to SDK v2.3.8, remove release calls ([#10263](https://github.com/cloudquery/cloudquery/issues/10263)) ([6783806](https://github.com/cloudquery/cloudquery/commit/678380691ca6afe7b40175e559d7160f7561d925))
3030

3131
## [2.0.0](https://github.com/cloudquery/cloudquery/compare/plugins-destination-snowflake-v1.1.18...plugins-destination-snowflake-v2.0.0) (2023-04-19)
@@ -48,7 +48,7 @@
4848
* **deps:** Update module github.com/aws/aws-sdk-go-v2/feature/s3/manager to v1.11.61 ([#9791](https://github.com/cloudquery/cloudquery/issues/9791)) ([f9dcef8](https://github.com/cloudquery/cloudquery/commit/f9dcef81bb81da123b6820ef2c4b204325e64203))
4949
* **deps:** Update module github.com/aws/aws-sdk-go-v2/feature/s3/manager to v1.11.62 ([#10129](https://github.com/cloudquery/cloudquery/issues/10129)) ([13f8670](https://github.com/cloudquery/cloudquery/commit/13f867006cd17c92bc1b18022ab3a210266258d8))
5050
* **deps:** Update module github.com/cloudquery/plugin-sdk to v1.45.0 ([#9863](https://github.com/cloudquery/cloudquery/issues/9863)) ([2799d62](https://github.com/cloudquery/cloudquery/commit/2799d62518283ac304beecda9478f8f2db43cdc5))
51-
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.4 ([#10196](https://github.com/cloudquery/cloudquery/issues/10196)) ([c6d2f59](https://github.com/cloudquery/cloudquery/commit/c6d2f59c7d77177a351cb82ecdc381dec6aad30c))
51+
* **deps:** Update module github.com/cloudquery/plugin-sdk/v2 to v2.3.4 ([#10196](https://github.com/cloudquery/cloudquery/issues/10196)) ([c6d2f59](https://github.com/cloudquery/cloudquery/commit/c6d2f59c7d77177a351cb82ecdc381dec6aad30c))
5252

5353
## [1.1.18](https://github.com/cloudquery/cloudquery/compare/plugins-destination-snowflake-v1.1.17...plugins-destination-snowflake-v1.1.18) (2023-04-04)
5454

plugins/destination/snowflake/CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go run main.go serve
1111
## Testing
1212

1313
```bash
14+
# export SNOW_TEST_DSN="username:password@account_locator.europe-west4.gcp/testdb/public?warehouse=test"
1415
make test
1516
```
1617

plugins/destination/snowflake/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"fmt"
77

88
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
9+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1010
"github.com/rs/zerolog"
1111

1212
"github.com/snowflakedb/gosnowflake"

plugins/destination/snowflake/client/client_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import (
55
"testing"
66

77
"github.com/cloudquery/plugin-pb-go/specs"
8-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
8+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
9+
"github.com/cloudquery/plugin-sdk/v3/schema"
910
)
1011

1112
func TestPlugin(t *testing.T) {
@@ -24,5 +25,7 @@ func TestPlugin(t *testing.T) {
2425
SkipMigrateOverwrite: true,
2526
SkipMigrateOverwriteForce: true,
2627
SkipMigrateAppendForce: true,
27-
})
28+
},
29+
schema.WithTestSourceSkipIntervals(),
30+
)
2831
}

plugins/destination/snowflake/client/deletestale.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"strings"
66
"time"
77

8-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
99
)
1010

11-
func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error {
11+
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
1212
for _, table := range tables {
13-
tableName := schema.TableName(table)
13+
tableName := table.Name
1414
var sb strings.Builder
1515
sb.WriteString("delete from ")
1616
sb.WriteString(tableName)

plugins/destination/snowflake/client/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package client
22

33
import (
4-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
4+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
55
)
66

77
func (c *Client) Metrics() destination.Metrics {

plugins/destination/snowflake/client/migrate.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ import (
55
"fmt"
66
"strings"
77

8-
"github.com/apache/arrow/go/v13/arrow"
9-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
109
)
1110

1211
const (
@@ -36,9 +35,9 @@ func (i *tableInfo) getColumn(name string) *columnInfo {
3635
}
3736

3837
// This is the responsibility of the CLI of the client to lock before running migration
39-
func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
38+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
4039
for _, table := range tables {
41-
tableName := schema.TableName(table)
40+
tableName := table.Name
4241
c.logger.Debug().Str("table", tableName).Msg("Migrating table")
4342
tableExist, err := c.isTableExistSQL(ctx, tableName)
4443
if err != nil {
@@ -67,15 +66,15 @@ func (c *Client) isTableExistSQL(_ context.Context, table string) (bool, error)
6766
return tableExist == 1, nil
6867
}
6968

70-
func (c *Client) autoMigrateTable(_ context.Context, table *arrow.Schema) error {
69+
func (c *Client) autoMigrateTable(_ context.Context, table *schema.Table) error {
7170
var err error
7271
var info *tableInfo
73-
tableName := schema.TableName(table)
72+
tableName := table.Name
7473
if info, err = c.getTableInfo(tableName); err != nil {
7574
return fmt.Errorf("failed to get table %s columns types: %w", tableName, err)
7675
}
7776

78-
for _, col := range table.Fields() {
77+
for _, col := range table.Columns {
7978
columnName := col.Name
8079
columnType := c.SchemaTypeToSnowflake(col.Type)
8180
snowflakeColumn := info.getColumn(columnName)
@@ -94,16 +93,16 @@ func (c *Client) autoMigrateTable(_ context.Context, table *arrow.Schema) error
9493
return nil
9594
}
9695

97-
func (c *Client) createTableIfNotExist(_ context.Context, table *arrow.Schema) error {
96+
func (c *Client) createTableIfNotExist(_ context.Context, table *schema.Table) error {
9897
var sb strings.Builder
9998
// TODO sanitize tablename
100-
tableName := schema.TableName(table)
99+
tableName := table.Name
101100
sb.WriteString("CREATE TABLE IF NOT EXISTS ")
102101
sb.WriteString(tableName)
103102
sb.WriteString(" (")
104-
totalColumns := len(table.Fields())
103+
totalColumns := len(table.Columns)
105104

106-
for i, col := range table.Fields() {
105+
for i, col := range table.Columns {
107106
sqlType := c.SchemaTypeToSnowflake(col.Type)
108107
// TODO: sanitize column name
109108
fieldDef := `"` + col.Name + `" ` + sqlType

plugins/destination/snowflake/client/read.go

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/apache/arrow/go/v13/arrow"
1111
"github.com/apache/arrow/go/v13/arrow/array"
1212
"github.com/apache/arrow/go/v13/arrow/memory"
13-
"github.com/cloudquery/plugin-sdk/v2/schema"
13+
"github.com/cloudquery/plugin-sdk/v3/schema"
1414
)
1515

1616
const (
@@ -24,67 +24,111 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
2424
}
2525
switch b := bldr.(type) {
2626
case *array.BooleanBuilder:
27-
b.Append(val.(bool))
27+
if boolVal, ok := val.(bool); ok {
28+
b.Append(boolVal)
29+
return nil
30+
}
31+
return b.AppendValueFromString(val.(string))
2832
case *array.Int8Builder:
2933
u, err := strconv.ParseInt(val.(string), 10, 8)
3034
if err != nil {
31-
return err
35+
return fmt.Errorf("failed to parse int8: %w", err)
3236
}
3337
b.Append(int8(u))
3438
case *array.Int16Builder:
3539
u, err := strconv.ParseInt(val.(string), 10, 16)
3640
if err != nil {
37-
return err
41+
return fmt.Errorf("failed to parse int16: %w", err)
3842
}
3943
b.Append(int16(u))
4044
case *array.Int32Builder:
4145
u, err := strconv.ParseInt(val.(string), 10, 32)
4246
if err != nil {
43-
return err
47+
return fmt.Errorf("failed to parse int32: %w", err)
4448
}
4549
b.Append(int32(u))
4650
case *array.Int64Builder:
4751
u, err := strconv.ParseInt(val.(string), 10, 64)
4852
if err != nil {
49-
return err
53+
return fmt.Errorf("failed to parse int64: %w", err)
5054
}
5155
b.Append(u)
5256
case *array.Uint8Builder:
5357
u, err := strconv.ParseUint(val.(string), 10, 8)
5458
if err != nil {
55-
return err
59+
return fmt.Errorf("failed to parse uint8: %w", err)
5660
}
5761
b.Append(uint8(u))
5862
case *array.Uint16Builder:
5963
u, err := strconv.ParseUint(val.(string), 10, 16)
6064
if err != nil {
61-
return err
65+
return fmt.Errorf("failed to parse uint16: %w", err)
6266
}
6367
b.Append(uint16(u))
6468
case *array.Uint32Builder:
6569
u, err := strconv.ParseUint(val.(string), 10, 32)
6670
if err != nil {
67-
return err
71+
return fmt.Errorf("failed to parse uint32: %w", err)
6872
}
6973
b.Append(uint32(u))
7074
case *array.Uint64Builder:
7175
u, err := strconv.ParseUint(val.(string), 10, 64)
7276
if err != nil {
73-
return err
77+
return fmt.Errorf("failed to parse uint64: %w", err)
7478
}
7579
b.Append(u)
7680
case *array.Float32Builder:
77-
b.Append(val.(float32))
81+
if floatVal, ok := val.(float64); ok {
82+
b.Append(float32(floatVal))
83+
} else {
84+
floatVal, err := strconv.ParseFloat(val.(string), 32)
85+
if err != nil {
86+
return fmt.Errorf("failed to parse float32: %w", err)
87+
}
88+
b.Append(float32(floatVal))
89+
}
7890
case *array.Float64Builder:
79-
b.Append(val.(float64))
91+
if floatVal, ok := val.(float64); ok {
92+
b.Append(floatVal)
93+
} else {
94+
floatVal, err := strconv.ParseFloat(val.(string), 64)
95+
if err != nil {
96+
return fmt.Errorf("failed to parse float64: %w", err)
97+
}
98+
b.Append(floatVal)
99+
}
80100
case *array.StringBuilder:
81101
b.Append(val.(string))
82102
case *array.LargeStringBuilder:
83103
b.Append(val.(string))
84104
case *array.BinaryBuilder:
85105
b.Append(val.([]uint8))
86106
case *array.TimestampBuilder:
87-
b.Append(arrow.Timestamp(val.(time.Time).UnixMicro()))
107+
var timeVal time.Time
108+
// nolint:revive
109+
if t, ok := val.(time.Time); ok {
110+
timeVal = t
111+
} else {
112+
t, err := arrow.TimestampFromString(val.(string), b.Type().(*arrow.TimestampType).Unit)
113+
if err != nil {
114+
return fmt.Errorf("failed to parse timestamp: %w", err)
115+
}
116+
b.Append(t)
117+
return nil
118+
}
119+
120+
switch b.Type().(*arrow.TimestampType).Unit {
121+
case arrow.Second:
122+
b.Append(arrow.Timestamp(timeVal.UTC().Unix()))
123+
case arrow.Millisecond:
124+
b.Append(arrow.Timestamp(timeVal.UTC().UnixMilli()))
125+
case arrow.Microsecond:
126+
b.Append(arrow.Timestamp(timeVal.UTC().UnixMicro()))
127+
case arrow.Nanosecond:
128+
b.Append(arrow.Timestamp(timeVal.UTC().UnixNano()))
129+
default:
130+
return fmt.Errorf("unsupported timestamp unit %s", f.Type.(*arrow.TimestampType).Unit)
131+
}
88132
case array.ListLikeBuilder:
89133
b.Append(true)
90134
valBuilder := b.ValueBuilder()
@@ -110,17 +154,18 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
110154
return fmt.Errorf("unsupported type %T with builder %T", val, bldr)
111155
}
112156
if err := bldr.AppendValueFromString(v); err != nil {
113-
return err
157+
return fmt.Errorf("failed to AppendValueFromString %s: %w", v, err)
114158
}
115159
}
116160
return nil
117161
}
118162

119-
func (c *Client) reverseTransformer(sc *arrow.Schema, values []any) (arrow.Record, error) {
163+
func (c *Client) reverseTransformer(table *schema.Table, values []any) (arrow.Record, error) {
164+
sc := table.ToArrowSchema()
120165
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
121166
for i, f := range sc.Fields() {
122167
if err := c.reverseTransform(f, bldr.Field(i), *values[i].(*any)); err != nil {
123-
return nil, err
168+
return nil, fmt.Errorf("failed to transform field %s: %w", f.Name, err)
124169
}
125170
}
126171
rec := bldr.NewRecord()
@@ -153,10 +198,10 @@ func snowflakeStrToArray(val string) []string {
153198
return strs
154199
}
155200

156-
func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
157-
tableName := schema.TableName(table)
158-
colNames := make([]string, 0, len(table.Fields()))
159-
for _, col := range table.Fields() {
201+
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
202+
tableName := table.Name
203+
colNames := make([]string, 0, len(table.Columns))
204+
for _, col := range table.Columns {
160205
colNames = append(colNames, `"`+col.Name+`"`)
161206
}
162207
cols := strings.Join(colNames, ", ")
@@ -167,7 +212,7 @@ func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName strin
167212
}
168213
defer rows.Close()
169214
for rows.Next() {
170-
values := make([]any, len(table.Fields()))
215+
values := make([]any, len(table.Columns))
171216
for i := range values {
172217
values[i] = new(any)
173218
}

plugins/destination/snowflake/client/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package client
22

33
import (
44
"github.com/apache/arrow/go/v13/arrow"
5-
"github.com/cloudquery/plugin-sdk/v2/types"
5+
"github.com/cloudquery/plugin-sdk/v3/types"
66
)
77

88
func (*Client) SchemaTypeToSnowflake(t arrow.DataType) string {

plugins/destination/snowflake/client/write.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
"github.com/apache/arrow/go/v13/arrow"
1111
"github.com/apache/arrow/go/v13/arrow/array"
12-
"github.com/cloudquery/plugin-sdk/v2/schema"
12+
"github.com/cloudquery/plugin-sdk/v3/schema"
1313
)
1414

1515
const (
@@ -19,8 +19,8 @@ const (
1919
copyIntoTable = `copy into %s from @cq_plugin_stage/%s file_format = (format_name = cq_plugin_json_format) match_by_column_name = case_insensitive`
2020
)
2121

22-
func (c *Client) WriteTableBatch(ctx context.Context, table *arrow.Schema, resources []arrow.Record) error {
23-
tableName := schema.TableName(table)
22+
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, resources []arrow.Record) error {
23+
tableName := table.Name
2424
f, err := os.CreateTemp(os.TempDir(), tableName+".json.*")
2525
if err != nil {
2626
return err

0 commit comments

Comments
 (0)