Skip to content

Commit 5e4fd5d

Browse files
authored
feat: Update destination to more managed SDK (#3842)
This should go after this is merged - cloudquery/plugin-sdk#369
1 parent fec9091 commit 5e4fd5d

File tree

15 files changed

+298
-297
lines changed

15 files changed

+298
-297
lines changed

plugins/destination/csv/client/client.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,23 @@ type readMsg struct {
3232
table *schema.Table
3333
source string
3434
err chan error
35-
resources chan *schema.DestinationResource
35+
resources chan []interface{}
3636
}
3737

3838
type closeMsg struct {
3939
err chan error
4040
}
4141

4242
type Client struct {
43+
plugins.DefaultReverseTransformer
4344
logger zerolog.Logger
4445
spec specs.Destination
4546
csvSpec Spec
4647
metrics plugins.DestinationMetrics
4748
writers map[string]*tableWriter
4849

4950
startWriteChan chan *startWriteMsg
50-
writeChan chan *schema.DestinationResource
51+
writeChan chan *plugins.ClientResource
5152
endWriteChan chan *endWriteMsg
5253
migrateChan chan *migrateMsg
5354
readChan chan *readMsg
@@ -71,7 +72,7 @@ func New(ctx context.Context, logger zerolog.Logger, spec specs.Destination) (pl
7172
logger: logger.With().Str("module", "csv-dest").Logger(),
7273
spec: spec,
7374
startWriteChan: make(chan *startWriteMsg),
74-
writeChan: make(chan *schema.DestinationResource),
75+
writeChan: make(chan *plugins.ClientResource),
7576
endWriteChan: make(chan *endWriteMsg),
7677
migrateChan: make(chan *migrateMsg),
7778
readChan: make(chan *readMsg),

plugins/destination/csv/client/client_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,16 @@ func TestClient(t *testing.T) {
4242
}
4343

4444
func TestPlugin(t *testing.T) {
45-
ctx := context.Background()
4645
p := plugins.NewDestinationPlugin("csv", "development", New)
4746

48-
if err := plugins.DestinationPluginTestHelper(ctx, p, getTestLogger(t), specs.Destination{
49-
WriteMode: specs.WriteModeAppend,
50-
Spec: Spec{
47+
plugins.DestinationPluginTestSuiteRunner(t, p,
48+
Spec{
5149
Directory: t.TempDir(),
5250
},
53-
}); err != nil {
54-
t.Fatal(err)
55-
}
51+
plugins.DestinationTestSuiteTests{
52+
Overwrite: true,
53+
Append: true,
54+
DeleteStale: true,
55+
},
56+
)
5657
}

plugins/destination/csv/client/read.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ import (
99
"os"
1010
"path"
1111

12-
"github.com/cloudquery/plugin-sdk/plugins"
1312
"github.com/cloudquery/plugin-sdk/schema"
1413
)
1514

16-
func (c *Client) read(table *schema.Table, sourceName string, res chan<- *schema.DestinationResource) error {
15+
func (c *Client) read(table *schema.Table, sourceName string, res chan<- []interface{}) error {
1716
filePath := path.Join(c.csvSpec.Directory, table.Name+".csv")
1817
f, err := os.Open(filePath)
1918
if err != nil {
@@ -33,7 +32,6 @@ func (c *Client) read(table *schema.Table, sourceName string, res chan<- *schema
3332
if sourceNameIndex == -1 {
3433
return fmt.Errorf("could not find column %s in table %s", schema.CqSourceNameColumn.Name, table.Name)
3534
}
36-
transformer := plugins.DefaultReverseTransformer{}
3735

3836
for {
3937
record, err := r.Read()
@@ -51,24 +49,17 @@ func (c *Client) read(table *schema.Table, sourceName string, res chan<- *schema
5149
values[i] = v
5250
}
5351

54-
cqTypes, err := transformer.ReverseTransformValues(table, values)
55-
if err != nil {
56-
return err
57-
}
58-
res <- &schema.DestinationResource{
59-
TableName: table.Name,
60-
Data: cqTypes,
61-
}
52+
res <- values
6253
}
6354
return nil
6455
}
6556

66-
func (c *Client) Read(tx context.Context, table *schema.Table, sourceName string, res chan<- *schema.DestinationResource) error {
57+
func (c *Client) Read(tx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error {
6758
msg := &readMsg{
6859
table: table,
6960
source: sourceName,
7061
err: make(chan error),
71-
resources: make(chan *schema.DestinationResource),
62+
resources: make(chan []interface{}),
7263
}
7364
c.readChan <- msg
7465
for {
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package client
2+
3+
import "github.com/cloudquery/plugin-sdk/schema"
4+
5+
func (*Client) TransformBool(v *schema.Bool) interface{} {
6+
return v.String()
7+
}
8+
9+
func (*Client) TransformBytea(v *schema.Bytea) interface{} {
10+
return v.String()
11+
}
12+
13+
func (*Client) TransformFloat8(v *schema.Float8) interface{} {
14+
return v.String()
15+
}
16+
17+
func (*Client) TransformInt8(v *schema.Int8) interface{} {
18+
return v.String()
19+
}
20+
21+
func (*Client) TransformInt8Array(v *schema.Int8Array) interface{} {
22+
return v.String()
23+
}
24+
25+
func (*Client) TransformJSON(v *schema.JSON) interface{} {
26+
return v.String()
27+
}
28+
29+
func (*Client) TransformText(v *schema.Text) interface{} {
30+
return v.String()
31+
}
32+
33+
func (*Client) TransformTextArray(v *schema.TextArray) interface{} {
34+
return v.String()
35+
}
36+
37+
func (*Client) TransformTimestamptz(v *schema.Timestamptz) interface{} {
38+
return v.String()
39+
}
40+
41+
func (*Client) TransformUUID(v *schema.UUID) interface{} {
42+
return v.String()
43+
}
44+
45+
func (*Client) TransformUUIDArray(v *schema.UUIDArray) interface{} {
46+
return v.String()
47+
}
48+
49+
func (*Client) TransformCIDR(v *schema.CIDR) interface{} {
50+
return v.String()
51+
}
52+
53+
func (*Client) TransformCIDRArray(v *schema.CIDRArray) interface{} {
54+
return v.String()
55+
}
56+
57+
func (*Client) TransformInet(v *schema.Inet) interface{} {
58+
return v.String()
59+
}
60+
61+
func (*Client) TransformInetArray(v *schema.InetArray) interface{} {
62+
return v.String()
63+
}
64+
65+
func (*Client) TransformMacaddr(v *schema.Macaddr) interface{} {
66+
return v.String()
67+
}
68+
69+
func (*Client) TransformMacaddrArray(v *schema.MacaddrArray) interface{} {
70+
return v.String()
71+
}

plugins/destination/csv/client/write.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"os"
77
"path"
88

9+
"github.com/cloudquery/plugin-sdk/plugins"
910
"github.com/cloudquery/plugin-sdk/schema"
1011
)
1112

@@ -78,10 +79,10 @@ func (c *Client) endWrite(tables schema.Tables) {
7879

7980
// this should only be called from the main listen goroutine to ensure writing to files
8081
// only happens in one place
81-
func (c *Client) writeResource(resource *schema.DestinationResource) {
82+
func (c *Client) writeResource(resource *plugins.ClientResource) {
8283
record := make([]string, len(resource.Data))
8384
for i, v := range resource.Data {
84-
record[i] = v.String()
85+
record[i] = v.(string)
8586
}
8687

8788
if err := c.writers[resource.TableName].writer.Write(record); err != nil {
@@ -90,7 +91,7 @@ func (c *Client) writeResource(resource *schema.DestinationResource) {
9091
}
9192
}
9293

93-
func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *schema.DestinationResource) error {
94+
func (c *Client) Write(ctx context.Context, tables schema.Tables, res <-chan *plugins.ClientResource) error {
9495
startWriteMsg := &startWriteMsg{
9596
tables: tables,
9697
err: make(chan error),

plugins/destination/csv/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/cloudquery/cloudquery/plugins/destination/csv
33
go 1.19
44

55
require (
6-
github.com/cloudquery/plugin-sdk v1.0.4
6+
github.com/cloudquery/plugin-sdk v1.1.0
77
github.com/rs/zerolog v1.28.0
88
)
99

plugins/destination/csv/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ github.com/avast/retry-go/v4 v4.3.0 h1:cqI48aXx0BExKoM7XPklDpoHAg7/srPPLAfWG5z62
44
github.com/bradleyjkemp/cupaloy/v2 v2.8.0 h1:any4BmKE+jGIaMpnU8YgH/I2LPiLBufr6oMMlVBbn9M=
55
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
66
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
7-
github.com/cloudquery/plugin-sdk v1.0.4 h1:5Vhp4AF538lxxXiZzm73XELDp00Wdc6cvE0AIZZOqss=
8-
github.com/cloudquery/plugin-sdk v1.0.4/go.mod h1:VbqV2BE0wbYbArSUNxAuh6jH1FlrHyJp0f/5ERuBcew=
7+
github.com/cloudquery/plugin-sdk v1.1.0 h1:4ABpcMN7Z+hM8/z+TFaN8lrbG0Ui7z4RV62h25QyEos=
8+
github.com/cloudquery/plugin-sdk v1.1.0/go.mod h1:VbqV2BE0wbYbArSUNxAuh6jH1FlrHyJp0f/5ERuBcew=
99
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
1010
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
1111
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=

plugins/destination/postgresql/client/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
)
1515

1616
type Client struct {
17+
plugins.DefaultReverseTransformer
1718
conn *pgxpool.Pool
1819
logger zerolog.Logger
1920
spec specs.Destination
Lines changed: 7 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
11
package client
22

33
import (
4-
"context"
54
"os"
65
"testing"
7-
"time"
86

97
"github.com/cloudquery/plugin-sdk/plugins"
10-
"github.com/cloudquery/plugin-sdk/specs"
11-
"github.com/rs/zerolog"
128
)
139

14-
func getTestLogger(t *testing.T) zerolog.Logger {
15-
t.Helper()
16-
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
17-
return zerolog.New(zerolog.NewTestWriter(t)).Output(
18-
zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMicro},
19-
).Level(zerolog.DebugLevel).With().Timestamp().Logger()
20-
}
21-
2210
func getTestConnection() string {
2311
testConn := os.Getenv("CQ_DEST_PG_TEST_CONN")
2412
if testConn == "" {
@@ -27,43 +15,16 @@ func getTestConnection() string {
2715
return testConn
2816
}
2917

30-
func TestInitialize(t *testing.T) {
31-
ctx := context.Background()
32-
client, err := New(ctx, getTestLogger(t), specs.Destination{
33-
Spec: Spec{
34-
ConnectionString: getTestConnection(),
35-
},
36-
})
37-
if err != nil {
38-
t.Fatal(err)
39-
}
40-
if client == nil {
41-
t.Fatal("client is nil")
42-
}
43-
if err := client.Close(ctx); err != nil {
44-
t.Fatal(err)
45-
}
46-
err = client.Close(ctx)
47-
if err == nil {
48-
t.Fatal("expected error when closing a closed client second time")
49-
}
50-
51-
if err.Error() != "client already closed or not initialized" {
52-
t.Fatal("expected error when closing a closed client second time")
53-
}
54-
}
55-
5618
func TestPgPlugin(t *testing.T) {
57-
ctx := context.Background()
5819
p := plugins.NewDestinationPlugin("postgresql", "development", New)
59-
60-
if err := plugins.DestinationPluginTestHelper(ctx, p, getTestLogger(t), specs.Destination{
61-
WriteMode: specs.WriteModeAppend,
62-
Spec: Spec{
20+
plugins.DestinationPluginTestSuiteRunner(t, p,
21+
Spec{
6322
ConnectionString: getTestConnection(),
6423
PgxLogLevel: LogLevelTrace,
6524
},
66-
}); err != nil {
67-
t.Fatal(err)
68-
}
25+
plugins.DestinationTestSuiteTests{
26+
Overwrite: true,
27+
Append: true,
28+
DeleteStale: true,
29+
})
6930
}

plugins/destination/postgresql/client/read.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66

7-
"github.com/cloudquery/plugin-sdk/plugins"
87
"github.com/cloudquery/plugin-sdk/schema"
98
"github.com/jackc/pgx/v4"
109
)
@@ -13,26 +12,18 @@ const (
1312
readSQL = "SELECT * FROM %s WHERE _cq_source_name = $1"
1413
)
1514

16-
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- *schema.DestinationResource) error {
15+
func (c *Client) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error {
1716
sql := fmt.Sprintf(readSQL, pgx.Identifier{table.Name}.Sanitize())
1817
rows, err := c.conn.Query(ctx, sql, sourceName)
1918
if err != nil {
2019
return err
2120
}
22-
transformer := plugins.DefaultReverseTransformer{}
2321
for rows.Next() {
2422
values, err := rows.Values()
2523
if err != nil {
2624
return err
2725
}
28-
cqTypes, err := transformer.ReverseTransformValues(table, values)
29-
if err != nil {
30-
return err
31-
}
32-
res <- &schema.DestinationResource{
33-
TableName: table.Name,
34-
Data: cqTypes,
35-
}
26+
res <- values
3627
}
3728
rows.Close()
3829
return nil

0 commit comments

Comments
 (0)