Skip to content

Commit 3df6129

Browse files
authored
feat: Add managed API for destination plugins (#521)
This adds a managed API for destination plugins. Closes #518 as it got destroyed with conflicts. This should go with that: cloudquery/cloudquery#5805
1 parent 1fe91cb commit 3df6129

21 files changed

Lines changed: 967 additions & 394 deletions

internal/memdb/memdb.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package memdb
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"testing"
8+
"time"
9+
10+
"github.com/cloudquery/plugin-sdk/plugins/destination"
11+
"github.com/cloudquery/plugin-sdk/schema"
12+
"github.com/cloudquery/plugin-sdk/specs"
13+
"github.com/rs/zerolog"
14+
)
15+
16+
// client is mostly used for testing the destination plugin.
17+
type client struct {
18+
schema.DefaultTransformer
19+
spec specs.Destination
20+
memoryDB map[string][][]any
21+
errOnWrite bool
22+
blockingWrite bool
23+
}
24+
25+
type Option func(*client)
26+
27+
func WithErrOnWrite() Option {
28+
return func(c *client) {
29+
c.errOnWrite = true
30+
}
31+
}
32+
33+
func WithBlockingWrite() Option {
34+
return func(c *client) {
35+
c.blockingWrite = true
36+
}
37+
}
38+
39+
func GetNewClient(options ...Option) destination.NewClientFunc {
40+
c := &client{
41+
memoryDB: make(map[string][][]any),
42+
}
43+
for _, opt := range options {
44+
opt(c)
45+
}
46+
return func(context.Context, zerolog.Logger, specs.Destination) (destination.Client, error) {
47+
return c, nil
48+
}
49+
}
50+
51+
func getTestLogger(t *testing.T) zerolog.Logger {
52+
t.Helper()
53+
zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMs
54+
return zerolog.New(zerolog.NewTestWriter(t)).Output(
55+
zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMicro},
56+
).Level(zerolog.DebugLevel).With().Timestamp().Logger()
57+
}
58+
59+
func NewClient(_ context.Context, _ zerolog.Logger, spec specs.Destination) (destination.Client, error) {
60+
return &client{
61+
memoryDB: make(map[string][][]any),
62+
spec: spec,
63+
}, nil
64+
}
65+
66+
func NewClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (destination.Client, error) {
67+
return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew")
68+
}
69+
70+
func (*client) ReverseTransformValues(_ *schema.Table, values []any) (schema.CQTypes, error) {
71+
res := make(schema.CQTypes, len(values))
72+
for i, v := range values {
73+
res[i] = v.(schema.CQType)
74+
}
75+
return res, nil
76+
}
77+
func (c *client) overwrite(table *schema.Table, data []any) {
78+
pks := table.PrimaryKeys()
79+
pksIndex := make([]int, len(pks))
80+
for i := range pks {
81+
pksIndex[i] = table.Columns.Index(pks[i])
82+
}
83+
for i, row := range c.memoryDB[table.Name] {
84+
found := true
85+
for _, pkIndex := range pksIndex {
86+
if !row[pkIndex].(schema.CQType).Equal(data[pkIndex].(schema.CQType)) {
87+
found = false
88+
}
89+
}
90+
if found {
91+
c.memoryDB[table.Name] = append(c.memoryDB[table.Name][:i], c.memoryDB[table.Name][i+1:]...)
92+
c.memoryDB[table.Name] = append(c.memoryDB[table.Name], data)
93+
return
94+
}
95+
}
96+
c.memoryDB[table.Name] = append(c.memoryDB[table.Name], data)
97+
}
98+
99+
func (c *client) Migrate(_ context.Context, tables schema.Tables) error {
100+
for _, table := range tables {
101+
if c.memoryDB[table.Name] == nil {
102+
c.memoryDB[table.Name] = make([][]any, 0)
103+
}
104+
}
105+
return nil
106+
}
107+
108+
func (c *client) Read(_ context.Context, table *schema.Table, source string, res chan<- []any) error {
109+
if c.memoryDB[table.Name] == nil {
110+
return nil
111+
}
112+
sourceColIndex := table.Columns.Index(schema.CqSourceNameColumn.Name)
113+
var sortedRes [][]any
114+
for _, row := range c.memoryDB[table.Name] {
115+
if row[sourceColIndex].(*schema.Text).Str == source {
116+
sortedRes = append(sortedRes, row)
117+
}
118+
}
119+
120+
for _, row := range sortedRes {
121+
res <- row
122+
}
123+
return nil
124+
}
125+
126+
func (c *client) Write(ctx context.Context, tables schema.Tables, resources <-chan *destination.ClientResource) error {
127+
if c.errOnWrite {
128+
return fmt.Errorf("errOnWrite")
129+
}
130+
if c.blockingWrite {
131+
<-ctx.Done()
132+
if c.errOnWrite {
133+
return fmt.Errorf("errOnWrite")
134+
}
135+
return nil
136+
}
137+
for resource := range resources {
138+
if c.spec.WriteMode == specs.WriteModeAppend {
139+
c.memoryDB[resource.TableName] = append(c.memoryDB[resource.TableName], resource.Data)
140+
} else {
141+
c.overwrite(tables.Get(resource.TableName), resource.Data)
142+
}
143+
}
144+
return nil
145+
}
146+
147+
func (c *client) WriteTableBatch(ctx context.Context, table *schema.Table, resources [][]any) error {
148+
if c.errOnWrite {
149+
return fmt.Errorf("errOnWrite")
150+
}
151+
if c.blockingWrite {
152+
<-ctx.Done()
153+
if c.errOnWrite {
154+
return fmt.Errorf("errOnWrite")
155+
}
156+
return nil
157+
}
158+
for _, resource := range resources {
159+
if c.spec.WriteMode == specs.WriteModeAppend {
160+
c.memoryDB[table.Name] = append(c.memoryDB[table.Name], resource)
161+
} else {
162+
c.overwrite(table, resource)
163+
}
164+
}
165+
return nil
166+
}
167+
168+
func (*client) Metrics() destination.Metrics {
169+
return destination.Metrics{}
170+
}
171+
172+
func (c *client) Close(context.Context) error {
173+
c.memoryDB = nil
174+
return nil
175+
}
176+
177+
func (c *client) DeleteStale(ctx context.Context, tables schema.Tables, source string, syncTime time.Time) error {
178+
for _, table := range tables {
179+
c.deleteStaleTable(ctx, table, source, syncTime)
180+
if err := c.DeleteStale(ctx, table.Relations, source, syncTime); err != nil {
181+
return err
182+
}
183+
}
184+
return nil
185+
}
186+
187+
func (c *client) deleteStaleTable(_ context.Context, table *schema.Table, source string, syncTime time.Time) {
188+
sourceColIndex := table.Columns.Index(schema.CqSourceNameColumn.Name)
189+
syncColIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name)
190+
var filteredTable [][]any
191+
for i, row := range c.memoryDB[table.Name] {
192+
if row[sourceColIndex].(*schema.Text).Str == source {
193+
rowSyncTime := row[syncColIndex].(*schema.Timestamptz)
194+
if !rowSyncTime.Time.UTC().Before(syncTime) {
195+
filteredTable = append(filteredTable, c.memoryDB[table.Name][i])
196+
}
197+
}
198+
}
199+
c.memoryDB[table.Name] = filteredTable
200+
}
Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,31 @@
1-
package destination
1+
package memdb
22

