Skip to content

Commit 1a542b9

Browse files
authored
feat: Add Testing suite for destination plugins (#369)
Closes #348 Introduce testing suite for destination plugins as well as number of methods to including `transformers` and `reverseTransformers` to help testing suite be able to test "end-to-end". This also adds more testing with a `memoryDB` plugin to test destination plugin and destination plugin testing suite
1 parent 6aaa6a9 commit 1a542b9

File tree

13 files changed

+621
-265
lines changed

13 files changed

+621
-265
lines changed

internal/servers/destinations.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (*DestinationServer) Write(pb.Destination_WriteServer) error {
6363
// Note the order of operations in this method is important!
6464
// Trying to insert into the `resources` channel before starting the reader goroutine will cause a deadlock.
6565
func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error {
66-
resources := make(chan *schema.DestinationResource)
66+
resources := make(chan schema.DestinationResource)
6767

6868
r, err := msg.Recv()
6969
if err != nil {
@@ -100,7 +100,7 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error {
100100
}
101101
return fmt.Errorf("failed to receive msg: %w", err)
102102
}
103-
var resource *schema.DestinationResource
103+
var resource schema.DestinationResource
104104
if err := json.Unmarshal(r.Resource, &resource); err != nil {
105105
close(resources)
106106
if err := eg.Wait(); err != nil {

internal/testdata/testdata.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package testdata
2+
3+
import (
4+
"time"
5+
6+
"github.com/cloudquery/plugin-sdk/schema"
7+
"github.com/google/uuid"
8+
)
9+
10+
// TestTable returns a table with columns of all type. useful for destination testing purposes
11+
func TestTable(name string) *schema.Table {
12+
return &schema.Table{
13+
Name: name,
14+
Description: "Test table",
15+
Columns: schema.ColumnList{
16+
schema.CqIDColumn,
17+
schema.CqParentIDColumn,
18+
{
19+
Name: "bool",
20+
Type: schema.TypeBool,
21+
},
22+
{
23+
Name: "int",
24+
Type: schema.TypeInt,
25+
},
26+
{
27+
Name: "float",
28+
Type: schema.TypeFloat,
29+
},
30+
{
31+
Name: "uuid",
32+
Type: schema.TypeUUID,
33+
CreationOptions: schema.ColumnCreationOptions{PrimaryKey: true},
34+
},
35+
{
36+
Name: "text",
37+
Type: schema.TypeString,
38+
},
39+
{
40+
Name: "bytea",
41+
Type: schema.TypeByteArray,
42+
},
43+
{
44+
Name: "text_array",
45+
Type: schema.TypeStringArray,
46+
},
47+
{
48+
Name: "int_array",
49+
Type: schema.TypeIntArray,
50+
},
51+
{
52+
Name: "timestamp",
53+
Type: schema.TypeTimestamp,
54+
},
55+
{
56+
Name: "json",
57+
Type: schema.TypeJSON,
58+
},
59+
{
60+
Name: "uuid_array",
61+
Type: schema.TypeUUIDArray,
62+
},
63+
{
64+
Name: "inet",
65+
Type: schema.TypeInet,
66+
},
67+
{
68+
Name: "inet_array",
69+
Type: schema.TypeInetArray,
70+
},
71+
{
72+
Name: "cidr",
73+
Type: schema.TypeCIDR,
74+
},
75+
{
76+
Name: "cidr_array",
77+
Type: schema.TypeCIDRArray,
78+
},
79+
{
80+
Name: "macaddr",
81+
Type: schema.TypeMacAddr,
82+
},
83+
{
84+
Name: "macaddr_array",
85+
Type: schema.TypeMacAddrArray,
86+
},
87+
},
88+
}
89+
}
90+
91+
func TestData() schema.CQTypes {
92+
cqID := &schema.UUID{}
93+
if err := cqID.Set(uuid.New()); err != nil {
94+
panic(err)
95+
}
96+
cqParentID := &schema.UUID{}
97+
if err := cqParentID.Set("00000000-0000-0000-0000-000000000003"); err != nil {
98+
panic(err)
99+
}
100+
boolColumn := &schema.Bool{
101+
Bool: true,
102+
Status: schema.Present,
103+
}
104+
intColumn := &schema.Int8{
105+
Int: 1,
106+
Status: schema.Present,
107+
}
108+
floatColumn := &schema.Float8{
109+
Float: 1.1,
110+
Status: schema.Present,
111+
}
112+
uuidColumn := &schema.UUID{}
113+
if err := uuidColumn.Set("00000000-0000-0000-0000-000000000001"); err != nil {
114+
panic(err)
115+
}
116+
textColumn := &schema.Text{
117+
Str: "test",
118+
Status: schema.Present,
119+
}
120+
byteaColumn := &schema.Bytea{
121+
Bytes: []byte{1, 2, 3},
122+
Status: schema.Present,
123+
}
124+
textArrayColumn := &schema.TextArray{}
125+
if err := textArrayColumn.Set([]string{"test1", "test2"}); err != nil {
126+
panic(err)
127+
}
128+
intArrayColumn := &schema.Int8Array{}
129+
if err := intArrayColumn.Set([]int8{1, 2}); err != nil {
130+
panic(err)
131+
}
132+
timestampColumn := &schema.Timestamptz{
133+
Time: time.Now(),
134+
Status: schema.Present,
135+
}
136+
jsonColumn := &schema.JSON{
137+
Bytes: []byte(`{"test": "test"}`),
138+
Status: schema.Present,
139+
}
140+
uuidArrayColumn := &schema.UUIDArray{}
141+
if err := uuidArrayColumn.Set([]string{"00000000-0000-0000-0000-000000000001", "00000000-0000-0000-0000-000000000002"}); err != nil {
142+
panic(err)
143+
}
144+
inetColumn := &schema.Inet{}
145+
if err := inetColumn.Set("192.0.2.1/24"); err != nil {
146+
panic(err)
147+
}
148+
inetArrayColumn := &schema.InetArray{}
149+
if err := inetArrayColumn.Set([]string{"192.0.2.1/24", "192.0.2.1/24"}); err != nil {
150+
panic(err)
151+
}
152+
cidrColumn := &schema.CIDR{}
153+
if err := cidrColumn.Set("192.0.2.1"); err != nil {
154+
panic(err)
155+
}
156+
cidrArrayColumn := &schema.CIDRArray{}
157+
if err := cidrArrayColumn.Set([]string{"192.0.2.1", "192.0.2.1"}); err != nil {
158+
panic(err)
159+
}
160+
macaddrColumn := &schema.Macaddr{}
161+
if err := macaddrColumn.Set("aa:bb:cc:dd:ee:ff"); err != nil {
162+
panic(err)
163+
}
164+
macaddrArrayColumn := &schema.MacaddrArray{}
165+
if err := macaddrArrayColumn.Set([]string{"aa:bb:cc:dd:ee:ff", "11:22:33:44:55:66"}); err != nil {
166+
panic(err)
167+
}
168+
169+
data := schema.CQTypes{
170+
cqID,
171+
cqParentID,
172+
boolColumn,
173+
intColumn,
174+
floatColumn,
175+
uuidColumn,
176+
textColumn,
177+
byteaColumn,
178+
textArrayColumn,
179+
intArrayColumn,
180+
timestampColumn,
181+
jsonColumn,
182+
uuidArrayColumn,
183+
inetColumn,
184+
inetArrayColumn,
185+
cidrColumn,
186+
cidrArrayColumn,
187+
macaddrColumn,
188+
macaddrArrayColumn,
189+
}
190+
191+
return data
192+
}

plugins/destination.go

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,21 @@ import (
1313
type NewDestinationClientFunc func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error)
1414

1515
type DestinationClient interface {
16+
schema.CQTypeTransformer
17+
ReverseTransformValues(table *schema.Table, values []interface{}) (schema.CQTypes, error)
1618
Migrate(ctx context.Context, tables schema.Tables) error
17-
Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- *schema.DestinationResource) error
18-
Write(ctx context.Context, tables schema.Tables, res <-chan *schema.DestinationResource) error
19+
Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error
20+
Write(ctx context.Context, tables schema.Tables, res <-chan *ClientResource) error
1921
Metrics() DestinationMetrics
2022
DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
2123
Close(ctx context.Context) error
2224
}
2325

26+
type ClientResource struct {
27+
TableName string
28+
Data []interface{}
29+
}
30+
2431
type DestinationPlugin struct {
2532
// Name of destination plugin i.e postgresql,snowflake
2633
name string
@@ -77,15 +84,60 @@ func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) e
7784
return p.client.Migrate(ctx, tables)
7885
}
7986

80-
func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- *schema.DestinationResource) error {
87+
func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
88+
var readErr error
89+
ch := make(chan schema.CQTypes)
90+
go func() {
91+
defer close(ch)
92+
readErr = p.Read(ctx, table, sourceName, ch)
93+
}()
94+
//nolint:prealloc
95+
var resources []schema.CQTypes
96+
for resource := range ch {
97+
resources = append(resources, resource)
98+
}
99+
return resources, readErr
100+
}
101+
102+
func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error {
81103
SetDestinationManagedCqColumns(schema.Tables{table})
82-
return p.client.Read(ctx, table, sourceName, res)
104+
ch := make(chan []interface{})
105+
var err error
106+
go func() {
107+
defer close(ch)
108+
err = p.client.Read(ctx, table, sourceName, ch)
109+
}()
110+
for resource := range ch {
111+
r, err := p.client.ReverseTransformValues(table, resource)
112+
if err != nil {
113+
return err
114+
}
115+
res <- r
116+
}
117+
return err
83118
}
84119

85-
func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan *schema.DestinationResource) error {
120+
// this function is currently used mostly for testing so it's not a public api
121+
func (p *DestinationPlugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error {
122+
resources := []schema.DestinationResource{resource}
123+
return p.writeAll(ctx, tables, sourceName, syncTime, resources)
124+
}
125+
126+
// this function is currently used mostly for testing so it's not a public api
127+
func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error {
128+
ch := make(chan schema.DestinationResource, len(resources))
129+
for _, resource := range resources {
130+
ch <- resource
131+
}
132+
close(ch)
133+
return p.Write(ctx, tables, sourceName, syncTime, ch)
134+
}
135+
136+
func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
137+
syncTime = syncTime.UTC()
86138
SetDestinationManagedCqColumns(tables)
87-
ch := make(chan *schema.DestinationResource)
88-
eg := &errgroup.Group{}
139+
ch := make(chan *ClientResource)
140+
eg, ctx := errgroup.WithContext(ctx)
89141
// given most destination plugins writing in batch we are using a worker pool to write in parallel
90142
// it might not generalize well and we might need to move it to each destination plugin implementation.
91143
for i := 0; i < writeWorkers; i++ {
@@ -99,7 +151,16 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou
99151
_ = syncTimeColumn.Set(syncTime)
100152
for r := range res {
101153
r.Data = append([]schema.CQType{sourceColumn, syncTimeColumn}, r.Data...)
102-
ch <- r
154+
clientResource := &ClientResource{
155+
TableName: r.TableName,
156+
Data: schema.TransformWithTransformer(p.client, r.Data),
157+
}
158+
select {
159+
case <-ctx.Done():
160+
close(ch)
161+
return eg.Wait()
162+
case ch <- clientResource:
163+
}
103164
}
104165

105166
close(ch)
@@ -115,6 +176,7 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou
115176
}
116177

117178
func (p *DestinationPlugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
179+
syncTime = syncTime.UTC()
118180
return p.client.DeleteStale(ctx, tables, sourceName, syncTime)
119181
}
120182

0 commit comments

Comments
 (0)