@@ -2,33 +2,43 @@ package client
22
33import (
44 "context"
5- "encoding/json"
5+
6+ "github.com/goccy/go-json"
67
78 "github.com/apache/arrow/go/v13/arrow"
89 "github.com/apache/arrow/go/v13/arrow/array"
910 "github.com/cloudquery/plugin-pb-go/specs"
10- "github.com/cloudquery/plugin-sdk/v2 /schema"
11- "github.com/cloudquery/plugin-sdk/v2 /types"
11+ "github.com/cloudquery/plugin-sdk/v3 /schema"
12+ "github.com/cloudquery/plugin-sdk/v3 /types"
1213 "go.mongodb.org/mongo-driver/bson"
1314 "go.mongodb.org/mongo-driver/mongo"
1415)
1516
1617func transformArr (arr arrow.Array ) []any {
1718 dbArr := make ([]any , arr .Len ())
1819 for i := 0 ; i < arr .Len (); i ++ {
19- if arr .IsNull (i ) || ! arr .IsValid (i ) {
20- dbArr [i ] = nil
20+ if arr .IsNull (i ) {
2121 continue
2222 }
2323 switch a := arr .(type ) {
2424 case * array.Boolean :
2525 dbArr [i ] = a .Value (i )
26+ case * array.Int8 :
27+ dbArr [i ] = a .Value (i )
2628 case * array.Int16 :
2729 dbArr [i ] = a .Value (i )
2830 case * array.Int32 :
2931 dbArr [i ] = a .Value (i )
3032 case * array.Int64 :
3133 dbArr [i ] = a .Value (i )
34+ case * array.Uint8 :
35+ dbArr [i ] = a .Value (i )
36+ case * array.Uint16 :
37+ dbArr [i ] = a .Value (i )
38+ case * array.Uint32 :
39+ dbArr [i ] = a .Value (i )
40+ case * array.Uint64 :
41+ dbArr [i ] = a .Value (i )
3242 case * array.Float32 :
3343 dbArr [i ] = a .Value (i )
3444 case * array.Float64 :
@@ -42,7 +52,7 @@ func transformArr(arr arrow.Array) []any {
4252 case * array.LargeString :
4353 dbArr [i ] = a .Value (i )
4454 case * array.Timestamp :
45- dbArr [i ] = a .Value (i ).ToTime (arrow .Microsecond )
55+ dbArr [i ] = a .Value (i ).ToTime (a . DataType ().( * arrow.TimestampType ). Unit )
4656 case * types.JSONArray :
4757 var val any
4858 if err := json .Unmarshal ([]byte (a .ValueStr (i )), & val ); err != nil {
@@ -67,7 +77,7 @@ func transformArr(arr arrow.Array) []any {
6777 return dbArr
6878}
6979
70- func (* Client ) transformRecord (table * arrow. Schema , record arrow.Record ) []any {
80+ func (* Client ) transformRecord (table * schema. Table , record arrow.Record ) []any {
7181 nc := int (record .NumCols ())
7282 nr := int (record .NumRows ())
7383 documents := make ([]any , nr )
@@ -79,13 +89,13 @@ func (*Client) transformRecord(table *arrow.Schema, record arrow.Record) []any {
7989 col := record .Column (i )
8090 transformed := transformArr (col )
8191 for l := 0 ; l < nr ; l ++ {
82- documents [l ].(bson.M )[table .Field ( i ) .Name ] = transformed [l ]
92+ documents [l ].(bson.M )[table .Columns [ i ] .Name ] = transformed [l ]
8393 }
8494 }
8595 return documents
8696}
8797
88- func (c * Client ) transformRecords (table * arrow. Schema , records []arrow.Record ) []any {
98+ func (c * Client ) transformRecords (table * schema. Table , records []arrow.Record ) []any {
8999 documents := make ([]any , 0 , len (records ))
90100 for _ , r := range records {
91101 docs := c .transformRecord (table , r )
@@ -94,28 +104,28 @@ func (c *Client) transformRecords(table *arrow.Schema, records []arrow.Record) [
94104 return documents
95105}
96106
97- func (c * Client ) appendTableBatch (ctx context.Context , table * arrow. Schema , docuemnts []any ) error {
98- tableName := schema . TableName ( table )
107+ func (c * Client ) appendTableBatch (ctx context.Context , table * schema. Table , docuemnts []any ) error {
108+ tableName := table . Name
99109 if _ , err := c .client .Database (c .pluginSpec .Database ).Collection (tableName ).InsertMany (ctx , docuemnts ); err != nil {
100110 return err
101111 }
102112 return nil
103113}
104114
105- func (c * Client ) overwriteTableBatch (ctx context.Context , table * arrow. Schema , documents []any ) error {
106- tableName := schema . TableName ( table )
115+ func (c * Client ) overwriteTableBatch (ctx context.Context , table * schema. Table , documents []any ) error {
116+ tableName := table . Name
107117 operations := make ([]mongo.WriteModel , len (documents ))
108- pks := schema . PrimaryKeyIndices ( table )
118+ pks := table . PrimaryKeys ( )
109119 for i , document := range documents {
110120 operation := mongo .NewUpdateOneModel ()
111121 operation .SetUpsert (true )
112122 filter := make (bson.M , len (pks ))
113- for _ , pk := range pks {
114- filter [table . Field ( pk ). Name ] = document .(bson.M )[table . Field ( pk ). Name ]
123+ for _ , name := range pks {
124+ filter [name ] = document .(bson.M )[name ]
115125 }
116126 operation .SetFilter (filter )
117- update := make (bson.M , len (table .Fields () ))
118- for _ , col := range table .Fields () {
127+ update := make (bson.M , len (table .Columns ))
128+ for _ , col := range table .Columns {
119129 update [col .Name ] = document .(bson.M )[col .Name ]
120130 }
121131 operation .SetUpdate (bson.M {"$set" : update })
@@ -128,11 +138,10 @@ func (c *Client) overwriteTableBatch(ctx context.Context, table *arrow.Schema, d
128138 return nil
129139}
130140
131- func (c * Client ) WriteTableBatch (ctx context.Context , table * arrow. Schema , resources []arrow.Record ) error {
141+ func (c * Client ) WriteTableBatch (ctx context.Context , table * schema. Table , resources []arrow.Record ) error {
132142 documents := c .transformRecords (table , resources )
133143
134- pks := schema .PrimaryKeyIndices (table )
135- if len (pks ) == 0 {
144+ if len (table .PrimaryKeys ()) == 0 {
136145 return c .appendTableBatch (ctx , table , documents )
137146 }
138147 switch c .spec .WriteMode {
0 commit comments