@@ -149,6 +149,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
149149 syncTimeTook time.Duration
150150 totalResources = int64 (0 )
151151 totals = sourceClient .Metrics ()
152+ statsPerTable = cloudquery_api.SyncRunTableProgress {}
152153 )
153154 defer func () {
154155 analytics .TrackSyncCompleted (ctx , invocationUUID .UUID , analytics.SyncFinishedEvent {
@@ -316,6 +317,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
316317 SkipTables : sourceSpec .SkipTables ,
317318 SkipDependentTables : * sourceSpec .SkipDependentTables ,
318319 DeterministicCqId : sourceSpec .DeterministicCQID ,
320+ WithErrorMessages : true ,
319321 }
320322 if sourceSpec .BackendOptions != nil {
321323 syncReq .Backend = & plugin.Sync_BackendOptions {
@@ -351,6 +353,22 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
351353 // Read from the sync stream and write to all destinations.
352354 isComplete := int64 (0 )
353355
356+ sourceTables , err := getTables (ctx , sourcePbClient , & plugin.GetTables_Request {
357+ Tables : sourceSpec .Tables ,
358+ SkipTables : sourceSpec .SkipTables ,
359+ SkipDependentTables : * sourceSpec .SkipDependentTables })
360+
361+ if err != nil {
362+ return err
363+ }
364+ // Pre init stats per table
365+ for _ , table := range sourceTables {
366+ statsPerTable [table .Name ] = cloudquery_api.SyncRunTableProgressValue {
367+ Rows : 0 ,
368+ Errors : 0 ,
369+ }
370+ }
371+
354372 var remoteProgressReporter * godebouncer.Debouncer
355373 if progressAPIClient != nil {
356374 teamName , syncName , syncRunId := os .Getenv ("_CQ_TEAM_NAME" ), os .Getenv ("_CQ_SYNC_NAME" ), os .Getenv ("_CQ_SYNC_RUN_ID" )
@@ -373,10 +391,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
373391 status = cloudquery_api .SyncRunStatusCompleted
374392 }
375393 obj := cloudquery_api.CreateSyncRunProgressJSONRequestBody {
376- Rows : atomic .LoadInt64 (& totalResources ),
377- Errors : int64 (totals .Errors ),
378- Warnings : int64 (totals .Warnings ),
379- Status : & status ,
394+ Rows : atomic .LoadInt64 (& totalResources ),
395+ Errors : int64 (totals .Errors ),
396+ Warnings : int64 (totals .Warnings ),
397+ Status : & status ,
398+ TableProgress : & statsPerTable ,
380399 }
381400 if shard != nil {
382401 obj .ShardNum = & shard .num
@@ -487,6 +506,10 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
487506
488507 atomic .AddInt64 (& newResources , record .NumRows ())
489508 atomic .AddInt64 (& totalResources , record .NumRows ())
509+ tableName , _ := record .Schema ().Metadata ().GetValue (schema .MetadataTableName )
510+ stats := statsPerTable [tableName ]
511+ stats .Rows += record .NumRows ()
512+ statsPerTable [tableName ] = stats
490513 if remoteProgressReporter != nil {
491514 remoteProgressReporter .SendSignal ()
492515 }
@@ -598,6 +621,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
598621 return handleSendError (err , writeClients [i ], "migrate" )
599622 }
600623 }
624+ case * plugin.Sync_Response_Error :
625+ log .Error ().Str ("table" , m .Error .TableName ).Msg (m .Error .Error )
626+ stats := statsPerTable [m .Error .TableName ]
627+ stats .Errors ++
628+ statsPerTable [m .Error .TableName ] = stats
601629 default :
602630 return fmt .Errorf ("unknown message type: %T" , m )
603631 }
@@ -608,27 +636,19 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
608636 return err
609637 }
610638
611- syncSummaryEnabled := summaryLocation != "" || lo .SomeBy (destinationSpecs , func (d specs.Destination ) bool { return d .SyncSummary })
612- var sourceTables schema.Tables
613- if syncSummaryEnabled {
614- sourceTables , err = getTables (ctx , sourcePbClient , & plugin.GetTables_Request {
615- Tables : sourceSpec .Tables ,
616- SkipTables : sourceSpec .SkipTables ,
617- SkipDependentTables : * sourceSpec .SkipDependentTables ,
618- })
619-
620- if err != nil {
621- return err
622- }
623- }
624-
625639 err = syncClient .CloseSend ()
626640 if err != nil {
627641 return err
628642 }
629643 totals = sourceClient .Metrics ()
630644 sourceWarnings := totals .Warnings
631- sourceErrors := totals .Errors
645+ var sourceErrors uint64
646+ for _ , val := range statsPerTable {
647+ sourceErrors += uint64 (val .Errors )
648+ }
649+ if totals .Errors > sourceErrors {
650+ sourceErrors = totals .Errors
651+ }
632652 var metadataDataErrors error
633653 for i := range destinationsClients {
634654 m := destinationsClients [i ].Metrics ()
@@ -697,13 +717,13 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
697717 exitReason = ExitReasonCompleted
698718
699719 msg := "Sync completed successfully"
700- if totals . Errors > 0 {
720+ if sourceErrors > 0 {
701721 msg = "Sync completed with errors, see logs for details"
702722 }
703- fmt .Printf ("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n " , msg , totalResources , totals . Errors , totals .Warnings , syncTimeTook .Truncate (time .Second ).String ())
723+ fmt .Printf ("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n " , msg , totalResources , sourceErrors , totals .Warnings , syncTimeTook .Truncate (time .Second ).String ())
704724 log .Info ().
705725 Int64 ("resources" , totalResources ).
706- Uint64 ("errors" , totals . Errors ).
726+ Uint64 ("errors" , sourceErrors ).
707727 Uint64 ("warnings" , totals .Warnings ).
708728 Str ("duration" , syncTimeTook .Truncate (time .Second ).String ()).
709729 Str ("result" , msg ).
0 commit comments