Skip to content

Commit a2d9473

Browse files
author
Przemysław Stępień
authored
feat: Add error handling and per-table metrics (#20909)
#### Summary ⚠️ **If you're contributing to a plugin please read this section of the [contribution guidelines](https://github.com/cloudquery/cloudquery/blob/main/CONTRIBUTING.md#open-core-vs-open-source) 🧑‍🎓 before submitting this PR** ⚠️ This PR adds handling of error messages from source plugins and add new statistics for platform calls. This solves two things: 1. Having proper error statistics when using plugins in development mode 2. Adding ability to show detailed per table metrics in platform without need to rely on otel_traces
1 parent bec2a89 commit a2d9473

File tree

1 file changed

+42
-22
lines changed

1 file changed

+42
-22
lines changed

cli/cmd/sync_v3.go

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
149149
syncTimeTook time.Duration
150150
totalResources = int64(0)
151151
totals = sourceClient.Metrics()
152+
statsPerTable = cloudquery_api.SyncRunTableProgress{}
152153
)
153154
defer func() {
154155
analytics.TrackSyncCompleted(ctx, invocationUUID.UUID, analytics.SyncFinishedEvent{
@@ -316,6 +317,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
316317
SkipTables: sourceSpec.SkipTables,
317318
SkipDependentTables: *sourceSpec.SkipDependentTables,
318319
DeterministicCqId: sourceSpec.DeterministicCQID,
320+
WithErrorMessages: true,
319321
}
320322
if sourceSpec.BackendOptions != nil {
321323
syncReq.Backend = &plugin.Sync_BackendOptions{
@@ -351,6 +353,22 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
351353
// Read from the sync stream and write to all destinations.
352354
isComplete := int64(0)
353355

356+
sourceTables, err := getTables(ctx, sourcePbClient, &plugin.GetTables_Request{
357+
Tables: sourceSpec.Tables,
358+
SkipTables: sourceSpec.SkipTables,
359+
SkipDependentTables: *sourceSpec.SkipDependentTables})
360+
361+
if err != nil {
362+
return err
363+
}
364+
// Pre init stats per table
365+
for _, table := range sourceTables {
366+
statsPerTable[table.Name] = cloudquery_api.SyncRunTableProgressValue{
367+
Rows: 0,
368+
Errors: 0,
369+
}
370+
}
371+
354372
var remoteProgressReporter *godebouncer.Debouncer
355373
if progressAPIClient != nil {
356374
teamName, syncName, syncRunId := os.Getenv("_CQ_TEAM_NAME"), os.Getenv("_CQ_SYNC_NAME"), os.Getenv("_CQ_SYNC_RUN_ID")
@@ -373,10 +391,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
373391
status = cloudquery_api.SyncRunStatusCompleted
374392
}
375393
obj := cloudquery_api.CreateSyncRunProgressJSONRequestBody{
376-
Rows: atomic.LoadInt64(&totalResources),
377-
Errors: int64(totals.Errors),
378-
Warnings: int64(totals.Warnings),
379-
Status: &status,
394+
Rows: atomic.LoadInt64(&totalResources),
395+
Errors: int64(totals.Errors),
396+
Warnings: int64(totals.Warnings),
397+
Status: &status,
398+
TableProgress: &statsPerTable,
380399
}
381400
if shard != nil {
382401
obj.ShardNum = &shard.num
@@ -487,6 +506,10 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
487506

488507
atomic.AddInt64(&newResources, record.NumRows())
489508
atomic.AddInt64(&totalResources, record.NumRows())
509+
tableName, _ := record.Schema().Metadata().GetValue(schema.MetadataTableName)
510+
stats := statsPerTable[tableName]
511+
stats.Rows += record.NumRows()
512+
statsPerTable[tableName] = stats
490513
if remoteProgressReporter != nil {
491514
remoteProgressReporter.SendSignal()
492515
}
@@ -598,6 +621,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
598621
return handleSendError(err, writeClients[i], "migrate")
599622
}
600623
}
624+
case *plugin.Sync_Response_Error:
625+
log.Error().Str("table", m.Error.TableName).Msg(m.Error.Error)
626+
stats := statsPerTable[m.Error.TableName]
627+
stats.Errors++
628+
statsPerTable[m.Error.TableName] = stats
601629
default:
602630
return fmt.Errorf("unknown message type: %T", m)
603631
}
@@ -608,27 +636,19 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
608636
return err
609637
}
610638

611-
syncSummaryEnabled := summaryLocation != "" || lo.SomeBy(destinationSpecs, func(d specs.Destination) bool { return d.SyncSummary })
612-
var sourceTables schema.Tables
613-
if syncSummaryEnabled {
614-
sourceTables, err = getTables(ctx, sourcePbClient, &plugin.GetTables_Request{
615-
Tables: sourceSpec.Tables,
616-
SkipTables: sourceSpec.SkipTables,
617-
SkipDependentTables: *sourceSpec.SkipDependentTables,
618-
})
619-
620-
if err != nil {
621-
return err
622-
}
623-
}
624-
625639
err = syncClient.CloseSend()
626640
if err != nil {
627641
return err
628642
}
629643
totals = sourceClient.Metrics()
630644
sourceWarnings := totals.Warnings
631-
sourceErrors := totals.Errors
645+
var sourceErrors uint64
646+
for _, val := range statsPerTable {
647+
sourceErrors += uint64(val.Errors)
648+
}
649+
if totals.Errors > sourceErrors {
650+
sourceErrors = totals.Errors
651+
}
632652
var metadataDataErrors error
633653
for i := range destinationsClients {
634654
m := destinationsClients[i].Metrics()
@@ -697,13 +717,13 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
697717
exitReason = ExitReasonCompleted
698718

699719
msg := "Sync completed successfully"
700-
if totals.Errors > 0 {
720+
if sourceErrors > 0 {
701721
msg = "Sync completed with errors, see logs for details"
702722
}
703-
fmt.Printf("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n", msg, totalResources, totals.Errors, totals.Warnings, syncTimeTook.Truncate(time.Second).String())
723+
fmt.Printf("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n", msg, totalResources, sourceErrors, totals.Warnings, syncTimeTook.Truncate(time.Second).String())
704724
log.Info().
705725
Int64("resources", totalResources).
706-
Uint64("errors", totals.Errors).
726+
Uint64("errors", sourceErrors).
707727
Uint64("warnings", totals.Warnings).
708728
Str("duration", syncTimeTook.Truncate(time.Second).String()).
709729
Str("result", msg).

0 commit comments

Comments
 (0)