Skip to content

Commit 3ffb2af

Browse files
authored
feat(meilisearch): Migrate to github.com/cloudquery/plugin-sdk/v3 (#10808)
Closes #10722 ~Blocked by meilisearch/meilisearch-go#436
1 parent 84f3416 commit 3ffb2af

11 files changed

Lines changed: 56 additions & 53 deletions

File tree

plugins/destination/meilisearch/client/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import (
88
"time"
99

1010
"github.com/cloudquery/plugin-pb-go/specs"
11-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
12-
"github.com/cloudquery/plugin-sdk/v2/schema"
11+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
12+
"github.com/cloudquery/plugin-sdk/v3/schema"
1313
"github.com/meilisearch/meilisearch-go"
1414
"github.com/rs/zerolog"
1515
)
@@ -31,7 +31,7 @@ func (c *Client) Close(context.Context) error {
3131
return nil
3232
}
3333

34-
func (*Client) DeleteStale(context.Context, schema.Schemas, string, time.Time) error {
34+
func (*Client) DeleteStale(context.Context, schema.Tables, string, time.Time) error {
3535
return fmt.Errorf("DeleteStale not supported")
3636
}
3737

plugins/destination/meilisearch/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/cloudquery/cloudquery/plugins/destination/meilisearch/resources/plugin"
99
"github.com/cloudquery/plugin-pb-go/specs"
10-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
10+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1111
)
1212

1313
var migrateStrategy = destination.MigrateStrategy{

plugins/destination/meilisearch/client/hash.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,21 @@ import (
44
"crypto/sha256"
55
"fmt"
66

7-
"github.com/apache/arrow/go/v13/arrow"
8-
"github.com/cloudquery/plugin-sdk/v2/schema"
7+
"github.com/cloudquery/plugin-sdk/v3/schema"
98
"github.com/google/uuid"
109
)
1110

1211
const hashColumnName = "_cq_pk_hash_uuid"
1312

14-
func hashUUID(sc *arrow.Schema) func(map[string]any) string {
15-
pk := schema.PrimaryKeyIndices(sc)
13+
func hashUUID(table *schema.Table) func(map[string]any) string {
14+
pk := table.PrimaryKeys()
1615
if len(pk) == 0 {
1716
return func(map[string]any) string { return uuid.New().String() }
1817
}
1918

20-
names := make([]string, len(pk))
21-
for i, idx := range pk {
22-
names[i] = sc.Field(idx).Name
23-
}
24-
2519
return func(row map[string]any) string {
2620
h := sha256.New()
27-
for _, name := range names {
21+
for _, name := range pk {
2822
h.Write([]byte(name))
2923
h.Write([]byte(fmt.Sprint(row[name])))
3024
}

plugins/destination/meilisearch/client/index.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/apache/arrow/go/v13/arrow"
8-
"github.com/cloudquery/plugin-sdk/v2/schema"
7+
"github.com/cloudquery/plugin-sdk/v3/schema"
98
"github.com/meilisearch/meilisearch-go"
109
"golang.org/x/exp/slices"
1110
)
@@ -35,19 +34,15 @@ func (i *indexSchema) canMigrate(o *indexSchema) bool {
3534
return i.UID == o.UID && i.PrimaryKey == o.PrimaryKey
3635
}
3736

38-
func (c *Client) tableIndexSchema(sc *arrow.Schema) *indexSchema {
39-
attributes := make([]string, len(sc.Fields()))
40-
for i, fld := range sc.Fields() {
41-
attributes[i] = fld.Name
42-
}
37+
func (c *Client) tableIndexSchema(table *schema.Table) *indexSchema {
4338
return &indexSchema{
44-
UID: schema.TableName(sc),
39+
UID: table.Name,
4540
PrimaryKey: c.pkColumn,
46-
Attributes: attributes,
41+
Attributes: table.Columns.Names(),
4742
}
4843
}
4944

50-
func (c *Client) tablesIndexSchemas(tables schema.Schemas) map[string]*indexSchema {
45+
func (c *Client) tablesIndexSchemas(tables schema.Tables) map[string]*indexSchema {
5146
res := make(map[string]*indexSchema)
5247
for _, table := range tables {
5348
s := c.tableIndexSchema(table)

plugins/destination/meilisearch/client/migrate.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ import (
55
"fmt"
66

77
"github.com/cloudquery/plugin-pb-go/specs"
8-
"github.com/cloudquery/plugin-sdk/v2/schema"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
99
)
1010

11-
func (c *Client) Migrate(ctx context.Context, tables schema.Schemas) error {
11+
func (c *Client) Migrate(ctx context.Context, tables schema.Tables) error {
1212
c.logger.Info().Msg("Migrate")
1313

1414
have, err := c.indexes()
1515
if err != nil {
1616
return err
1717
}
1818

19-
want := c.tablesIndexSchemas(tables)
19+
want := c.tablesIndexSchemas(tables.FlattenTables())
2020

2121
var recreate, create, update []*indexSchema
2222
for uid, need := range want {

plugins/destination/meilisearch/client/read.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@ import (
77
"github.com/apache/arrow/go/v13/arrow"
88
"github.com/apache/arrow/go/v13/arrow/array"
99
"github.com/apache/arrow/go/v13/arrow/memory"
10-
"github.com/cloudquery/plugin-sdk/v2/schema"
10+
"github.com/cloudquery/plugin-sdk/v3/schema"
1111
"github.com/meilisearch/meilisearch-go"
1212
)
1313

14-
func (c *Client) Read(_ context.Context, sc *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
15-
index, err := c.Meilisearch.GetIndex(schema.TableName(sc))
14+
func (c *Client) Read(_ context.Context, table *schema.Table, sourceName string, res chan<- arrow.Record) error {
15+
sc := table.ToArrowSchema()
16+
index, err := c.Meilisearch.GetIndex(table.Name)
1617
if err != nil {
1718
return err
1819
}

plugins/destination/meilisearch/client/transform.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package client
22

33
import (
4-
"fmt"
4+
"bytes"
55

66
"github.com/apache/arrow/go/v13/arrow"
77
"github.com/apache/arrow/go/v13/arrow/array"
8-
"github.com/cloudquery/plugin-sdk/v2/types"
8+
"github.com/cloudquery/plugin-sdk/v3/types"
9+
"github.com/goccy/go-json"
910
)
1011

1112
func timestampValues(arr *array.Timestamp) []any {
@@ -77,6 +78,12 @@ func reverseTransform(builder array.Builder, val any) error {
7778
builder.Append(float32(val.(float64)))
7879
case *array.Float64Builder:
7980
builder.Append(val.(float64))
81+
case *array.BinaryBuilder:
82+
return builder.AppendValueFromString(val.(string))
83+
case *array.StringBuilder:
84+
builder.Append(val.(string))
85+
case *array.LargeStringBuilder:
86+
builder.Append(val.(string))
8087
case *types.JSONBuilder:
8188
builder.Append(val)
8289
case array.ListLikeBuilder:
@@ -87,14 +94,18 @@ func reverseTransform(builder array.Builder, val any) error {
8794
return err
8895
}
8996
}
97+
9098
default:
91-
v, ok := val.(string)
92-
if !ok {
93-
return fmt.Errorf("unsupported type %T with builder %T", val, builder)
99+
data, err := json.MarshalWithOption(val, json.DisableHTMLEscape())
100+
if err != nil {
101+
return err
94102
}
95-
if err := builder.AppendValueFromString(v); err != nil {
103+
104+
dec := json.NewDecoder(bytes.NewReader(data))
105+
if err := builder.UnmarshalOne(dec); err != nil {
96106
return err
97107
}
98108
}
109+
99110
return nil
100111
}

plugins/destination/meilisearch/client/write.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ import (
66

77
"github.com/apache/arrow/go/v13/arrow"
88
"github.com/cloudquery/plugin-pb-go/specs"
9-
"github.com/cloudquery/plugin-sdk/v2/schema"
9+
"github.com/cloudquery/plugin-sdk/v3/schema"
1010
)
1111

12-
func (c *Client) WriteTableBatch(ctx context.Context, sc *arrow.Schema, records []arrow.Record) error {
13-
index, err := c.Meilisearch.GetIndex(schema.TableName(sc))
12+
func (c *Client) WriteTableBatch(ctx context.Context, table *schema.Table, records []arrow.Record) error {
13+
index, err := c.Meilisearch.GetIndex(table.Name)
1414
if err != nil {
1515
return err
1616
}
1717

1818
var transformer rowTransformer
1919
switch c.dstSpec.WriteMode {
2020
case specs.WriteModeAppend:
21-
transformer = toMap(sc)
21+
transformer = toMap(table)
2222
case specs.WriteModeOverwrite, specs.WriteModeOverwriteDeleteStale:
23-
transformer = toMapWithHash(sc)
23+
transformer = toMapWithHash(table)
2424
default:
2525
return fmt.Errorf("unsupported write mode %q", c.dstSpec.WriteMode.String())
2626
}
@@ -48,11 +48,8 @@ func (c *Client) WriteTableBatch(ctx context.Context, sc *arrow.Schema, records
4848

4949
type rowTransformer func(record arrow.Record) ([]map[string]any, error)
5050

51-
func toMap(sc *arrow.Schema) rowTransformer {
52-
columns := make([]string, len(sc.Fields()))
53-
for i, fld := range sc.Fields() {
54-
columns[i] = fld.Name
55-
}
51+
func toMap(table *schema.Table) rowTransformer {
52+
columns := table.Columns.Names()
5653
return func(record arrow.Record) ([]map[string]any, error) {
5754
byColumn := make(map[string][]any, len(columns))
5855
for i, col := range record.Columns() {
@@ -62,9 +59,9 @@ func toMap(sc *arrow.Schema) rowTransformer {
6259
}
6360
}
6461

65-
func toMapWithHash(sc *arrow.Schema) rowTransformer {
66-
m := toMap(sc)
67-
h := hashUUID(sc)
62+
func toMapWithHash(table *schema.Table) rowTransformer {
63+
m := toMap(table)
64+
h := hashUUID(table)
6865
return func(record arrow.Record) ([]map[string]any, error) {
6966
rows, err := m(record)
7067
if err != nil {

plugins/destination/meilisearch/go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ go 1.19
55
require (
66
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
77
github.com/cloudquery/plugin-pb-go v1.0.8
8-
github.com/cloudquery/plugin-sdk/v2 v2.7.0
8+
github.com/cloudquery/plugin-sdk/v3 v3.5.1
9+
github.com/goccy/go-json v0.9.11
910
github.com/google/uuid v1.3.0
1011
github.com/meilisearch/meilisearch-go v0.24.0
1112
github.com/rs/zerolog v1.29.0
@@ -19,10 +20,10 @@ replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13
1920
require (
2021
github.com/andybalholm/brotli v1.0.5 // indirect
2122
github.com/apache/thrift v0.16.0 // indirect
23+
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
2224
github.com/davecgh/go-spew v1.1.1 // indirect
2325
github.com/getsentry/sentry-go v0.20.0 // indirect
2426
github.com/ghodss/yaml v1.0.0 // indirect
25-
github.com/goccy/go-json v0.9.11 // indirect
2627
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
2728
github.com/golang/protobuf v1.5.3 // indirect
2829
github.com/golang/snappy v0.0.4 // indirect
@@ -39,6 +40,7 @@ require (
3940
github.com/mattn/go-isatty v0.0.18 // indirect
4041
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
4142
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
43+
github.com/pierrec/lz4/v4 v4.1.15 // indirect
4244
github.com/pmezard/go-difflib v1.0.0 // indirect
4345
github.com/spf13/cast v1.5.0 // indirect
4446
github.com/spf13/cobra v1.6.1 // indirect

plugins/destination/meilisearch/go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37
5151
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
5252
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
5353
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
54+
github.com/cloudquery/plugin-sdk/v3 v3.5.1 h1:797hWUEsojwvp7xtr6LSaf5tk5iG9UDixoRACxu3xrU=
55+
github.com/cloudquery/plugin-sdk/v3 v3.5.1/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
5456
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
5557
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
5658
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
@@ -185,6 +187,7 @@ github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8D
185187
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
186188
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
187189
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
190+
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
188191
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
189192
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
190193
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

0 commit comments

Comments
 (0)