@@ -6,16 +6,13 @@ import (
66 "fmt"
77
88 "github.com/cloudquery/plugin-sdk/internal/pb"
9- "github.com/cloudquery/plugin-sdk/plugins"
109 "github.com/cloudquery/plugin-sdk/schema"
1110 "github.com/cloudquery/plugin-sdk/specs"
1211 "google.golang.org/grpc"
1312)
1413
1514type DestinationClient struct {
1615 pbClient pb.DestinationClient
17- // this can be used if we have a plugin which is compiled in, so we don't need to do any grpc requests
18- localClient plugins.DestinationPlugin
1916}
2017
2118func NewDestinationClient (cc grpc.ClientConnInterface ) * DestinationClient {
@@ -24,16 +21,7 @@ func NewDestinationClient(cc grpc.ClientConnInterface) *DestinationClient {
2421 }
2522}
2623
27- func NewLocalDestinationClient (p plugins.DestinationPlugin ) * DestinationClient {
28- return & DestinationClient {
29- localClient : p ,
30- }
31- }
32-
3324func (c * DestinationClient ) Name (ctx context.Context ) (string , error ) {
34- if c .localClient != nil {
35- return c .localClient .Name (), nil
36- }
3725 res , err := c .pbClient .GetName (ctx , & pb.GetName_Request {})
3826 if err != nil {
3927 return "" , fmt .Errorf ("failed to get name: %w" , err )
@@ -42,9 +30,6 @@ func (c *DestinationClient) Name(ctx context.Context) (string, error) {
4230}
4331
4432func (c * DestinationClient ) Version (ctx context.Context ) (string , error ) {
45- if c .localClient != nil {
46- return c .localClient .Version (), nil
47- }
4833 res , err := c .pbClient .GetVersion (ctx , & pb.GetVersion_Request {})
4934 if err != nil {
5035 return "" , fmt .Errorf ("failed to get version: %w" , err )
@@ -53,9 +38,6 @@ func (c *DestinationClient) Version(ctx context.Context) (string, error) {
5338}
5439
5540func (c * DestinationClient ) Initialize (ctx context.Context , spec specs.Destination ) error {
56- if c .localClient != nil {
57- return c .localClient .Initialize (ctx , spec )
58- }
5941 b , err := json .Marshal (spec )
6042 if err != nil {
6143 return fmt .Errorf ("destination configure: failed to marshal spec: %w" , err )
@@ -70,9 +52,6 @@ func (c *DestinationClient) Initialize(ctx context.Context, spec specs.Destinati
7052}
7153
7254func (c * DestinationClient ) Migrate (ctx context.Context , tables []* schema.Table ) error {
73- if c .localClient != nil {
74- return c .localClient .Migrate (ctx , tables )
75- }
7655 b , err := json .Marshal (tables )
7756 if err != nil {
7857 return fmt .Errorf ("destination migrate: failed to marshal plugin: %w" , err )
@@ -84,20 +63,22 @@ func (c *DestinationClient) Migrate(ctx context.Context, tables []*schema.Table)
8463 return nil
8564}
8665
87- func (c * DestinationClient ) Write (ctx context.Context , table string , data map [string ]interface {}) error {
88- // var saveClient pb.Destination_SaveClient
89- // var err error
90- // if c.pbClient != nil {
91- // saveClient, err = c.pbClient.Write(ctx)
92- // if err != nil {
93- // return fmt.Errorf("failed to create save client: %w", err)
94- // }
95- // }
96- if c .localClient != nil {
97- if err := c .localClient .Write (ctx , table , data ); err != nil {
98- return fmt .Errorf ("failed to save resources: %w" , err )
66+ // Write writes rows as they are received from the channel to the destination plugin.
67+ // resources is marshaled schema.Resource. We are not marshalling this inside the function
68+ // because usually it is alreadun marshalled from the source plugin.
69+ func (c * DestinationClient ) Write (ctx context.Context , resources <- chan []byte ) (uint64 , error ) {
70+ saveClient , err := c .pbClient .Write (ctx )
71+ if err != nil {
72+ return 0 , fmt .Errorf ("failed to create save client: %w" , err )
73+ }
74+ var failedWrites uint64
75+ for resource := range resources {
76+ if err := saveClient .Send (& pb.Write_Request {
77+ Resource : resource ,
78+ }); err != nil {
79+ failedWrites ++
9980 }
10081 }
10182
102- return nil
83+ return failedWrites , nil
10384}
0 commit comments