@@ -20,11 +20,31 @@ import (
2020 "google.golang.org/protobuf/types/known/timestamppb"
2121)
2222
23+ type v3source struct {
24+ client * managedplugin.Client
25+ spec specs.Source
26+ }
27+
28+ type v3destination struct {
29+ client * managedplugin.Client
30+ spec specs.Destination
31+ }
32+
2333// nolint:dupl
24- func syncConnectionV3 (ctx context.Context , sourceClient * managedplugin. Client , destinationsClients managedplugin. Clients , sourceSpec specs. Source , destinationSpecs []specs. Destination , uid string , noMigrate bool ) error {
34+ func syncConnectionV3 (ctx context.Context , source v3source , destinations [] v3destination , backend * v3destination , uid string , noMigrate bool ) error {
2535 var mt metrics.Metrics
2636 var exitReason = ExitReasonStopped
2737 tables := make (map [string ]bool , 0 )
38+
39+ sourceSpec := source .spec
40+ sourceClient := source .client
41+ destinationSpecs := make ([]specs.Destination , len (destinations ))
42+ destinationsClients := make ([]* managedplugin.Client , len (destinations ))
43+ for i := range destinations {
44+ destinationSpecs [i ] = destinations [i ].spec
45+ destinationsClients [i ] = destinations [i ].client
46+ }
47+
2848 defer func () {
2949 if analyticsClient != nil {
3050 log .Info ().Msg ("Sending sync summary to " + analyticsClient .Host ())
@@ -48,6 +68,7 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
4868 sourcePbClient := plugin .NewPluginClient (sourceClient .Conn )
4969 destinationsPbClients := make ([]plugin.PluginClient , len (destinationsClients ))
5070 destinationTransformers := make ([]* transformer.RecordTransformer , len (destinationsClients ))
71+ backendPbClient := plugin .PluginClient (nil )
5172 for i := range destinationsClients {
5273 destinationsPbClients [i ] = plugin .NewPluginClient (destinationsClients [i ].Conn )
5374 opts := []transformer.RecordTransformerOption {
@@ -66,6 +87,13 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
6687 Connection : connection ,
6788 }
6889 }
90+ if backend != nil {
91+ backendPbClient = plugin .NewPluginClient (backend .client .Conn )
92+ connection := backend .client .ConnectionString ()
93+ variables .Plugins [backend .spec .Name ] = specs.PluginVariables {
94+ Connection : connection ,
95+ }
96+ }
6997
7098 // initialize destinations first, so that their connections may be used as backends by the source
7199 for i := range destinationsClients {
@@ -80,19 +108,31 @@ func syncConnectionV3(ctx context.Context, sourceClient *managedplugin.Client, d
80108 return fmt .Errorf ("failed to init destination %v: %w" , destSpec .Name , err )
81109 }
82110 }
111+ if backend != nil {
112+ backendSpec := backend .spec
113+ backendSpecBytes , err := json .Marshal (backendSpec .Spec )
114+ if err != nil {
115+ return err
116+ }
117+ if _ , err := backendPbClient .Init (ctx , & plugin.Init_Request {
118+ Spec : backendSpecBytes ,
119+ }); err != nil {
120+ return fmt .Errorf ("failed to init backend %v: %w" , backendSpec .Name , err )
121+ }
122+ }
83123
84124 // replace @@plugins.name.connection with the actual GRPC connection string from the client
85125 // NOTE: if this becomes a stable feature, it can move out of sync_v3 and into sync.go
86126 specBytes , err := json .Marshal (sourceSpec )
87127 if err != nil {
88- return err
128+ return fmt . Errorf ( "failed to marshal source spec JSON before variable replacement: %w" , err )
89129 }
90130 specBytesExpanded , err := specs .ReplaceVariables (string (specBytes ), variables )
91131 if err != nil {
92- return err
132+ return fmt . Errorf ( "failed to replace variables: %w" , err )
93133 }
94134 if err := json .Unmarshal ([]byte (specBytesExpanded ), & sourceSpec ); err != nil {
95- return err
135+ return fmt . Errorf ( "failed to unmarshal source spec JSON after variable replacement: %w" , err )
96136 }
97137
98138 sourceSpecBytes , err := json .Marshal (sourceSpec .Spec )
0 commit comments