Skip to content

Commit 5fd5b59

Browse files
authored
fix: Apply schema and record transformations for v2 sources with v3 destinations (#13291)
#### Summary Fixes justmiles/cq-source-crowdstrike#5. We should apply the schema and record transformations when the source is v2 and the destination is v3, otherwise pks are not removed in `append` mode, sync time and source name columns are not added, etc. **This probably impacts all community/partners plugins as I believe those haven't upgraded to SDK v4.** Tried to do this the less possible ugly way I could think of. Tested with: ```yaml kind: source spec: name: azure path: cloudquery/azure version: v8.3.0 # v2 source destinations: ["bigquery", "postgresql"] tables: ["azure_storage_*"] --- kind: destination spec: name: bigquery path: cloudquery/bigquery version: "v3.3.0" # v3 destination write_mode: "append" spec: project_id: **** dataset_id: **** --- kind: destination spec: name: "postgresql" registry: "github" path: "cloudquery/postgresql" version: "v4.2.2" # v2 destination spec: connection_string: **** ``` <!--
1 parent 7eb3e06 commit 5fd5b59

4 files changed

Lines changed: 99 additions & 9 deletions

File tree

cli/cmd/migrate.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ func migrate(cmd *cobra.Command, args []string) error {
119119
return fmt.Errorf("failed to migrate v3 source %s: %w", cl.Name(), err)
120120
}
121121
case 2:
122+
destinationsVersions := make([][]int, 0, len(destinationClientsForSource))
122123
for _, destination := range destinationClientsForSource {
123124
versions, err := destination.Versions(ctx)
124125
if err != nil {
@@ -127,8 +128,9 @@ func migrate(cmd *cobra.Command, args []string) error {
127128
if !slices.Contains(versions, 1) {
128129
return fmt.Errorf("destination plugin %[1]s does not support CloudQuery SDK version 1. Please upgrade to a newer version of the %[1]s destination plugin", destination.Name())
129130
}
131+
destinationsVersions = append(destinationsVersions, versions)
130132
}
131-
if err := migrateConnectionV2(ctx, cl, destinationClientsForSource, *source, destinationForSourceSpec); err != nil {
133+
if err := migrateConnectionV2(ctx, cl, destinationClientsForSource, *source, destinationForSourceSpec, destinationsVersions); err != nil {
132134
return fmt.Errorf("failed to migrate source %v@%v: %w", source.Name, source.Version, err)
133135
}
134136
case 1:

cli/cmd/migrate_v2.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"github.com/rs/zerolog/log"
1414
)
1515

16-
func migrateConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, managedDestinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination) error {
16+
func migrateConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, managedDestinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, destinationsVersions [][]int) error {
1717
destinationStrings := make([]string, len(destinationSpecs))
1818
for i := range destinationSpecs {
1919
destinationStrings[i] = destinationSpecs[i].VersionString()
@@ -23,6 +23,7 @@ func migrateConnectionV2(ctx context.Context, sourceClient *managedplugin.Client
2323
defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationStrings).Time("migrate_time", migrateStart).Msg("End migration")
2424

2525
sourcePbClient := pbSource.NewSourceClient(sourceClient.Conn)
26+
destinationsTransformers := getSourceV2DestV3DestinationsTransformers(sourceSpec, destinationSpecs, migrateStart, destinationsVersions)
2627
destinationsPbClients := make([]pbdestination.DestinationClient, len(managedDestinationsClients))
2728
for i := range managedDestinationsClients {
2829
destinationsPbClients[i] = pbdestination.NewDestinationClient(managedDestinationsClients[i].Conn)
@@ -42,6 +43,15 @@ func migrateConnectionV2(ctx context.Context, sourceClient *managedplugin.Client
4243
return fmt.Errorf("failed to GetDynamicTables: %w", err)
4344
}
4445

46+
transformedSchemasBytes := make([][][]byte, 0, len(managedDestinationsClients))
47+
for i := range managedDestinationsClients {
48+
destinationSchemasBytes, err := transformSourceV2DestV3Schemas(tablesRes.Tables, destinationsTransformers[i])
49+
if err != nil {
50+
return err
51+
}
52+
transformedSchemasBytes = append(transformedSchemasBytes, destinationSchemasBytes)
53+
}
54+
4555
fmt.Printf("Starting migration with for: %s -> %s\n", sourceSpec.VersionString(), destinationStrings)
4656
for i := range managedDestinationsClients {
4757
destSpecBytes, err := json.Marshal(CLIDestinationSpecToPbSpec(destinationSpecs[i]))
@@ -54,7 +64,7 @@ func migrateConnectionV2(ctx context.Context, sourceClient *managedplugin.Client
5464
return fmt.Errorf("failed to call Migrate: %w", err)
5565
}
5666
if _, err := destinationsPbClients[i].Migrate(ctx, &pbdestination.Migrate_Request{
57-
Tables: tablesRes.Tables,
67+
Tables: transformedSchemasBytes[i],
5868
}); err != nil {
5969
return fmt.Errorf("failed to call Migrate: %w", err)
6070
}

cli/cmd/sync.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ func sync(cmd *cobra.Command, args []string) error {
214214
return fmt.Errorf("failed to sync v3 source %s: %w", cl.Name(), err)
215215
}
216216
case 2:
217+
destinationsVersions := make([][]int, 0, len(destinationClientsForSource))
217218
for _, destination := range destinationClientsForSource {
218219
versions, err := destination.Versions(ctx)
219220
if err != nil {
@@ -222,8 +223,9 @@ func sync(cmd *cobra.Command, args []string) error {
222223
if !slices.Contains(versions, 1) {
223224
return fmt.Errorf("destination plugin %[1]s does not support CloudQuery SDK version 1. Please upgrade to a newer version of the %[1]s destination plugin", destination.Name())
224225
}
226+
destinationsVersions = append(destinationsVersions, versions)
225227
}
226-
if err := syncConnectionV2(ctx, cl, destinationClientsForSource, *source, destinationForSourceSpec, invocationUUID.String(), noMigrate); err != nil {
228+
if err := syncConnectionV2(ctx, cl, destinationClientsForSource, *source, destinationForSourceSpec, invocationUUID.String(), noMigrate, destinationsVersions); err != nil {
227229
return fmt.Errorf("failed to sync v2 source %s: %w", cl.Name(), err)
228230
}
229231
case 1:

cli/cmd/sync_v2.go

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,79 @@ import (
99
"time"
1010

1111
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
12+
"github.com/cloudquery/cloudquery/cli/internal/transformer"
1213
"github.com/cloudquery/plugin-pb-go/managedplugin"
1314
"github.com/cloudquery/plugin-pb-go/metrics"
1415
"github.com/cloudquery/plugin-pb-go/pb/destination/v1"
16+
pluginv3 "github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
1517
"github.com/cloudquery/plugin-pb-go/pb/source/v2"
1618
"github.com/rs/zerolog/log"
1719
"github.com/schollz/progressbar/v3"
20+
"golang.org/x/exp/slices"
1821
"google.golang.org/protobuf/types/known/timestamppb"
1922
)
2023

24+
func getSourceV2DestV3DestinationsTransformers(sourceSpec specs.Source, destinationSpecs []specs.Destination, syncTime time.Time, destinationsVersions [][]int) []*transformer.RecordTransformer {
25+
destinationsTransformers := make([]*transformer.RecordTransformer, 0, len(destinationsVersions))
26+
for i := range destinationsVersions {
27+
// We only need to transform to destinations that are v3
28+
if !slices.Contains(destinationsVersions[i], 3) {
29+
destinationsTransformers = append(destinationsTransformers, nil)
30+
continue
31+
}
32+
opts := []transformer.RecordTransformerOption{
33+
transformer.WithSourceNameColumn(sourceSpec.Name),
34+
transformer.WithSyncTimeColumn(syncTime),
35+
}
36+
if destinationSpecs[i].WriteMode == specs.WriteModeAppend {
37+
opts = append(opts, transformer.WithRemovePKs())
38+
} else if destinationSpecs[i].PKMode == specs.PKModeCQID {
39+
opts = append(opts, transformer.WithRemovePKs())
40+
opts = append(opts, transformer.WithCQIDPrimaryKey())
41+
}
42+
destinationsTransformers = append(destinationsTransformers, transformer.NewRecordTransformer(opts...))
43+
}
44+
return destinationsTransformers
45+
}
46+
47+
func transformSourceV2DestV3Schemas(originalSchemas [][]byte, recordTransformer *transformer.RecordTransformer) ([][]byte, error) {
48+
if recordTransformer == nil {
49+
return originalSchemas, nil
50+
}
51+
transformedSchemasBytes := make([][]byte, 0, len(originalSchemas))
52+
for _, s := range originalSchemas {
53+
schema, err := pluginv3.NewSchemaFromBytes(s)
54+
if err != nil {
55+
return nil, err
56+
}
57+
transformedSchema := recordTransformer.TransformSchema(schema)
58+
transformedSchemaBytes, err := pluginv3.SchemaToBytes(transformedSchema)
59+
if err != nil {
60+
return nil, err
61+
}
62+
transformedSchemasBytes = append(transformedSchemasBytes, transformedSchemaBytes)
63+
}
64+
return transformedSchemasBytes, nil
65+
}
66+
67+
func transformSourceV2DestV3Resource(originalResourceBytes []byte, recordTransformer *transformer.RecordTransformer) ([]byte, error) {
68+
if recordTransformer == nil {
69+
return originalResourceBytes, nil
70+
}
71+
resource, err := pluginv3.NewRecordFromBytes(originalResourceBytes)
72+
if err != nil {
73+
return nil, err
74+
}
75+
transformedResource := recordTransformer.Transform(resource)
76+
transformedResourceBytes, err := pluginv3.RecordToBytes(transformedResource)
77+
if err != nil {
78+
return nil, err
79+
}
80+
return transformedResourceBytes, nil
81+
}
82+
2183
// nolint:dupl
22-
func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, destinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, uid string, noMigrate bool) error {
84+
func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, destinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, uid string, noMigrate bool, destinationsVersions [][]int) error {
2385
var mt metrics.Metrics
2486
var exitReason = ExitReasonStopped
2587
defer func() {
@@ -40,6 +102,7 @@ func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, d
40102

41103
sourcePbClient := source.NewSourceClient(sourceClient.Conn)
42104
destinationsPbClients := make([]destination.DestinationClient, len(destinationsClients))
105+
destinationsTransformers := getSourceV2DestV3DestinationsTransformers(sourceSpec, destinationSpecs, syncTime, destinationsVersions)
43106
for i := range destinationsClients {
44107
destinationsPbClients[i] = destination.NewDestinationClient(destinationsClients[i].Conn)
45108
}
@@ -69,12 +132,21 @@ func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, d
69132
}
70133
}
71134

135+
transformedSchemasBytes := make([][][]byte, 0, len(destinationsPbClients))
136+
for i := range destinationsPbClients {
137+
destinationSchemasBytes, err := transformSourceV2DestV3Schemas(tablesRes.Tables, destinationsTransformers[i])
138+
if err != nil {
139+
return err
140+
}
141+
transformedSchemasBytes = append(transformedSchemasBytes, destinationSchemasBytes)
142+
}
143+
72144
if !noMigrate {
73145
migrateStart := time.Now().UTC()
74146
fmt.Printf("Starting migration for: %s -> %s\n", sourceSpec.VersionString(), destinationStrings)
75147
for i := range destinationsClients {
76148
if _, err := destinationsPbClients[i].Migrate(ctx, &destination.Migrate_Request{
77-
Tables: tablesRes.Tables,
149+
Tables: transformedSchemasBytes[i],
78150
}); err != nil {
79151
return err
80152
}
@@ -105,7 +177,7 @@ func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, d
105177
}
106178
if err := writeClients[i].Send(&destination.Write_Request{
107179
Source: sourceSpec.Name,
108-
Tables: tablesRes.Tables,
180+
Tables: transformedSchemasBytes[i],
109181
Timestamp: timestamppb.New(syncTime),
110182
}); err != nil {
111183
return err
@@ -145,8 +217,12 @@ func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, d
145217
}
146218
_ = bar.Add(1)
147219
for i := range destinationsPbClients {
220+
transformedResourceBytes, err := transformSourceV2DestV3Resource(r.Resource, destinationsTransformers[i])
221+
if err != nil {
222+
return err
223+
}
148224
if err := writeClients[i].Send(&destination.Write_Request{
149-
Resource: r.Resource,
225+
Resource: transformedResourceBytes,
150226
}); err != nil {
151227
return err
152228
}
@@ -155,7 +231,7 @@ func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, d
155231
for i := range destinationsClients {
156232
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
157233
_, err := destinationsPbClients[i].DeleteStale(ctx, &destination.DeleteStale_Request{
158-
Tables: tablesRes.Tables,
234+
Tables: transformedSchemasBytes[i],
159235
Source: sourceSpec.Name,
160236
Timestamp: timestamppb.New(syncTime),
161237
})

0 commit comments

Comments
 (0)