Skip to content

Commit f7b5329

Browse files
authored
feat(mongodb): Upgrade to SDK V3 with native arrow support (#10819)
Closes #10723. Blocked by cloudquery/plugin-sdk#893. Also hitting a weird issue where JSON values compare as non-equal in the tests, which I still can't figure out.
1 parent a862837 commit f7b5329

9 files changed

Lines changed: 88 additions & 59 deletions

File tree

plugins/destination/mongodb/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66

77
"github.com/cloudquery/plugin-pb-go/specs"
8-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
8+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
99
"github.com/rs/zerolog"
1010
"go.mongodb.org/mongo-driver/mongo"
1111
"go.mongodb.org/mongo-driver/mongo/options"

plugins/destination/mongodb/client/client_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package client
33
import (
44
"os"
55
"testing"
6+
"time"
67

78
"github.com/cloudquery/plugin-pb-go/specs"
8-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
9+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
910
)
1011

1112
var migrateStrategy = destination.MigrateStrategy{
@@ -41,5 +42,6 @@ func TestPlugin(t *testing.T) {
4142

4243
MigrateStrategyOverwrite: migrateStrategy,
4344
MigrateStrategyAppend: migrateStrategy,
44-
})
45+
},
46+
destination.WithTestSourceTimePrecision(time.Millisecond))
4547
}

plugins/destination/mongodb/client/deletestale.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"context"
55
"time"
66

7-
"github.com/cloudquery/plugin-sdk/v2/schema"
7+
"github.com/cloudquery/plugin-sdk/v3/schema"
88
"go.mongodb.org/mongo-driver/bson"
99
)
1010

