Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 42 additions & 22 deletions cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
syncTimeTook time.Duration
totalResources = int64(0)
totals = sourceClient.Metrics()
statsPerTable = cloudquery_api.SyncRunTableProgress{}
)
defer func() {
analytics.TrackSyncCompleted(ctx, invocationUUID.UUID, analytics.SyncFinishedEvent{
Expand Down Expand Up @@ -316,6 +317,7 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
SkipTables: sourceSpec.SkipTables,
SkipDependentTables: *sourceSpec.SkipDependentTables,
DeterministicCqId: sourceSpec.DeterministicCQID,
WithErrorMessages: true,
}
if sourceSpec.BackendOptions != nil {
syncReq.Backend = &plugin.Sync_BackendOptions{
Expand Down Expand Up @@ -351,6 +353,22 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
// Read from the sync stream and write to all destinations.
isComplete := int64(0)

sourceTables, err := getTables(ctx, sourcePbClient, &plugin.GetTables_Request{
Tables: sourceSpec.Tables,
SkipTables: sourceSpec.SkipTables,
SkipDependentTables: *sourceSpec.SkipDependentTables})

if err != nil {
return err
}
// Pre init stats per table
for _, table := range sourceTables {
statsPerTable[table.Name] = cloudquery_api.SyncRunTableProgressValue{
Rows: 0,
Errors: 0,
}
}

var remoteProgressReporter *godebouncer.Debouncer
if progressAPIClient != nil {
teamName, syncName, syncRunId := os.Getenv("_CQ_TEAM_NAME"), os.Getenv("_CQ_SYNC_NAME"), os.Getenv("_CQ_SYNC_RUN_ID")
Expand All @@ -373,10 +391,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
status = cloudquery_api.SyncRunStatusCompleted
}
obj := cloudquery_api.CreateSyncRunProgressJSONRequestBody{
Rows: atomic.LoadInt64(&totalResources),
Errors: int64(totals.Errors),
Warnings: int64(totals.Warnings),
Status: &status,
Rows: atomic.LoadInt64(&totalResources),
Errors: int64(totals.Errors),
Warnings: int64(totals.Warnings),
Status: &status,
TableProgress: &statsPerTable,
}
if shard != nil {
obj.ShardNum = &shard.num
Expand Down Expand Up @@ -487,6 +506,10 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e

atomic.AddInt64(&newResources, record.NumRows())
atomic.AddInt64(&totalResources, record.NumRows())
tableName, _ := record.Schema().Metadata().GetValue(schema.MetadataTableName)
stats := statsPerTable[tableName]
stats.Rows += record.NumRows()
statsPerTable[tableName] = stats
if remoteProgressReporter != nil {
remoteProgressReporter.SendSignal()
}
Expand Down Expand Up @@ -598,6 +621,11 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
return handleSendError(err, writeClients[i], "migrate")
}
}
case *plugin.Sync_Response_Error:
log.Error().Str("table", m.Error.TableName).Msg(m.Error.Error)
stats := statsPerTable[m.Error.TableName]
stats.Errors++
statsPerTable[m.Error.TableName] = stats
default:
return fmt.Errorf("unknown message type: %T", m)
}
Expand All @@ -608,27 +636,19 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
return err
}

syncSummaryEnabled := summaryLocation != "" || lo.SomeBy(destinationSpecs, func(d specs.Destination) bool { return d.SyncSummary })
var sourceTables schema.Tables
if syncSummaryEnabled {
sourceTables, err = getTables(ctx, sourcePbClient, &plugin.GetTables_Request{
Tables: sourceSpec.Tables,
SkipTables: sourceSpec.SkipTables,
SkipDependentTables: *sourceSpec.SkipDependentTables,
})

if err != nil {
return err
}
}

err = syncClient.CloseSend()
if err != nil {
return err
}
totals = sourceClient.Metrics()
sourceWarnings := totals.Warnings
sourceErrors := totals.Errors
var sourceErrors uint64
for _, val := range statsPerTable {
sourceErrors += uint64(val.Errors)
}
if totals.Errors > sourceErrors {
sourceErrors = totals.Errors
}
var metadataDataErrors error
for i := range destinationsClients {
m := destinationsClients[i].Metrics()
Expand Down Expand Up @@ -697,13 +717,13 @@ func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr e
exitReason = ExitReasonCompleted

msg := "Sync completed successfully"
if totals.Errors > 0 {
if sourceErrors > 0 {
msg = "Sync completed with errors, see logs for details"
}
fmt.Printf("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n", msg, totalResources, totals.Errors, totals.Warnings, syncTimeTook.Truncate(time.Second).String())
fmt.Printf("%s. Resources: %d, Errors: %d, Warnings: %d, Time: %s\n", msg, totalResources, sourceErrors, totals.Warnings, syncTimeTook.Truncate(time.Second).String())
log.Info().
Int64("resources", totalResources).
Uint64("errors", totals.Errors).
Uint64("errors", sourceErrors).
Uint64("warnings", totals.Warnings).
Str("duration", syncTimeTook.Truncate(time.Second).String()).
Str("result", msg).
Expand Down