Skip to content

Commit 99449c4

Browse files
authored
feat(firehose): Update to github.com/cloudquery/plugin-sdk/v3 (#10895)
Closes #10718
1 parent 7476cc3 commit 99449c4

12 files changed

Lines changed: 46 additions & 35 deletions

File tree

plugins/destination/firehose/client/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"github.com/aws/aws-sdk-go-v2/service/firehose"
1010

1111
"github.com/cloudquery/plugin-pb-go/specs"
12-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
12+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1313
"github.com/rs/zerolog"
1414
)
1515

@@ -22,6 +22,8 @@ type Client struct {
2222
firehoseClient *firehose.Client
2323
}
2424

25+
var _ destination.Client = (*Client)(nil)
26+
2527
func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (destination.Client, error) {
2628
if spec.WriteMode != specs.WriteModeAppend {
2729
return nil, fmt.Errorf("destination only supports append mode")

plugins/destination/firehose/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"testing"
55

66
"github.com/cloudquery/plugin-pb-go/specs"
7-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
7+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
88
)
99

1010
const streamARN = "cq-playground-test"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
9+
)
10+
11+
func (*Client) DeleteStale(context.Context, schema.Tables, string, time.Time) error {
12+
return fmt.Errorf("delete-stale is not implemented")
13+
}

plugins/destination/firehose/client/deletestale.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

plugins/destination/firehose/client/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package client
22

33
import (
4-
"github.com/cloudquery/plugin-sdk/v2/plugins/destination"
4+
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
55
)
66

77
func (c *Client) Metrics() destination.Metrics {

plugins/destination/firehose/client/migrate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package client
33
import (
44
"context"
55

6-
"github.com/cloudquery/plugin-sdk/v2/schema"
6+
"github.com/cloudquery/plugin-sdk/v3/schema"
77
)
88

9-
func (*Client) Migrate(ctx context.Context, schemas schema.Schemas) error {
9+
func (*Client) Migrate(context.Context, schema.Tables) error {
1010
// migrate is not needed in append mode
1111
return nil
1212
}

plugins/destination/firehose/client/read.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/apache/arrow/go/v13/arrow"
8+
"github.com/cloudquery/plugin-sdk/v3/schema"
79
)
810

9-
func (*Client) Read(ctx context.Context, arrowSchema *arrow.Schema, sourceName string, res chan<- arrow.Record) error {
10-
panic("not implemented")
11+
func (*Client) Read(context.Context, *schema.Table, string, chan<- arrow.Record) error {
12+
return fmt.Errorf("read is not implemented")
1113
}

plugins/destination/firehose/client/write.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ import (
1414
"github.com/aws/aws-sdk-go-v2/aws/arn"
1515
"github.com/aws/aws-sdk-go-v2/service/firehose"
1616
"github.com/aws/aws-sdk-go-v2/service/firehose/types"
17-
"github.com/cloudquery/plugin-sdk/v2/schema"
17+
"github.com/cloudquery/plugin-sdk/v3/schema"
1818
)
1919

20-
const MaxRecordSizeBytes = 1024000
21-
const MaxBatchRecords = 500
22-
const MaxBatchSizeBytes = 4194000
20+
const (
21+
MaxRecordSizeBytes = 1024000
22+
MaxBatchRecords = 500
23+
MaxBatchSizeBytes = 4194000
24+
)
2325

24-
func (c *Client) Write(ctx context.Context, tables schema.Schemas, record <-chan arrow.Record) error {
26+
func (c *Client) Write(ctx context.Context, tables schema.Tables, record <-chan arrow.Record) error {
2527
parsedARN, err := arn.Parse(c.pluginSpec.StreamARN)
2628
if err != nil {
2729
c.logger.Error().Err(err).Msg("invalid firehose stream ARN")
@@ -38,10 +40,14 @@ func (c *Client) Write(ctx context.Context, tables schema.Schemas, record <-chan
3840
batchSize := 0
3941

4042
for rec := range record {
41-
tableName := schema.TableName(rec.Schema())
42-
table := tables.SchemaByName(tableName)
43+
tableName, ok := rec.Schema().Metadata().GetValue(schema.MetadataTableName)
44+
if !ok {
45+
return fmt.Errorf("%q metadata key not found", schema.MetadataTableName)
46+
}
47+
48+
table := tables.Get(tableName)
4349
if table == nil {
44-
panic(fmt.Errorf("table %s not found", tableName))
50+
return fmt.Errorf("table %s not found", tableName)
4551
}
4652

4753
for row := int64(0); row < rec.NumRows(); row++ {

plugins/destination/firehose/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/aws/aws-sdk-go-v2/config v1.18.25
99
github.com/aws/aws-sdk-go-v2/service/firehose v1.16.8
1010
github.com/cloudquery/plugin-pb-go v1.0.8
11-
github.com/cloudquery/plugin-sdk/v2 v2.7.0
11+
github.com/cloudquery/plugin-sdk/v3 v3.6.2
1212
github.com/rs/zerolog v1.29.0
1313
)
1414

@@ -28,6 +28,7 @@ require (
2828
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.10 // indirect
2929
github.com/aws/aws-sdk-go-v2/service/sts v1.19.0 // indirect
3030
github.com/aws/smithy-go v1.13.5 // indirect
31+
github.com/cloudquery/plugin-sdk/v2 v2.7.0 // indirect
3132
github.com/davecgh/go-spew v1.1.1 // indirect
3233
github.com/getsentry/sentry-go v0.20.0 // indirect
3334
github.com/ghodss/yaml v1.0.0 // indirect

plugins/destination/firehose/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37
7979
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
8080
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
8181
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
82+
github.com/cloudquery/plugin-sdk/v3 v3.6.2 h1:2+6qbACyTExNkYo75AuQnaWZUedbkSSmT403lwmJlHc=
83+
github.com/cloudquery/plugin-sdk/v3 v3.6.2/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
8284
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
8385
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
8486
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

0 commit comments

Comments
 (0)