Skip to content

Commit 1fea1f8

Browse files
authored
feat: Allow use of backend that is not in list of destinations (#13475)
Discovered while writing the documentation for backend options: we should be able to support using a backend that is different from the destination being synced to, but currently this doesn't work. Without the change in this PR, destinations like BigQuery and Snowflake wouldn't be able to use state with incremental tables. With this change, it can be achieved by using a different destination as the backend (e.g. PostgreSQL, SQLite, or any other that supports `overwrite` mode)
1 parent 9dfb9ac commit 1fea1f8

5 files changed

Lines changed: 106 additions & 5 deletions

File tree

cli/cmd/sync.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,19 @@ func sync(cmd *cobra.Command, args []string) error {
172172

173173
var destinationClientsForSource []*managedplugin.Client
174174
var destinationForSourceSpec []specs.Destination
175+
var backendClientForSource *managedplugin.Client
176+
var destinationForSourceBackendSpec *specs.Destination
175177
for _, destination := range destinations {
176178
if slices.Contains(source.Destinations, destination.Name) {
177179
destinationClientsForSource = append(destinationClientsForSource, destinationPluginClients.ClientByName(destination.Name))
178180
destinationForSourceSpec = append(destinationForSourceSpec, *destination)
181+
continue
182+
}
183+
184+
// if the destination is specified as a backend, but not used as a destination, then we initialize it separately
185+
if source.BackendOptions != nil && strings.Contains(source.BackendOptions.Connection, "@@plugins."+destination.Name+".") {
186+
backendClientForSource = destinationPluginClients.ClientByName(destination.Name)
187+
destinationForSourceBackendSpec = destination
179188
}
180189
}
181190
switch maxVersion {
@@ -210,7 +219,26 @@ func sync(cmd *cobra.Command, args []string) error {
210219
destinationForSourceSpec[i].Spec["batch_size_bytes"] = destinationForSourceSpec[i].BatchSizeBytes // nolint:staticcheck // use of deprecated field
211220
}
212221
}
213-
if err := syncConnectionV3(ctx, cl, destinationClientsForSource, *source, destinationForSourceSpec, invocationUUID.String(), noMigrate); err != nil {
222+
223+
src := v3source{
224+
client: cl,
225+
spec: *source,
226+
}
227+
dests := make([]v3destination, 0, len(destinationClientsForSource))
228+
for i, destination := range destinationClientsForSource {
229+
dests = append(dests, v3destination{
230+
client: destination,
231+
spec: destinationForSourceSpec[i],
232+
})
233+
}
234+
var backend *v3destination
235+
if backendClientForSource != nil && destinationForSourceBackendSpec != nil {
236+
backend = &v3destination{
237+
client: backendClientForSource,
238+
spec: *destinationForSourceBackendSpec,
239+
}
240+
}
241+
if err := syncConnectionV3(ctx, src, dests, backend, invocationUUID.String(), noMigrate); err != nil {
214242
return fmt.Errorf("failed to sync v3 source %s: %w", cl.Name(), err)
215243
}
216244
case 2:

cli/cmd/sync_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ func TestSync(t *testing.T) {
3232
name: "multiple_sources_destinations",
3333
config: "multiple-sources-destinations.yml",
3434
},
35+
{
36+
name: "different_backend_from_destination",
37+
config: "different-backend-from-destination.yml",
38+
},
3539
{
3640
name: "should fail with missing path error when path is missing",
3741
config: "sync-missing-path-error.yml",

cli/cmd/sync_v3.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,31 @@ import (
2020
"google.golang.org/protobuf/types/known/timestamppb"
2121
)
2222

23+
type v3source struct {
24+
client *managedplugin.Client
25+
spec specs.Source
26+
}
27+
28+
type v3destination struct {
29+
client *managedplugin.Client
30+
spec specs.Destination
31+
}
32+
2333
// nolint:dupl
24-
func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, destinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, uid string, noMigrate bool) error {
34+
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool) error {
2535
var mt metrics.Metrics
2636
var exitReason = ExitReasonStopped
2737
tables := make(map[string]bool, 0)
38+
39+
sourceSpec := source.spec
40+
sourceClient := source.client
41+
destinationSpecs := make([]specs.Destination, len(destinations))
42+
destinationsClients := make([]*managedplugin.Client, len(destinations))
43+
for i := range destinations {
44+
destinationSpecs[i] = destinations[i].spec
45+
destinationsClients[i] = destinations[i].client
46+
}
47+
2848
defer func() {
2949
if analyticsClient != nil {
3050
log.Info().Msg("Sending sync summary to " + analyticsClient.Host())
@@ -48,6 +68,7 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
4868
sourcePbClient := plugin.NewPluginClient(sourceClient.Conn)
4969
destinationsPbClients := make([]plugin.PluginClient, len(destinationsClients))
5070
destinationTransformers := make([]*transformer.RecordTransformer, len(destinationsClients))
71+
backendPbClient := plugin.PluginClient(nil)
5172
for i := range destinationsClients {
5273
destinationsPbClients[i] = plugin.NewPluginClient(destinationsClients[i].Conn)
5374
opts := []transformer.RecordTransformerOption{
@@ -66,6 +87,13 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
6687
Connection: connection,
6788
}
6889
}
90+
if backend != nil {
91+
backendPbClient = plugin.NewPluginClient(backend.client.Conn)
92+
connection := backend.client.ConnectionString()
93+
variables.Plugins[backend.spec.Name] = specs.PluginVariables{
94+
Connection: connection,
95+
}
96+
}
6997

7098
// initialize destinations first, so that their connections may be used as backends by the source
7199
for i := range destinationsClients {
@@ -80,19 +108,31 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
80108
return fmt.Errorf("failed to init destination %v: %w", destSpec.Name, err)
81109
}
82110
}
111+
if backend != nil {
112+
backendSpec := backend.spec
113+
backendSpecBytes, err := json.Marshal(backendSpec.Spec)
114+
if err != nil {
115+
return err
116+
}
117+
if _, err := backendPbClient.Init(ctx, &plugin.Init_Request{
118+
Spec: backendSpecBytes,
119+
}); err != nil {
120+
return fmt.Errorf("failed to init backend %v: %w", backendSpec.Name, err)
121+
}
122+
}
83123

84124
// replace @@plugins.name.connection with the actual GRPC connection string from the client
85125
// NOTE: if this becomes a stable feature, it can move out of sync_v3 and into sync.go
86126
specBytes, err := json.Marshal(sourceSpec)
87127
if err != nil {
88-
return err
128+
return fmt.Errorf("failed to marshal source spec JSON before variable replacement: %w", err)
89129
}
90130
specBytesExpanded, err := specs.ReplaceVariables(string(specBytes), variables)
91131
if err != nil {
92-
return err
132+
return fmt.Errorf("failed to replace variables: %w", err)
93133
}
94134
if err := json.Unmarshal([]byte(specBytesExpanded), &sourceSpec); err != nil {
95-
return err
135+
return fmt.Errorf("failed to unmarshal source spec JSON after variable replacement: %w", err)
96136
}
97137

98138
sourceSpecBytes, err := json.Marshal(sourceSpec.Spec)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
kind: "source"
2+
spec:
3+
name: "test"
4+
path: "cloudquery/test"
5+
destinations: ["test1"]
6+
backend_options:
7+
table_name: "test_backend"
8+
connection: "@@plugins.test2.connection"
9+
version: "v3.1.1" # latest version of source test plugin
10+
tables: ["*"]
11+
---
12+
kind: "destination"
13+
spec:
14+
name: "test1"
15+
path: "cloudquery/test"
16+
version: "v2.2.5" # latest version of destination test plugin
17+
---
18+
kind: "destination"
19+
spec:
20+
name: "test2"
21+
path: "cloudquery/test"
22+
version: "v2.2.5" # latest version of destination test plugin

cli/internal/specs/v0/variables.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,13 @@ func ReplaceVariables(src string, variables Variables) (string, error) {
4444
lastErr = fmt.Errorf("variable %s is not a string", variablePath)
4545
return s
4646
}
47+
// make safe for replacement into JSON string
48+
v, err := json.Marshal(resString)
49+
if err != nil {
50+
lastErr = err
51+
return s
52+
}
53+
resString = string(v[1 : len(v)-1])
4754
return resString
4855
})
4956
return result, lastErr

0 commit comments

Comments
 (0)