Skip to content

Commit 26237c3

Browse files
authored
fix!: Make destinations work over gRPC only (#174)
This should go with cloudquery/cloudquery#2043
1 parent 8cb3e1a commit 26237c3

6 files changed

Lines changed: 40 additions & 61 deletions

File tree

clients/destination.go

Lines changed: 15 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,13 @@ import (
66
"fmt"
77

88
"github.com/cloudquery/plugin-sdk/internal/pb"
9-
"github.com/cloudquery/plugin-sdk/plugins"
109
"github.com/cloudquery/plugin-sdk/schema"
1110
"github.com/cloudquery/plugin-sdk/specs"
1211
"google.golang.org/grpc"
1312
)
1413

1514
type DestinationClient struct {
1615
pbClient pb.DestinationClient
17-
// this can be used if we have a plugin which is compiled in, so we don't need to do any grpc requests
18-
localClient plugins.DestinationPlugin
1916
}
2017

2118
func NewDestinationClient(cc grpc.ClientConnInterface) *DestinationClient {
@@ -24,16 +21,7 @@ func NewDestinationClient(cc grpc.ClientConnInterface) *DestinationClient {
2421
}
2522
}
2623

27-
func NewLocalDestinationClient(p plugins.DestinationPlugin) *DestinationClient {
28-
return &DestinationClient{
29-
localClient: p,
30-
}
31-
}
32-
3324
func (c *DestinationClient) Name(ctx context.Context) (string, error) {
34-
if c.localClient != nil {
35-
return c.localClient.Name(), nil
36-
}
3725
res, err := c.pbClient.GetName(ctx, &pb.GetName_Request{})
3826
if err != nil {
3927
return "", fmt.Errorf("failed to get name: %w", err)
@@ -42,9 +30,6 @@ func (c *DestinationClient) Name(ctx context.Context) (string, error) {
4230
}
4331

4432
func (c *DestinationClient) Version(ctx context.Context) (string, error) {
45-
if c.localClient != nil {
46-
return c.localClient.Version(), nil
47-
}
4833
res, err := c.pbClient.GetVersion(ctx, &pb.GetVersion_Request{})
4934
if err != nil {
5035
return "", fmt.Errorf("failed to get version: %w", err)
@@ -53,9 +38,6 @@ func (c *DestinationClient) Version(ctx context.Context) (string, error) {
5338
}
5439

5540
func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destination) error {
56-
if c.localClient != nil {
57-
return c.localClient.Initialize(ctx, spec)
58-
}
5941
b, err := json.Marshal(spec)
6042
if err != nil {
6143
return fmt.Errorf("destination configure: failed to marshal spec: %w", err)
@@ -70,9 +52,6 @@ func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destinati
7052
}
7153

7254
func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table) error {
73-
if c.localClient != nil {
74-
return c.localClient.Migrate(ctx, tables)
75-
}
7655
b, err := json.Marshal(tables)
7756
if err != nil {
7857
return fmt.Errorf("destination migrate: failed to marshal plugin: %w", err)
@@ -84,20 +63,22 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table)
8463
return nil
8564
}
8665

87-
func (c *DestinationClient) Write(ctx context.Context, table string, data map[string]interface{}) error {
88-
// var saveClient pb.Destination_SaveClient
89-
// var err error
90-
// if c.pbClient != nil {
91-
// saveClient, err = c.pbClient.Write(ctx)
92-
// if err != nil {
93-
// return fmt.Errorf("failed to create save client: %w", err)
94-
// }
95-
// }
96-
if c.localClient != nil {
97-
if err := c.localClient.Write(ctx, table, data); err != nil {
98-
return fmt.Errorf("failed to save resources: %w", err)
66+
// Write writes rows as they are received from the channel to the destination plugin.
67+
// resources is marshaled schema.Resource. We are not marshalling this inside the function
68+
// because usually it is alreadun marshalled from the source plugin.
69+
func (c *DestinationClient) Write(ctx context.Context, resources <-chan []byte) (uint64, error) {
70+
saveClient, err := c.pbClient.Write(ctx)
71+
if err != nil {
72+
return 0, fmt.Errorf("failed to create save client: %w", err)
73+
}
74+
var failedWrites uint64
75+
for resource := range resources {
76+
if err := saveClient.Send(&pb.Write_Request{
77+
Resource: resource,
78+
}); err != nil {
79+
failedWrites++
9980
}
10081
}
10182

102-
return nil
83+
return failedWrites, nil
10384
}

clients/source.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ func (c *SourceClient) GetTables(ctx context.Context) ([]*schema.Table, error) {
5555
return tables, nil
5656
}
5757

58-
func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- *schema.Resource) error {
58+
// Sync start syncing for the source client per the given spec and returning the results
59+
// in the given channel. res is marshaled schema.Resource. We are not unmarshalling this for performance reasons
60+
// as usually this is sent over-the-wire anyway to a destination plugin
61+
func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- []byte) error {
5962
b, err := json.Marshal(spec)
6063
if err != nil {
6164
return fmt.Errorf("failed to marshal source spec: %w", err)
@@ -74,12 +77,6 @@ func (c *SourceClient) Sync(ctx context.Context, spec specs.Source, res chan<- *
7477
}
7578
return fmt.Errorf("failed to fetch resources from stream: %w", err)
7679
}
77-
var resource schema.Resource
78-
err = json.Unmarshal(r.Resource, &resource)
79-
if err != nil {
80-
return fmt.Errorf("failed to unmarshal resource: %w", err)
81-
}
82-
83-
res <- &resource
80+
res <- r.Resource
8481
}
8582
}

internal/pb/base.pb.go

Lines changed: 5 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pb/base.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,5 @@ message Configure {
2424
}
2525
message Response {
2626
string error = 1;
27-
bytes jsonschema_result = 2;
2827
}
2928
}

internal/servers/destinations.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ func (s *DestinationServer) GetVersion(context.Context, *pb.GetVersion_Request)
3939
}, nil
4040
}
4141

42+
func (s *DestinationServer) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migrate_Response, error) {
43+
var tables []*schema.Table
44+
if err := json.Unmarshal(req.Tables, &tables); err != nil {
45+
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
46+
}
47+
return &pb.Migrate_Response{}, s.Plugin.Migrate(ctx, tables)
48+
}
49+
4250
func (s *DestinationServer) Write(msg pb.Destination_WriteServer) error {
4351
for {
4452
r, err := msg.Recv()

serve/source_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package serve
22

33
import (
44
"context"
5+
"encoding/json"
56
"net"
67
"testing"
78
"time"
@@ -105,7 +106,7 @@ func TestServe(t *testing.T) {
105106
t.Fatalf("Failed to dial bufnet: %v", err)
106107
}
107108
c := clients.NewSourceClient(conn)
108-
resources := make(chan *schema.Resource)
109+
resources := make(chan []byte)
109110
wg := errgroup.Group{}
110111
wg.Go(func() error {
111112
defer close(resources)
@@ -118,7 +119,11 @@ func TestServe(t *testing.T) {
118119
},
119120
resources)
120121
})
121-
for resource := range resources {
122+
for resourceB := range resources {
123+
var resource schema.Resource
124+
if err := json.Unmarshal(resourceB, &resource); err != nil {
125+
t.Fatalf("failed to unmarshal resource: %v", err)
126+
}
122127
if resource.TableName != "test_table" {
123128
t.Fatalf("Expected resource with table name test: %s", resource.TableName)
124129
}

0 commit comments

Comments
 (0)