11-
func (c *Client) DeleteStale(ctx context.Context, tables schema.Schemas, source string, syncTime time.Time) error {
11+
func (c *Client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
1212
for _, table := range tables {
13-
tableName := schema.TableName(table)
13+
tableName := table.Name
1414
// delete all records that are not in the source and are older than syncTime
1515
if _, err := c.client.Database(c.pluginSpec.Database).Collection(tableName).DeleteMany(ctx, bson.M{
1616
schema.CqSourceNameColumn.Name: source,

plugins/destination/mongodb/client/migrate.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/apache/arrow/go/v13/arrow"
87
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
109
"go.mongodb.org/mongo-driver/bson"
1110
"go.mongodb.org/mongo-driver/mongo"
1211
"go.mongodb.org/mongo-driver/mongo/options"
1312
)
1413

15-
func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
14+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
1615
for _, t := range tables {
1716
if err := c.migrateTable(ctx, t); err != nil {
1817
return err
@@ -22,8 +21,8 @@ func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
2221
return nil
2322
}
2423

25-
func (c *Client) migrateTable(ctx context.Context, table *arrow.Schema) error {
26-
tableName := schema.TableName(table)
24+
func (c *Client) migrateTable(ctx context.Context, table *schema.Table) error {
25+
tableName := table.Name
2726
for _, mdl := range c.getIndexTemplates(table) {
2827
res, err := c.client.Database(c.pluginSpec.Database).Collection(tableName).Indexes().CreateOne(ctx, mdl)
2928
switch {
@@ -44,8 +43,8 @@ func (c *Client) migrateTable(ctx context.Context, table *arrow.Schema) error {
4443
return nil
4544
}
4645

47-
func (c *Client) migrateTableOnConflict(ctx context.Context, table *arrow.Schema, mdl mongo.IndexModel) error {
48-
tableName := schema.TableName(table)
46+
func (c *Client) migrateTableOnConflict(ctx context.Context, table *schema.Table, mdl mongo.IndexModel) error {
47+
tableName := table.Name
4948
if c.spec.MigrateMode != specs.MigrateModeForced {
5049
return fmt.Errorf("collection %s requires forced migration due to changes in unique indexes. use 'migrate_mode: forced'", tableName)
5150
}
@@ -61,15 +60,14 @@ func (c *Client) migrateTableOnConflict(ctx context.Context, table *arrow.Schema
6160
return nil
6261
}
6362

64-
func (c *Client) getIndexTemplates(table *arrow.Schema) []mongo.IndexModel {
63+
func (c *Client) getIndexTemplates(table *schema.Table) []mongo.IndexModel {
6564
var indexes []mongo.IndexModel
6665

67-
pks := schema.PrimaryKeyIndices(table)
66+
pks := table.PrimaryKeys()
6867
if len(pks) > 0 {
6968
indexCols := bson.D{}
70-
for _, colIndex := range pks {
71-
col := table.Field(colIndex).Name
72-
indexCols = append(indexCols, bson.E{Key: col, Value: 1})
69+
for _, name := range pks {
70+
indexCols = append(indexCols, bson.E{Key: name, Value: 1})
7371
}
7472

7573
pkIndexName := "cq_pk"

plugins/destination/mongodb/client/read.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package client
22

33
import (
4+
"bytes"
45
"context"
5-
"encoding/json"
66
"fmt"
77

8+
"github.com/goccy/go-json"
9+
810
"github.com/apache/arrow/go/v13/arrow"
911
"github.com/apache/arrow/go/v13/arrow/array"
1012
"github.com/apache/arrow/go/v13/arrow/memory"
11-
"github.com/cloudquery/plugin-sdk/v2/schema"
12-
"github.com/cloudquery/plugin-sdk/v2/types"
13+
"github.com/cloudquery/plugin-sdk/v3/schema"
14+
"github.com/cloudquery/plugin-sdk/v3/types"
1315
"go.mongodb.org/mongo-driver/bson"
1416
"go.mongodb.org/mongo-driver/bson/primitive"
1517
"go.mongodb.org/mongo-driver/mongo/options"
@@ -24,23 +26,23 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
2426
case *array.BooleanBuilder:
2527
b.Append(val.(bool))
2628
case *array.Int8Builder:
27-
b.Append(val.(int8))
29+
b.Append(int8(val.(int32)))
2830
case *array.Int16Builder:
29-
b.Append(val.(int16))
31+
b.Append(int16(val.(int32)))
3032
case *array.Int32Builder:
3133
b.Append(val.(int32))
3234
case *array.Int64Builder:
3335
b.Append(val.(int64))
3436
case *array.Uint8Builder:
35-
b.Append(val.(uint8))
37+
b.Append(uint8(val.(int32)))
3638
case *array.Uint16Builder:
37-
b.Append(val.(uint16))
39+
b.Append(uint16(val.(int32)))
3840
case *array.Uint32Builder:
39-
b.Append(val.(uint32))
41+
b.Append(uint32(val.(int64)))
4042
case *array.Uint64Builder:
41-
b.Append(val.(uint64))
43+
b.Append(uint64(val.(int64)))
4244
case *array.Float32Builder:
43-
b.Append(val.(float32))
45+
b.Append(float32(val.(float64)))
4446
case *array.Float64Builder:
4547
b.Append(val.(float64))
4648
case *array.StringBuilder:
@@ -50,15 +52,27 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
5052
case *array.BinaryBuilder:
5153
b.Append(val.(primitive.Binary).Data)
5254
case *array.TimestampBuilder:
53-
b.Append(arrow.Timestamp((val).(primitive.DateTime).Time().UTC().UnixMicro()))
55+
switch b.Type().(*arrow.TimestampType).Unit {
56+
case arrow.Second:
57+
b.Append(arrow.Timestamp((val).(primitive.DateTime).Time().UTC().Unix()))
58+
case arrow.Millisecond:
59+
b.Append(arrow.Timestamp((val).(primitive.DateTime).Time().UTC().UnixMilli()))
60+
case arrow.Microsecond:
61+
b.Append(arrow.Timestamp((val).(primitive.DateTime).Time().UTC().UnixMicro()))
62+
case arrow.Nanosecond:
63+
b.Append(arrow.Timestamp((val).(primitive.DateTime).Time().UTC().UnixNano()))
64+
default:
65+
return fmt.Errorf("unsupported timestamp unit %s", f.Type.(*arrow.TimestampType).Unit)
66+
}
5467
case *types.JSONBuilder:
55-
b.Append(val.(primitive.M))
68+
b.Append(val)
5669
case *array.StructBuilder:
5770
v, err := json.Marshal(val.(primitive.M))
5871
if err != nil {
5972
return err
6073
}
61-
if err := b.UnmarshalJSON(v); err != nil {
74+
dec := json.NewDecoder(bytes.NewReader(v))
75+
if err := b.UnmarshalOne(dec); err != nil {
6276
return err
6377
}
6478
case array.ListLikeBuilder:
@@ -81,7 +95,8 @@ func (c *Client) reverseTransform(f arrow.Field, bldr array.Builder, val any) er
8195
return nil
8296
}
8397

84-
func (c *Client) reverseTransformer(sc *arrow.Schema, values primitive.M) (arrow.Record, error) {
98+
func (c *Client) reverseTransformer(table *schema.Table, values primitive.M) (arrow.Record, error) {
99+
sc := table.ToArrowSchema()
85100
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
86101
for i, f := range sc.Fields() {
87102
if err := c.reverseTransform(f, bldr.Field(i), values[sc.Field(i).Name]); err != nil {
@@ -92,8 +107,8 @@ func (c *Client) reverseTransformer(sc *arrow.Schema, values primitive.M) (arrow
92107
return rec, nil
93108
}
94109

95-
func (c *Client) Read(ctx context.Context, table *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
96-
tableName := schema.TableName(table)
110+
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
111+
tableName := table.Name
97112
cur, err := c.client.Database(c.pluginSpec.Database).Collection(tableName).Find(
98113
ctx,
99114
bson.M{"_cq_source_name": sourceName},

plugins/destination/mongodb/client/write.go

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,43 @@ package client
22

33
import (
44
"context"
5-
"encoding/json"
5+
6+
"github.com/goccy/go-json"
67

78
"github.com/apache/arrow/go/v13/arrow"
89
"github.com/apache/arrow/go/v13/arrow/array"
910
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v2/schema"
11-
"github.com/cloudquery/plugin-sdk/v2/types"
11+
"github.com/cloudquery/plugin-sdk/v3/schema"
12+
"github.com/cloudquery/plugin-sdk/v3/types"
1213
"go.mongodb.org/mongo-driver/bson"
1314
"go.mongodb.org/mongo-driver/mongo"
1415
)
1516

1617
func transformArr(arr arrow.Array) []any {
1718
dbArr := make([]any, arr.Len())
1819
for i := 0; i < arr.Len(); i++ {
19-
if arr.IsNull(i) || !arr.IsValid(i) {
20-
dbArr[i] = nil
20+
if arr.IsNull(i) {
2121
continue
2222
}
2323
switch a := arr.(type) {
2424
case *array.Boolean:
2525
dbArr[i] = a.Value(i)
26+
case *array.Int8:
27+
dbArr[i] = a.Value(i)
2628
case *array.Int16:
2729
dbArr[i] = a.Value(i)
2830
case *array.Int32:
2931
dbArr[i] = a.Value(i)
3032
case *array.Int64:
3133
dbArr[i] = a.Value(i)
34+
case *array.Uint8:
35+
dbArr[i] = a.Value(i)
36+
case *array.Uint16:
37+
dbArr[i] = a.Value(i)
38+
case *array.Uint32:
39+
dbArr[i] = a.Value(i)
40+
case *array.Uint64:
41+
dbArr[i] = a.Value(i)
3242
case *array.Float32:
3343
dbArr[i] = a.Value(i)
3444
case *array.Float64:
@@ -42,7 +52,7 @@ func transformArr(arr arrow.Array) []any {
4252
case *array.LargeString:
4353
dbArr[i] = a.Value(i)
4454
case *array.Timestamp:
45-
dbArr[i] = a.Value(i).ToTime(arrow.Microsecond)
55+
dbArr[i] = a.Value(i).ToTime(a.DataType().(*arrow.TimestampType).Unit)
4656
case *types.JSONArray:
4757
var val any
4858
if err := json.Unmarshal([]byte(a.ValueStr(i)), &val); err != nil {
@@ -67,7 +77,7 @@ func transformArr(arr arrow.Array) []any {
6777
return dbArr
6878
}
6979

70-
func (*Client) transformRecord(table *arrow.Schema, record arrow.Record) []any {
80+
func (*Client) transformRecord(table *schema.Table, record arrow.Record) []any {
7181
nc := int(record.NumCols())
7282
nr := int(record.NumRows())
7383
documents := make([]any, nr)
@@ -79,13 +89,13 @@ func (*Client) transformRecord(table *arrow.Schema, record arrow.Record) []any {
7989
col := record.Column(i)
8090
transformed := transformArr(col)
8191
for l := 0; l < nr; l++ {
82-
documents[l].(bson.M)[table.Field(i).Name] = transformed[l]
92+
documents[l].(bson.M)[table.Columns[i].Name] = transformed[l]
8393
}
8494
}
8595
return documents
8696
}
8797

88-
func (c *Client) transformRecords(table *arrow.Schema, records []arrow.Record) []any {
98+
func (c *Client) transformRecords(table *schema.Table, records []arrow.Record) []any {
8999
documents := make([]any, 0, len(records))
90100
for _, r := range records {
91101
docs := c.transformRecord(table, r)
@@ -94,28 +104,28 @@ func (c *Client) transformRecords(table *arrow.Schema, records []arrow.Record) [
94104
return documents
95105
}
96106

97-
func (c *Client) appendTableBatch(ctx context.Context, table *arrow.Schema, docuemnts []any) error {
98-
tableName := schema.TableName(table)
107+
func (c *Client) appendTableBatch(ctx context.Context, table *schema.Table, docuemnts []any) error {
108+
tableName := table.Name
99109
if _, err := c.client.Database(c.pluginSpec.Database).Collection(tableName).InsertMany(ctx, docuemnts); err != nil {
100110
return err
101111
}
102112
return nil
103113
}
104114

105-
func (c *Client) overwriteTableBatch(ctx context.Context, table *arrow.Schema, documents []any) error {
106-
tableName := schema.TableName(table)
115+
func (c *Client) overwriteTableBatch(ctx context.Context, table *schema.Table, documents []any) error {
116+
tableName := table.Name
107117
operations := make([]mongo.WriteModel, len(documents))
108-
pks := schema.PrimaryKeyIndices(table)
118+
pks := table.PrimaryKeys()
109119
for i, document := range documents {
110120
operation := mongo.NewUpdateOneModel()
111121
operation.SetUpsert(true)
112122
filter := make(bson.M, len(pks))
113-
for _, pk := range pks {
114-
filter[table.Field(pk).Name] = document.(bson.M)[table.Field(pk).Name]
123+
for _, name := range pks {
124+
filter[name] = document.(bson.M)[name]
115125
}
116126
operation.SetFilter(filter)
117-
update := make(bson.M, len(table.Fields()))
118-
for _, col := range table.Fields() {
127+
update := make(bson.M, len(table.Columns))
128+
for _, col := range table.Columns {
119129
update[col.Name] = document.(bson.M)[col.Name]
120130
}
121131
operation.SetUpdate(bson.M{"$set": update})
@@ -128,11 +138,10 @@ func (c *Client) overwriteTableBatch(ctx context.Context, table *arrow.Schema, d
128138
return nil
129139
}
130140

131-
func (c *Client) WriteTableBatch(ctx context.Context, table *arrow.Schema, resources []arrow.Record) error {
141+
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, resources []arrow.Record) error {
132142
documents := c.transformRecords(table, resources)
133143

134-
pks := schema.PrimaryKeyIndices(table)
135-
if len(pks) == 0 {
144+
if len(table.PrimaryKeys()) == 0 {
136145
return c.appendTableBatch(ctx, table, documents)
137146
}
138147
switch c.spec.WriteMode {

plugins/destination/mongodb/go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ go 1.19
55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
77
github.com/cloudquery/plugin-pb-go v1.0.8
8-
github.com/cloudquery/plugin-sdk/v2 v2.7.0
8+
github.com/cloudquery/plugin-sdk/v3 v3.6.2
9+
github.com/goccy/go-json v0.9.11
910
github.com/rs/zerolog v1.29.0
1011
go.mongodb.org/mongo-driver v1.11.2
1112
)
@@ -16,10 +17,10 @@ replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13
1617
require (
1718
github.com/andybalholm/brotli v1.0.5 // indirect
1819
github.com/apache/thrift v0.16.0 // indirect
20+
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
1921
github.com/davecgh/go-spew v1.1.1 // indirect
2022
github.com/getsentry/sentry-go v0.20.0 // indirect
2123
github.com/ghodss/yaml v1.0.0 // indirect
22-
github.com/goccy/go-json v0.9.11 // indirect
2324
github.com/golang/protobuf v1.5.3 // indirect
2425
github.com/golang/snappy v0.0.4 // indirect
2526
github.com/google/flatbuffers v2.0.8+incompatible // indirect
@@ -35,6 +36,7 @@ require (
3536
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
3637
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
3738
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
39+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
3840
github.com/pkg/errors v0.9.1 // indirect
3941
github.com/pmezard/go-difflib v1.0.0 // indirect
4042
github.com/spf13/cast v1.5.0 // indirect

plugins/destination/mongodb/go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37
5050
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
5151
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
5252
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
53+
github.com/cloudquery/plugin-sdk/v3 v3.6.2 h1:2+6qbACyTExNkYo75AuQnaWZUedbkSSmT403lwmJlHc=
54+
github.com/cloudquery/plugin-sdk/v3 v3.6.2/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
5355
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5456
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
5557
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@@ -179,6 +181,7 @@ github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6f
179181
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
180182
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
181183
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
184+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
182185
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
183186
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
184187
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

0 commit comments

Comments
 (0)