Skip to content

Commit 400e0e4

Browse files
authored
feat(kafka): Upgrade to v3 (#10931)
1 parent 95d9f14 commit 400e0e4

7 files changed

Lines changed: 24 additions & 22 deletions

File tree

plugins/destination/kafka/client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"fmt"
77
"time"
88

9-
"github.com/cloudquery/filetypes/v2"
9+
"github.com/cloudquery/filetypes/v3"
1010
"github.com/cloudquery/plugin-pb-go/specs"
1111
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1212

plugins/destination/kafka/client/client_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"strings"
66
"testing"
77

8-
"github.com/cloudquery/filetypes/v2"
8+
"github.com/cloudquery/filetypes/v3"
99
"github.com/cloudquery/plugin-pb-go/specs"
1010
"github.com/cloudquery/plugin-sdk/v3/plugins/destination"
1111
)
@@ -45,5 +45,8 @@ func TestPgPlugin(t *testing.T) {
4545
SkipMigrateOverwrite: true,
4646
SkipMigrateOverwriteForce: true,
4747
SkipMigrateAppendForce: true,
48-
})
48+
},
49+
destination.WithTestSourceSkipDates(),
50+
destination.WithTestSourceSkipTimes(),
51+
)
4952
}

plugins/destination/kafka/client/read.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName strin
3030
case <-ctx.Done():
3131
return ctx.Err()
3232
case msg := <-partitionConsumer.Messages():
33-
if err := c.Client.Read(bytes.NewReader(msg.Value), table.ToArrowSchema(), sourceName, res); err != nil {
33+
if err := c.Client.Read(bytes.NewReader(msg.Value), table, sourceName, res); err != nil {
3434
return err
3535
}
3636
case err := <-partitionConsumer.Errors():

plugins/destination/kafka/client/spec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package client
33
import (
44
"fmt"
55

6-
"github.com/cloudquery/filetypes/v2"
6+
"github.com/cloudquery/filetypes/v3"
77
)
88

99
type Spec struct {

plugins/destination/kafka/client/write.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7-
"fmt"
87
"strings"
98
"sync/atomic"
109

@@ -42,20 +41,20 @@ func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan arr
4241

4342
messages := make([]*sarama.ProducerMessage, 0, c.spec.BatchSize)
4443
for r := range res {
44+
table, err := schema.NewTableFromArrowSchema(r.Schema())
45+
if err != nil {
46+
return err
47+
}
48+
4549
var b bytes.Buffer
4650
w := bufio.NewWriter(&b)
47-
sc := r.Schema()
48-
tableName, ok := r.Schema().Metadata().GetValue(schema.MetadataTableName)
49-
if !ok {
50-
return fmt.Errorf("%q metadata key not found", schema.MetadataTableName)
51-
}
5251

53-
if err := c.Client.WriteTableBatchFile(w, sc, []arrow.Record{r}); err != nil {
52+
if err := c.Client.WriteTableBatchFile(w, table, []arrow.Record{r}); err != nil {
5453
return err
5554
}
5655
w.Flush()
5756
messages = append(messages, &sarama.ProducerMessage{
58-
Topic: tableName,
57+
Topic: table.Name,
5958
Key: nil,
6059
Value: sarama.ByteEncoder(b.Bytes()),
6160
})

plugins/destination/kafka/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ go 1.19
55
require (
66
github.com/Shopify/sarama v1.37.2
77
github.com/apache/arrow/go/v13 v13.0.0-20230509040948-de6c3cd2b604
8-
github.com/cloudquery/filetypes/v2 v2.1.0
8+
github.com/cloudquery/filetypes/v3 v3.0.1
99
github.com/cloudquery/plugin-pb-go v1.0.8
10-
github.com/cloudquery/plugin-sdk/v3 v3.6.2
10+
github.com/cloudquery/plugin-sdk/v3 v3.6.3
1111
github.com/rs/zerolog v1.29.1
1212
)
1313

1414
// TODO: remove once all updates are merged
15-
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8
15+
replace github.com/apache/arrow/go/v13 => github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa
1616

1717
require (
1818
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect

plugins/destination/kafka/go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,16 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
4848
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
4949
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
5050
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
51-
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8 h1:CmgLSEGQNLHpUQ5cU4L4aF7cuJZRnc1toIIWqC1gmPg=
52-
github.com/cloudquery/arrow/go/v13 v13.0.0-20230509053643-898a79b1d3c8/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
53-
github.com/cloudquery/filetypes/v2 v2.1.0 h1:J7JPL9zWt+WiTvrzuN216ff4/RY6we9Jaffv+o3uLPQ=
54-
github.com/cloudquery/filetypes/v2 v2.1.0/go.mod h1:qumYuBtw0dhpr6zAvlAaSjuPVtFxqG+tt1oPgu3td4E=
51+
github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa h1:6y3l+YgGqMJsx5TrxFHPjxDqZ5c3M9+r3dv+CYIRl44=
52+
github.com/cloudquery/arrow/go/v13 v13.0.0-20230521112802-adef07d4bbaa/go.mod h1:/XatdE3kDIBqZKhZ7OBUHwP2jaASDFZHqF4puOWM8po=
53+
github.com/cloudquery/filetypes/v3 v3.0.1 h1:XxUqYrSlzb4p0+BubW85JXMZOs00h8Ets3s2huZWG98=
54+
github.com/cloudquery/filetypes/v3 v3.0.1/go.mod h1:eGhVmBXMjjCnzZ0gqyBPfufqv+Q2DNttp9OFSY+IZGk=
5555
github.com/cloudquery/plugin-pb-go v1.0.8 h1:wn3GXhcNItcP+6wUUZuzUFbvdL59liKBO37/izMi+FQ=
5656
github.com/cloudquery/plugin-pb-go v1.0.8/go.mod h1:vAGA27psem7ZZNAY4a3S9TKuA/JDQWstjKcHPJX91Mc=
5757
github.com/cloudquery/plugin-sdk/v2 v2.7.0 h1:hRXsdEiaOxJtsn/wZMFQC9/jPfU1MeMK3KF+gPGqm7U=
5858
github.com/cloudquery/plugin-sdk/v2 v2.7.0/go.mod h1:pAX6ojIW99b/Vg4CkhnsGkRIzNaVEceYMR+Bdit73ug=
59-
github.com/cloudquery/plugin-sdk/v3 v3.6.2 h1:2+6qbACyTExNkYo75AuQnaWZUedbkSSmT403lwmJlHc=
60-
github.com/cloudquery/plugin-sdk/v3 v3.6.2/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
59+
github.com/cloudquery/plugin-sdk/v3 v3.6.3 h1:TyljGXffaPICARPBg8geOfKI4biP5sjW9OjSkjMXwig=
60+
github.com/cloudquery/plugin-sdk/v3 v3.6.3/go.mod h1:3JrZXEULmGXpkOukVaRIzaA63d7TJr9Ukp6hemTjbtc=
6161
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
6262
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
6363
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=

0 commit comments

Comments
 (0)