Skip to content

Commit f548a54

Browse files
authored
fix: Change options for new client (#603)
I think I missed it in the review but the api should include an option struct rather then the `...option` as otherwise it's just introduce un-necessary boilerplate for each client. Also, I've added BackendNone as otherwise there is no way to specify that you don't want incremental syncing.
1 parent c196a74 commit f548a54

17 files changed

Lines changed: 186 additions & 131 deletions

clients/destination.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (c *DestinationClient) Write(ctx context.Context, source string, syncTime t
276276
return res.FailedWrites, nil
277277
}
278278

279-
func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, source string, syncTime time.Time, resources <-chan []byte) error {
279+
func (c *DestinationClient) Write2(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, resources <-chan []byte) error {
280280
saveClient, err := c.pbClient.Write2(ctx)
281281
if err != nil {
282282
return fmt.Errorf("failed to call Write2: %w", err)
@@ -285,10 +285,15 @@ func (c *DestinationClient) Write2(ctx context.Context, tables schema.Tables, so
285285
if err != nil {
286286
return fmt.Errorf("failed to marshal tables: %w", err)
287287
}
288+
sourceSpecBytes, err := json.Marshal(sourceSpec)
289+
if err != nil {
290+
return fmt.Errorf("failed to marshal source spec: %w", err)
291+
}
288292
if err := saveClient.Send(&pb.Write2_Request{
289-
Tables: b,
290-
Source: source,
291-
Timestamp: timestamppb.New(syncTime),
293+
Tables: b,
294+
Source: sourceSpec.Name,
295+
Timestamp: timestamppb.New(syncTime),
296+
SourceSpec: sourceSpecBytes,
292297
}); err != nil {
293298
return fmt.Errorf("failed to send tables: %w", err)
294299
}

clients/destination_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
8686
t.Fatal(err)
8787
}
8888

89-
name, err := c.Name(ctx)
89+
_, err = c.Name(ctx)
9090
if err != nil {
9191
t.Fatal("failed to get name", err)
9292
}
@@ -108,7 +108,9 @@ func TestDestinationClientWriteReturnsCorrectError(t *testing.T) {
108108
resourcesChannel <- destResource2
109109
}
110110
}()
111-
112-
err = c.Write2(ctx, tables, name, time.Now().UTC(), resourcesChannel)
111+
sourceSpec := specs.Source{
112+
Name: "TestDestinationClientWriteReturnsCorrectError",
113+
}
114+
err = c.Write2(ctx, sourceSpec, tables, time.Now().UTC(), resourcesChannel)
113115
require.ErrorContains(t, err, "context canceled")
114116
}

internal/memdb/memdb_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,16 @@ func TestOnWriteError(t *testing.T) {
6262
}
6363
sourceName := "TestDestinationOnWriteError"
6464
syncTime := time.Now()
65+
sourceSpec := specs.Source{
66+
Name: sourceName,
67+
}
6568
ch := make(chan schema.DestinationResource, 1)
6669
ch <- schema.DestinationResource{
6770
TableName: "test",
6871
Data: testdata.GenTestData(table),
6972
}
7073
close(ch)
71-
err := p.Write(ctx, tables, sourceName, syncTime, ch)
74+
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)
7275
if err == nil {
7376
t.Fatal("expected error")
7477
}
@@ -90,14 +93,17 @@ func TestOnWriteCtxCancelled(t *testing.T) {
9093
}
9194
sourceName := "TestDestinationOnWriteError"
9295
syncTime := time.Now()
96+
sourceSpec := specs.Source{
97+
Name: sourceName,
98+
}
9399
ch := make(chan schema.DestinationResource, 1)
94100
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
95101
ch <- schema.DestinationResource{
96102
TableName: "test",
97103
Data: testdata.GenTestData(table),
98104
}
99105
defer cancel()
100-
err := p.Write(ctx, tables, sourceName, syncTime, ch)
106+
err := p.Write(ctx, sourceSpec, tables, syncTime, ch)
101107
if err != nil {
102108
t.Fatal(err)
103109
}

internal/pb/destination.pb.go

Lines changed: 77 additions & 66 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/pb/destination.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ message Write2 {
6161
bytes tables = 3;
6262
// marshalled *schema.Resources
6363
bytes resource = 4;
64+
// marshalled specs.Source
65+
bytes source_spec = 5;
6466
}
6567
message Response {
6668
}

internal/servers/destinations.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,22 @@ func (s *DestinationServer) Write2(msg pb.Destination_Write2Server) error {
7676
if err := json.Unmarshal(r.Tables, &tables); err != nil {
7777
return status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
7878
}
79-
sourceName := r.Source
79+
var sourceSpec specs.Source
80+
if r.SourceSpec == nil {
81+
// this is for backward compatibility
82+
sourceSpec = specs.Source{
83+
Name: r.Source,
84+
}
85+
} else {
86+
if err := json.Unmarshal(r.SourceSpec, &sourceSpec); err != nil {
87+
return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err)
88+
}
89+
}
8090
syncTime := r.Timestamp.AsTime()
8191

8292
eg, ctx := errgroup.WithContext(msg.Context())
8393
eg.Go(func() error {
84-
return s.Plugin.Write(ctx, tables, sourceName, syncTime, resources)
94+
return s.Plugin.Write(ctx, sourceSpec, tables, syncTime, resources)
8595
})
8696

8797
for {

plugins/destination/managed_writer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/cloudquery/plugin-sdk/schema"
10+
"github.com/cloudquery/plugin-sdk/specs"
1011
)
1112

1213
type worker struct {
@@ -65,7 +66,7 @@ func (p *Plugin) flush(ctx context.Context, metrics *Metrics, table *schema.Tabl
6566
}
6667
}
6768

68-
func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time, res <-chan schema.DestinationResource) error {
69+
func (p *Plugin) writeManagedTableBatch(ctx context.Context, sourceSpec specs.Source, tables schema.Tables, syncTime time.Time, res <-chan schema.DestinationResource) error {
6970
syncTime = syncTime.UTC()
7071
SetDestinationManagedCqColumns(tables)
7172

@@ -100,7 +101,7 @@ func (p *Plugin) writeManagedTableBatch(ctx context.Context, tables schema.Table
100101
p.workersLock.Unlock()
101102

102103
sourceColumn := &schema.Text{}
103-
_ = sourceColumn.Set(sourceName)
104+
_ = sourceColumn.Set(sourceSpec.Name)
104105
syncTimeColumn := &schema.Timestamptz{}
105106
_ = syncTimeColumn.Set(syncTime)
106107
for r := range res {

0 commit comments

Comments
 (0)