Skip to content

Commit 9510f5e

Browse files
authored
fix: Don't call Tables gRPC method at the beginning of the sync (#21180)
#### Summary Since this PR https://github.com/cloudquery/cloudquery/pull/20909/files the S3 source plugin fails to sync and result in the following error `failed to get tables: tables only discovered during sync`. The reason is that the S3 source dynamically discovers all files/tables during the sync, and errors out if you call the tables endpoint before the sync is done, see https://github.com/cloudquery/cloudquery-private/blob/334760529da4815dc83659c7f529154682b2c8a0/plugins/source/s3/resources/plugin/client.go#L91 Instead of calling the tables endpoint we can keep track of the tables during the sync. This works even if you pass `--no-migrate` since we skip those a bit after the new code https://github.com/cloudquery/cloudquery/blob/7b5980236005db31be536f4ffd2c390752f12309/cli/cmd/sync_v3.go#L600
1 parent 7b59802 commit 9510f5e

2 files changed

Lines changed: 19 additions & 29 deletions

File tree

cli/cmd/sync_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestSync(t *testing.T) {
5050
Resources: 13,
5151
SourceName: "test",
5252
SourcePath: "cloudquery/test",
53-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
53+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
5454
ResourcesPerTable: map[string]uint64{
5555
"test_some_table": 1,
5656
"test_sub_table": 10,
@@ -72,7 +72,7 @@ func TestSync(t *testing.T) {
7272
Resources: 13,
7373
SourceName: "test2",
7474
SourcePath: "cloudquery/test",
75-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
75+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
7676
ResourcesPerTable: map[string]uint64{
7777
"test_some_table": 1,
7878
"test_sub_table": 10,
@@ -107,7 +107,7 @@ func TestSync(t *testing.T) {
107107
Resources: 13,
108108
SourceName: "test-1",
109109
SourcePath: "cloudquery/test",
110-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
110+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
111111
ResourcesPerTable: map[string]uint64{
112112
"test_some_table": 1,
113113
"test_sub_table": 10,
@@ -128,7 +128,7 @@ func TestSync(t *testing.T) {
128128
Resources: 13,
129129
SourceName: "test-2",
130130
SourcePath: "cloudquery/test",
131-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
131+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
132132
ResourcesPerTable: map[string]uint64{
133133
"test_some_table": 1,
134134
"test_sub_table": 10,
@@ -155,7 +155,7 @@ func TestSync(t *testing.T) {
155155
Resources: 13,
156156
SourceName: "test",
157157
SourcePath: "cloudquery/test",
158-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
158+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
159159
ResourcesPerTable: map[string]uint64{
160160
"test_some_table": 1,
161161
"test_sub_table": 10,
@@ -182,7 +182,7 @@ func TestSync(t *testing.T) {
182182
Resources: 13,
183183
SourceName: "test",
184184
SourcePath: "cloudquery/test",
185-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
185+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
186186
SyncGroupID: lo.ToPtr("sync_group_id_test"),
187187
ResourcesPerTable: map[string]uint64{
188188
"test_some_table": 1,
@@ -212,7 +212,7 @@ func TestSync(t *testing.T) {
212212
Resources: 11,
213213
SourceName: "test",
214214
SourcePath: "cloudquery/test",
215-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
215+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
216216
SyncGroupID: lo.ToPtr("sync_group_id_test"),
217217
ShardNum: lo.ToPtr(1),
218218
ShardTotal: lo.ToPtr(2),
@@ -424,7 +424,7 @@ func TestSyncWithSummaryTable(t *testing.T) {
424424
SourceName: "test",
425425
SourcePath: "cloudquery/test",
426426
SourceVersion: "v4.5.1",
427-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
427+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
428428
},
429429
},
430430
},
@@ -444,7 +444,7 @@ func TestSyncWithSummaryTable(t *testing.T) {
444444
SourceName: "test_1_2",
445445
SourcePath: "cloudquery/test",
446446
SourceVersion: "v4.5.1",
447-
SourceTables: []string{"test_some_table", "test_sub_table", "test_testdata_table", "test_paid_table"},
447+
SourceTables: []string{"test_paid_table", "test_some_table", "test_sub_table", "test_testdata_table"},
448448
SyncGroupID: lo.ToPtr("sync_group_id_test"),
449449
ShardNum: lo.ToPtr(1),
450450
ShardTotal: lo.ToPtr(2),

cli/cmd/sync_v3.go

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"io"
99
"net/http"
1010
"os"
11+
"slices"
1112
"strconv"
1213
gosync "sync"
1314
"sync/atomic"
@@ -356,23 +357,6 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
356357
// Read from the sync stream and write to all destinations.
357358
isComplete := int64(0)
358359

359-
sourceTables, err := getTables(ctx, sourcePbClient, &plugin.GetTables_Request{
360-
Tables: sourceSpec.Tables,
361-
SkipTables: sourceSpec.SkipTables,
362-
SkipDependentTables: *sourceSpec.SkipDependentTables})
363-
364-
if err != nil {
365-
return err
366-
}
367-
// Pre init stats per table
368-
for _, table := range sourceTables {
369-
initialStats := cloudquery_api.SyncRunTableProgressValue{
370-
Rows: 0,
371-
Errors: 0,
372-
}
373-
statsPerTable.Add(table.Name, initialStats)
374-
}
375-
376360
var remoteProgressReporter *godebouncer.Debouncer
377361
if progressAPIClient != nil {
378362
teamName, syncName, syncRunId := os.Getenv("_CQ_TEAM_NAME"), os.Getenv("_CQ_SYNC_NAME"), os.Getenv("_CQ_SYNC_RUN_ID")
@@ -491,6 +475,8 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
491475
}
492476
}()
493477

478+
sourceTables := map[string]bool{}
479+
494480
eg.Go(func() error {
495481
// Close all transformation pipelines when the source is done
496482
defer func() {
@@ -518,6 +504,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
518504
atomic.AddInt64(&newResources, record.NumRows())
519505
atomic.AddInt64(&totalResources, record.NumRows())
520506
tableName, _ := record.Schema().Metadata().GetValue(schema.MetadataTableName)
507+
sourceTables[tableName] = true
521508
stats, _ := statsPerTable.Get(tableName)
522509
stats.Rows += record.NumRows()
523510
statsPerTable.Add(tableName, stats)
@@ -591,6 +578,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
591578
return err
592579
}
593580

581+
sourceTables[table.Name] = true
594582
// This works since we sync and send migrate messages for parents before children
595583
if isStateBackendEnabled && (table.IsIncremental || (table.Parent != nil && skippedFromDeleteStale[table.Parent.Name])) {
596584
skippedFromDeleteStale[table.Name] = true
@@ -664,6 +652,8 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
664652

665653
tableProgress := statsPerTable.GetAll()
666654
for i := range destinationsClients {
655+
sourceTables := lo.Keys(sourceTables)
656+
slices.Sort(sourceTables)
667657
m := destinationsClients[i].Metrics()
668658
summary := syncSummary{
669659
Resources: uint64(totalResources),
@@ -674,18 +664,18 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
674664
SourceName: sourceSpec.Name,
675665
SourceVersion: sourceSpec.Version,
676666
SourcePath: sourceSpec.Path,
677-
SourceTables: tableNameChanger.UpdateTableNamesSlice(destinationSpecs[i].Name, sourceTables.TableNames()),
667+
SourceTables: tableNameChanger.UpdateTableNamesSlice(destinationSpecs[i].Name, sourceTables),
678668
CLIVersion: Version,
679669
DestinationErrors: m.Errors,
680670
DestinationWarnings: m.Warnings,
681671
DestinationName: destinationSpecs[i].Name,
682672
DestinationVersion: destinationSpecs[i].Version,
683673
DestinationPath: destinationSpecs[i].Path,
684-
ResourcesPerTable: lo.Reduce(lo.Keys(tableProgress), func(acc map[string]uint64, tableName string, _ int) map[string]uint64 {
674+
ResourcesPerTable: lo.Reduce(sourceTables, func(acc map[string]uint64, tableName string, _ int) map[string]uint64 {
685675
acc[tableName] = uint64(tableProgress[tableName].Rows)
686676
return acc
687677
}, map[string]uint64{}),
688-
ErrorsPerTable: lo.Reduce(lo.Keys(tableProgress), func(acc map[string]uint64, tableName string, _ int) map[string]uint64 {
678+
ErrorsPerTable: lo.Reduce(sourceTables, func(acc map[string]uint64, tableName string, _ int) map[string]uint64 {
689679
acc[tableName] = uint64(tableProgress[tableName].Errors)
690680
return acc
691681
}, map[string]uint64{}),

0 commit comments

Comments
 (0)