Skip to content

Commit afad73e

Browse files
authored
fix: Replace internal columns if exist on Cloud syncs (#20456)
#### Summary Goes with cloudquery/plugin-sdk#2105 For Cloud syncs we should replace the internal columns if exist so we can trace back the synced data to the correct source name, sync time and sync group id (otherwise we'd get the ones of the original sync, e.g. `AWS->S3`)
1 parent 09d13bc commit afad73e

File tree

7 files changed

+38
-38
lines changed

7 files changed

+38
-38
lines changed

cli/cmd/sync.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strings"
1010

1111
"github.com/cloudquery/cloudquery/cli/v6/internal/auth"
12+
"github.com/cloudquery/cloudquery/cli/v6/internal/env"
1213
"github.com/cloudquery/cloudquery/cli/v6/internal/otel"
1314
"github.com/cloudquery/cloudquery/cli/v6/internal/specs/v0"
1415
"github.com/cloudquery/plugin-pb-go/managedplugin"
@@ -148,7 +149,7 @@ func sync(cmd *cobra.Command, args []string) error {
148149
}
149150

150151
// in the cloud sync environment, we pass only the relevant environment variables to the plugin
151-
_, isolatePluginEnvironment := os.LookupEnv("CQ_CLOUD")
152+
isolatePluginEnvironment := env.IsCloud()
152153

153154
ctx := cmd.Context()
154155
log.Info().Strs("args", args).Msg("Loading spec(s)")
@@ -486,7 +487,7 @@ func sync(cmd *cobra.Command, args []string) error {
486487
}
487488

488489
func filterPluginEnv(environ []string, pluginName, kind string) []string {
489-
env := make([]string, 0, len(environ))
490+
pluginEnv := make([]string, 0, len(environ))
490491
cleanName := strings.ReplaceAll(pluginName, "-", "_")
491492
prefix := strings.ToUpper("__" + kind + "_" + cleanName + "__")
492493

@@ -502,10 +503,10 @@ func filterPluginEnv(environ []string, pluginName, kind string) []string {
502503
case strings.HasPrefix(v, "_CQ_TEAM_NAME="),
503504
strings.HasPrefix(v, "_CQ_INSTALLATION_ID="),
504505
strings.HasPrefix(v, "HOME="):
505-
env = append(env, v)
506+
pluginEnv = append(pluginEnv, v)
506507
case strings.HasPrefix(v, prefix):
507508
cleanEnv := strings.TrimPrefix(v, prefix)
508-
env = append(env, cleanEnv)
509+
pluginEnv = append(pluginEnv, cleanEnv)
509510
if strings.HasPrefix(cleanEnv, "CLOUDQUERY_API_KEY=") ||
510511
strings.HasPrefix(cleanEnv, "AWS_") {
511512
k := getEnvKey(cleanEnv)
@@ -515,10 +516,10 @@ func filterPluginEnv(environ []string, pluginName, kind string) []string {
515516
}
516517
for k, v := range globalEnvironmentVariables {
517518
if _, ok := specificEnvironmentVariables[k]; !ok {
518-
env = append(env, v)
519+
pluginEnv = append(pluginEnv, v)
519520
}
520521
}
521-
return env
522+
return pluginEnv
522523
}
523524

524525
func getEnvKey(v string) string {

cli/cmd/test_connection.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
apiAuth "github.com/cloudquery/cloudquery-api-go/auth"
1313
"github.com/cloudquery/cloudquery/cli/v6/internal/api"
1414
"github.com/cloudquery/cloudquery/cli/v6/internal/auth"
15+
"github.com/cloudquery/cloudquery/cli/v6/internal/env"
1516
"github.com/cloudquery/cloudquery/cli/v6/internal/specs/v0"
1617
"github.com/cloudquery/plugin-pb-go/managedplugin"
1718
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
@@ -136,7 +137,7 @@ func testConnection(cmd *cobra.Command, args []string) error {
136137
updateSyncTestConnectionStatus(cmd.Context(), log.Logger, cloudquery_api.SyncTestConnectionStatusStarted)
137138

138139
// in the cloud sync environment, we pass only the relevant environment variables to the plugin
139-
_, isolatePluginEnvironment := os.LookupEnv("CQ_CLOUD")
140+
isolatePluginEnvironment := env.IsCloud()
140141
osEnviron := os.Environ()
141142

142143
log.Info().Strs("args", args).Msg("Loading spec(s)")
@@ -337,7 +338,7 @@ func testPluginConnection(ctx context.Context, client plugin.PluginClient, spec
337338
if err != nil {
338339
if gRPCErr, ok := grpcstatus.FromError(err); ok {
339340
if gRPCErr.Code() == codes.Unimplemented {
340-
if !isCloudBasedRequest() {
341+
if !env.IsCloud() {
341342
return &testConnectionResult{
342343
Success: false,
343344
FailureCode: "UNIMPLEMENTED",
@@ -389,7 +390,3 @@ func filterFailedTestResults(results []testConnectionResult) (*testConnectionRes
389390
return nil, errors.New("multiple test connection failures are not supported")
390391
}
391392
}
392-
393-
func isCloudBasedRequest() bool {
394-
return os.Getenv("CQ_CLOUD") != ""
395-
}

cli/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ require (
99
github.com/cloudquery/cloudquery-api-go v1.13.7
1010
github.com/cloudquery/codegen v0.3.23
1111
github.com/cloudquery/plugin-pb-go v1.26.8
12-
github.com/cloudquery/plugin-sdk/v4 v4.74.2
12+
github.com/cloudquery/plugin-sdk/v4 v4.75.0
1313
github.com/distribution/reference v0.6.0
1414
github.com/docker/distribution v2.8.3+incompatible
1515
github.com/docker/docker v26.1.5+incompatible

cli/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ github.com/cloudquery/jsonschema v0.0.0-20240220124159-92878faa2a66 h1:OZLPSIBYE
5050
github.com/cloudquery/jsonschema v0.0.0-20240220124159-92878faa2a66/go.mod h1:0SoZ/U7yJlNOR+fWsBSeTvTbGXB6DK01tzJ7m2Xfg34=
5151
github.com/cloudquery/plugin-pb-go v1.26.8 h1:mHNyOyG/MW/rjDVR0OR8DdezXmyoD4drphyHlYmnJoY=
5252
github.com/cloudquery/plugin-pb-go v1.26.8/go.mod h1:orruK+wdIP3kOvmEQhCcFND2bVVH7o0fFSGrqZYMt7k=
53-
github.com/cloudquery/plugin-sdk/v4 v4.74.2 h1:AHQZ9zxm0kLummKrT+dBYAvI6zf4j4DxfFXypg7kblw=
54-
github.com/cloudquery/plugin-sdk/v4 v4.74.2/go.mod h1:dDLB0XrS3R+nIWh4X8J8/ZQcsJ+lmjxUxoBxhAq8iCA=
53+
github.com/cloudquery/plugin-sdk/v4 v4.75.0 h1:N/edo8VA+YzfjIC+bQ3y7AAUHQEYzF6y+ZFYLyHKFMI=
54+
github.com/cloudquery/plugin-sdk/v4 v4.75.0/go.mod h1:dDLB0XrS3R+nIWh4X8J8/ZQcsJ+lmjxUxoBxhAq8iCA=
5555
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
5656
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
5757
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=

cli/internal/analytics/client.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package analytics
22

33
import (
44
"context"
5-
"os"
65
"time"
76

87
cqapi "github.com/cloudquery/cloudquery-api-go"
@@ -46,8 +45,7 @@ func InitClient() {
4645
}
4746

4847
func getEnvironment() string {
49-
_, ok := os.LookupEnv("CQ_CLOUD")
50-
if ok {
48+
if env.IsCloud() {
5149
return "cloud"
5250
}
5351
return "cli"

cli/internal/env/env.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@ func GetEnvOrDefault(env, def string) string {
88
}
99
return def
1010
}
11+
12+
func IsCloud() bool {
13+
_, ok := os.LookupEnv("CQ_CLOUD")
14+
return ok
15+
}

cli/internal/transformer/transformer.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55

66
"github.com/apache/arrow-go/v18/arrow"
77
"github.com/apache/arrow-go/v18/arrow/array"
8-
"github.com/apache/arrow-go/v18/arrow/memory"
8+
"github.com/cloudquery/cloudquery/cli/v6/internal/env"
99
"github.com/cloudquery/plugin-sdk/v4/schema"
1010
)
1111

@@ -138,30 +138,29 @@ func (t *RecordTransformer) Transform(record arrow.Record) arrow.Record {
138138
nRows := int(record.NumRows())
139139

140140
cols := make([]arrow.Array, 0, len(sc.Fields())+t.internalColumns)
141-
if t.withSyncTime && !sc.HasField(cqSyncTime) {
142-
ts, _ := arrow.TimestampFromTime(t.syncTime, arrow.Microsecond)
143-
syncTimeBldr := array.NewTimestampBuilder(memory.DefaultAllocator, &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "UTC"})
144-
syncTimeBldr.Reserve(nRows)
145-
for i := 0; i < nRows; i++ {
146-
syncTimeBldr.Append(ts)
141+
if t.withSyncTime && (!sc.HasField(cqSyncTime) || env.IsCloud()) {
142+
syncTimeArray, _ := schema.TimestampArrayFromTime(t.syncTime, arrow.Microsecond, "UTC", nRows)
143+
if !sc.HasField(cqSyncTime) {
144+
cols = append(cols, syncTimeArray)
145+
} else {
146+
record, _ = schema.ReplaceFieldInRecord(record, cqSyncTime, syncTimeArray)
147147
}
148-
cols = append(cols, syncTimeBldr.NewArray())
149148
}
150-
if t.withSourceName && !sc.HasField(cqSourceName) {
151-
sourceBldr := array.NewStringBuilder(memory.DefaultAllocator)
152-
sourceBldr.Reserve(nRows)
153-
for i := 0; i < nRows; i++ {
154-
sourceBldr.Append(t.sourceName)
149+
if t.withSourceName && (!sc.HasField(cqSourceName) || env.IsCloud()) {
150+
sourceNameArray := schema.StringArrayFromValue(t.sourceName, nRows)
151+
if !sc.HasField(cqSourceName) {
152+
cols = append(cols, sourceNameArray)
153+
} else {
154+
record, _ = schema.ReplaceFieldInRecord(record, cqSourceName, sourceNameArray)
155155
}
156-
cols = append(cols, sourceBldr.NewArray())
157156
}
158-
if t.withSyncGroupID && !sc.HasField(cqSyncGroupId) {
159-
syncGroupIdBldr := array.NewStringBuilder(memory.DefaultAllocator)
160-
syncGroupIdBldr.Reserve(nRows)
161-
for i := 0; i < nRows; i++ {
162-
syncGroupIdBldr.Append(t.syncGroupId)
157+
if t.withSyncGroupID && (!sc.HasField(cqSyncGroupId) || env.IsCloud()) {
158+
syncGroupIdArray := schema.StringArrayFromValue(t.syncGroupId, nRows)
159+
if !sc.HasField(cqSyncGroupId) {
160+
cols = append(cols, syncGroupIdArray)
161+
} else {
162+
record, _ = schema.ReplaceFieldInRecord(record, cqSyncGroupId, syncGroupIdArray)
163163
}
164-
cols = append(cols, syncGroupIdBldr.NewArray())
165164
}
166165

167166
cols = append(cols, record.Columns()...)

0 commit comments

Comments
 (0)