33
import (
44
"context"
55
"testing"
66
"time"
77

88
"github.com/cloudquery/plugin-sdk/internal/testdata"
9+
"github.com/cloudquery/plugin-sdk/plugins/destination"
910
"github.com/cloudquery/plugin-sdk/schema"
1011
"github.com/cloudquery/plugin-sdk/specs"
1112
)
1213

13-
func TestPlugin(t *testing.T) {
14-
p := NewPlugin("test", "development", NewTestDestinationMemDBClient)
15-
PluginTestSuiteRunner(t, p, nil,
16-
TestSuiteTests{})
14+
func TestPluginUnmanagedClient(t *testing.T) {
15+
p := destination.NewPlugin("test", "development", NewClient)
16+
destination.PluginTestSuiteRunner(t, p, nil,
17+
destination.PluginTestSuiteTests{})
1718
}
1819

19-
func TestOnNewError(t *testing.T) {
20+
func TestPluginManagedClient(t *testing.T) {
21+
p := destination.NewPlugin("test", "development", NewClient, destination.WithManagerWriter())
22+
destination.PluginTestSuiteRunner(t, p, nil,
23+
destination.PluginTestSuiteTests{})
24+
}
25+
26+
func TestPluginOnNewError(t *testing.T) {
2027
ctx := context.Background()
21-
p := NewPlugin("test", "development", newTestDestinationMemDBClientErrOnNew)
28+
p := destination.NewPlugin("test", "development", NewClientErrOnNew)
2229
err := p.Init(ctx, getTestLogger(t), specs.Destination{})
2330

2431
if err == nil {
@@ -28,20 +35,21 @@ func TestOnNewError(t *testing.T) {
2835

2936
func TestOnWriteError(t *testing.T) {
3037
ctx := context.Background()
31-
newClientFunc := getNewTestDestinationMemDBClient(withErrOnWrite())
32-
p := NewPlugin("test", "development", newClientFunc)
38+
newClientFunc := GetNewClient(WithErrOnWrite())
39+
p := destination.NewPlugin("test", "development", newClientFunc)
3340
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
3441
t.Fatal(err)
3542
}
43+
table := testdata.TestTable("test")
3644
tables := []*schema.Table{
37-
testdata.TestTable("test"),
45+
table,
3846
}
3947
sourceName := "TestDestinationOnWriteError"
4048
syncTime := time.Now()
4149
ch := make(chan schema.DestinationResource, 1)
4250
ch <- schema.DestinationResource{
4351
TableName: "test",
44-
Data: testdata.TestData(),
52+
Data: testdata.GenTestData(table),
4553
}
4654
close(ch)
4755
err := p.Write(ctx, tables, sourceName, syncTime, ch)
@@ -55,11 +63,12 @@ func TestOnWriteError(t *testing.T) {
5563

5664
func TestOnWriteCtxCancelled(t *testing.T) {
5765
ctx := context.Background()
58-
newClientFunc := getNewTestDestinationMemDBClient(withBlockingWrite())
59-
p := NewPlugin("test", "development", newClientFunc)
66+
newClientFunc := GetNewClient(WithBlockingWrite())
67+
p := destination.NewPlugin("test", "development", newClientFunc)
6068
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
6169
t.Fatal(err)
6270
}
71+
table := testdata.TestTable("test")
6372
tables := []*schema.Table{
6473
testdata.TestTable("test"),
6574
}
@@ -69,7 +78,7 @@ func TestOnWriteCtxCancelled(t *testing.T) {
6978
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
7079
ch <- schema.DestinationResource{
7180
TableName: "test",
72-
Data: testdata.TestData(),
81+
Data: testdata.GenTestData(table),
7382
}
7483
defer cancel()
7584
err := p.Write(ctx, tables, sourceName, syncTime, ch)

0 commit comments

Comments
 (0)