Skip to content

Commit 6733785

Browse files
authored
fix: Move source & destination plugin code to separate packages (#516)
Im trying to add more functionality and tests to our destination plugin API but this because quite hard now when source and plugin reside in the same place and everything is suffixed with `source` or `destination`. This should make it more "go" idiomatic by splitting `source` and `destination` into two different packages and help with follow-up PRs. This is a breaking change but relatively minor one as it is mostly affecting one file in each plugin (the `main.go` or `plugin.go`)
1 parent a0b8a46 commit 6733785

37 files changed

Lines changed: 365 additions & 173 deletions

clients/destination.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"time"
1616

1717
"github.com/cloudquery/plugin-sdk/internal/pb"
18-
"github.com/cloudquery/plugin-sdk/plugins"
18+
"github.com/cloudquery/plugin-sdk/plugins/destination"
1919
"github.com/cloudquery/plugin-sdk/schema"
2020
"github.com/cloudquery/plugin-sdk/specs"
2121
"github.com/rs/zerolog"
@@ -196,12 +196,12 @@ func (c *DestinationClient) GetProtocolVersion(ctx context.Context) (uint64, err
196196
return res.Version, nil
197197
}
198198

199-
func (c *DestinationClient) GetMetrics(ctx context.Context) (*plugins.DestinationMetrics, error) {
199+
func (c *DestinationClient) GetMetrics(ctx context.Context) (*destination.Metrics, error) {
200200
res, err := c.pbClient.GetMetrics(ctx, &pb.GetDestinationMetrics_Request{})
201201
if err != nil {
202202
return nil, fmt.Errorf("failed to call GetMetrics: %w", err)
203203
}
204-
var stats plugins.DestinationMetrics
204+
var stats destination.Metrics
205205
if err := json.Unmarshal(res.Metrics, &stats); err != nil {
206206
return nil, fmt.Errorf("failed to unmarshal destination metrics: %w", err)
207207
}

clients/source.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"sync"
1515

1616
"github.com/cloudquery/plugin-sdk/internal/pb"
17-
"github.com/cloudquery/plugin-sdk/plugins"
17+
"github.com/cloudquery/plugin-sdk/plugins/source"
1818
"github.com/cloudquery/plugin-sdk/schema"
1919
"github.com/cloudquery/plugin-sdk/specs"
2020
"github.com/rs/zerolog"
@@ -217,12 +217,12 @@ func (c *SourceClient) Version(ctx context.Context) (string, error) {
217217
return res.Version, nil
218218
}
219219

220-
func (c *SourceClient) GetMetrics(ctx context.Context) (*plugins.SourceMetrics, error) {
220+
func (c *SourceClient) GetMetrics(ctx context.Context) (*source.Metrics, error) {
221221
res, err := c.pbClient.GetMetrics(ctx, &pb.GetSourceMetrics_Request{})
222222
if err != nil {
223223
return nil, fmt.Errorf("failed to call GetMetrics: %w", err)
224224
}
225-
var stats plugins.SourceMetrics
225+
var stats source.Metrics
226226
if err := json.Unmarshal(res.Metrics, &stats); err != nil {
227227
return nil, fmt.Errorf("failed to unmarshal source stats: %w", err)
228228
}

internal/servers/destinations.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"io"
88

99
"github.com/cloudquery/plugin-sdk/internal/pb"
10-
"github.com/cloudquery/plugin-sdk/plugins"
10+
"github.com/cloudquery/plugin-sdk/plugins/destination"
1111
"github.com/cloudquery/plugin-sdk/schema"
1212
"github.com/cloudquery/plugin-sdk/specs"
1313
"github.com/rs/zerolog"
@@ -18,7 +18,7 @@ import (
1818

1919
type DestinationServer struct {
2020
pb.UnimplementedDestinationServer
21-
Plugin *plugins.DestinationPlugin
21+
Plugin *destination.Plugin
2222
Logger zerolog.Logger
2323
}
2424

internal/servers/source.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"fmt"
99

1010
"github.com/cloudquery/plugin-sdk/internal/pb"
11-
"github.com/cloudquery/plugin-sdk/plugins"
11+
"github.com/cloudquery/plugin-sdk/plugins/source"
1212
"github.com/cloudquery/plugin-sdk/schema"
1313
"github.com/cloudquery/plugin-sdk/specs"
1414
"github.com/getsentry/sentry-go"
@@ -20,7 +20,7 @@ import (
2020

2121
type SourceServer struct {
2222
pb.UnimplementedSourceServer
23-
Plugin *plugins.SourcePlugin
23+
Plugin *source.Plugin
2424
Logger zerolog.Logger
2525
}
2626

@@ -139,16 +139,16 @@ func (s *SourceServer) GetMetrics(context.Context, *pb.GetSourceMetrics_Request)
139139
// Aggregate metrics before sending to keep response size small.
140140
// Temporary fix for https://github.com/cloudquery/cloudquery/issues/3962
141141
m := s.Plugin.Metrics()
142-
agg := &plugins.TableClientMetrics{}
142+
agg := &source.TableClientMetrics{}
143143
for _, table := range m.TableClient {
144144
for _, tableClient := range table {
145145
agg.Resources += tableClient.Resources
146146
agg.Errors += tableClient.Errors
147147
agg.Panics += tableClient.Panics
148148
}
149149
}
150-
b, err := json.Marshal(&plugins.SourceMetrics{
151-
TableClient: map[string]map[string]*plugins.TableClientMetrics{"": {"": agg}},
150+
b, err := json.Marshal(&source.Metrics{
151+
TableClient: map[string]map[string]*source.TableClientMetrics{"": {"": agg}},
152152
})
153153
if err != nil {
154154
return nil, fmt.Errorf("failed to marshal source metrics: %w", err)
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package plugins
1+
package destination
22

33
import (
44
"context"
@@ -33,25 +33,25 @@ func withBlockingWrite() TestDestinationOption {
3333
}
3434
}
3535

36-
func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewDestinationClientFunc {
36+
func getNewTestDestinationMemDBClient(options ...TestDestinationOption) NewClientFunc {
3737
c := &TestDestinationMemDBClient{
3838
memoryDB: make(map[string][][]interface{}),
3939
}
4040
for _, opt := range options {
4141
opt(c)
4242
}
43-
return func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
43+
return func(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
4444
return c, nil
4545
}
4646
}
4747

48-
func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
48+
func NewTestDestinationMemDBClient(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
4949
return &TestDestinationMemDBClient{
5050
memoryDB: make(map[string][][]interface{}),
5151
}, nil
5252
}
5353

54-
func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error) {
54+
func newTestDestinationMemDBClientErrOnNew(context.Context, zerolog.Logger, specs.Destination) (Client, error) {
5555
return nil, fmt.Errorf("newTestDestinationMemDBClientErrOnNew")
5656
}
5757

@@ -132,8 +132,8 @@ func (c *TestDestinationMemDBClient) Write(ctx context.Context, tables schema.Ta
132132
return nil
133133
}
134134

135-
func (*TestDestinationMemDBClient) Metrics() DestinationMetrics {
136-
return DestinationMetrics{}
135+
func (*TestDestinationMemDBClient) Metrics() Metrics {
136+
return Metrics{}
137137
}
138138

139139
func (c *TestDestinationMemDBClient) Close(context.Context) error {
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package plugins
1+
package destination
22

3-
type DestinationMetrics struct {
3+
type Metrics struct {
44
// Errors number of errors / failed writes
55
Errors uint64
66
// Writes number of successful writes
Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package plugins
1+
package destination
22

33
import (
44
"context"
@@ -11,15 +11,15 @@ import (
1111
"golang.org/x/sync/errgroup"
1212
)
1313

14-
type NewDestinationClientFunc func(context.Context, zerolog.Logger, specs.Destination) (DestinationClient, error)
14+
type NewClientFunc func(context.Context, zerolog.Logger, specs.Destination) (Client, error)
1515

16-
type DestinationClient interface {
16+
type Client interface {
1717
schema.CQTypeTransformer
1818
ReverseTransformValues(table *schema.Table, values []interface{}) (schema.CQTypes, error)
1919
Migrate(ctx context.Context, tables schema.Tables) error
2020
Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- []interface{}) error
2121
Write(ctx context.Context, tables schema.Tables, res <-chan *ClientResource) error
22-
Metrics() DestinationMetrics
22+
Metrics() Metrics
2323
DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error
2424
Close(ctx context.Context) error
2525
}
@@ -29,15 +29,15 @@ type ClientResource struct {
2929
Data []interface{}
3030
}
3131

32-
type DestinationPlugin struct {
32+
type Plugin struct {
3333
// Name of destination plugin i.e postgresql,snowflake
3434
name string
3535
// Version of the destination plugin
3636
version string
3737
// Called upon configure call to validate and init configuration
38-
newDestinationClient NewDestinationClientFunc
38+
newDestinationClient NewClientFunc
3939
// initialized destination client
40-
client DestinationClient
40+
client Client
4141
// spec the client was initialized with
4242
spec specs.Destination
4343
// Logger to call, this logger is passed to the serve.Serve Client, if not define Serve will create one instead.
@@ -46,29 +46,29 @@ type DestinationPlugin struct {
4646

4747
const writeWorkers = 1
4848

49-
func NewDestinationPlugin(name string, version string, newDestinationClient NewDestinationClientFunc) *DestinationPlugin {
50-
p := &DestinationPlugin{
49+
func NewDestinationPlugin(name string, version string, newDestinationClient NewClientFunc) *Plugin {
50+
p := &Plugin{
5151
name: name,
5252
version: version,
5353
newDestinationClient: newDestinationClient,
5454
}
5555
return p
5656
}
5757

58-
func (p *DestinationPlugin) Name() string {
58+
func (p *Plugin) Name() string {
5959
return p.name
6060
}
6161

62-
func (p *DestinationPlugin) Version() string {
62+
func (p *Plugin) Version() string {
6363
return p.version
6464
}
6565

66-
func (p *DestinationPlugin) Metrics() DestinationMetrics {
66+
func (p *Plugin) Metrics() Metrics {
6767
return p.client.Metrics()
6868
}
6969

7070
// we need lazy loading because we want to be able to initialize after
71-
func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error {
71+
func (p *Plugin) Init(ctx context.Context, logger zerolog.Logger, spec specs.Destination) error {
7272
var err error
7373
p.logger = logger
7474
p.spec = spec
@@ -83,12 +83,12 @@ func (p *DestinationPlugin) Init(ctx context.Context, logger zerolog.Logger, spe
8383
}
8484

8585
// we implement all DestinationClient functions so we can hook into pre-post behavior
86-
func (p *DestinationPlugin) Migrate(ctx context.Context, tables schema.Tables) error {
86+
func (p *Plugin) Migrate(ctx context.Context, tables schema.Tables) error {
8787
SetDestinationManagedCqColumns(tables)
8888
return p.client.Migrate(ctx, tables)
8989
}
9090

91-
func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
91+
func (p *Plugin) readAll(ctx context.Context, table *schema.Table, sourceName string) ([]schema.CQTypes, error) {
9292
var readErr error
9393
ch := make(chan schema.CQTypes)
9494
go func() {
@@ -103,7 +103,7 @@ func (p *DestinationPlugin) readAll(ctx context.Context, table *schema.Table, so
103103
return resources, readErr
104104
}
105105

106-
func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error {
106+
func (p *Plugin) Read(ctx context.Context, table *schema.Table, sourceName string, res chan<- schema.CQTypes) error {
107107
SetDestinationManagedCqColumns(schema.Tables{table})
108108
ch := make(chan []interface{})
109109
var err error
@@ -122,13 +122,13 @@ func (p *DestinationPlugin) Read(ctx context.Context, table *schema.Table, sourc
122122
}
123123

124124
// this function is currently used mostly for testing so it's not a public api
125-
func (p *DestinationPlugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error {
125+
func (p *Plugin) writeOne(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resource schema.DestinationResource) error {
126126
resources := []schema.DestinationResource{resource}
127127
return p.writeAll(ctx, tables, sourceName, syncTime, resources)
128128
}
129129

130130
// this function is currently used mostly for testing so it's not a public api
131-
func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error {
131+
func (p *Plugin) writeAll(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, resources []schema.DestinationResource) error {
132132
ch := make(chan schema.DestinationResource, len(resources))
133133
for _, resource := range resources {
134134
ch <- resource
@@ -137,7 +137,7 @@ func (p *DestinationPlugin) writeAll(ctx context.Context, tables schema.Tables,
137137
return p.Write(ctx, tables, sourceName, syncTime, ch)
138138
}
139139

140-
func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
140+
func (p *Plugin) Write(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
141141
syncTime = syncTime.UTC()
142142
SetDestinationManagedCqColumns(tables)
143143
ch := make(chan *ClientResource)
@@ -179,12 +179,12 @@ func (p *DestinationPlugin) Write(ctx context.Context, tables schema.Tables, sou
179179
return nil
180180
}
181181

182-
func (p *DestinationPlugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
182+
func (p *Plugin) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
183183
syncTime = syncTime.UTC()
184184
return p.client.DeleteStale(ctx, tables, sourceName, syncTime)
185185
}
186186

187-
func (p *DestinationPlugin) Close(ctx context.Context) error {
187+
func (p *Plugin) Close(ctx context.Context) error {
188188
return p.client.Close(ctx)
189189
}
190190

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package plugins
1+
package destination
22

33
import (
44
"context"
@@ -10,13 +10,13 @@ import (
1010
"github.com/cloudquery/plugin-sdk/specs"
1111
)
1212

13-
func TestDestinationPlugin(t *testing.T) {
13+
func TestPlugin(t *testing.T) {
1414
p := NewDestinationPlugin("test", "development", NewTestDestinationMemDBClient)
15-
DestinationPluginTestSuiteRunner(t, p, nil,
16-
DestinationTestSuiteTests{})
15+
PluginTestSuiteRunner(t, p, nil,
16+
TestSuiteTests{})
1717
}
1818

19-
func TestDestinationOnNewError(t *testing.T) {
19+
func TestOnNewError(t *testing.T) {
2020
ctx := context.Background()
2121
p := NewDestinationPlugin("test", "development", newTestDestinationMemDBClientErrOnNew)
2222
err := p.Init(ctx, getTestLogger(t), specs.Destination{})
@@ -26,7 +26,7 @@ func TestDestinationOnNewError(t *testing.T) {
2626
}
2727
}
2828

29-
func TestDestinationOnWriteError(t *testing.T) {
29+
func TestOnWriteError(t *testing.T) {
3030
ctx := context.Background()
3131
newClientFunc := getNewTestDestinationMemDBClient(withErrOnWrite())
3232
p := NewDestinationPlugin("test", "development", newClientFunc)
@@ -53,7 +53,7 @@ func TestDestinationOnWriteError(t *testing.T) {
5353
}
5454
}
5555

56-
func TestDestinationOnWriteCtxCancelled(t *testing.T) {
56+
func TestOnWriteCtxCancelled(t *testing.T) {
5757
ctx := context.Background()
5858
newClientFunc := getNewTestDestinationMemDBClient(withBlockingWrite())
5959
p := NewDestinationPlugin("test", "development", newClientFunc)

plugins/destination_reverse_transformer.go renamed to plugins/destination/reverse_transformer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package plugins
1+
package destination
22

33
import (
44
"fmt"

0 commit comments

Comments
 (0)