Skip to content

Commit 167cafa

Browse files
authored
chore: refactor async destination job metadata capturing (#6528)
🔒 Scanned for secrets using gitleaks 8.28.0 # Description Refactoring the handling of job parameters and related metadata in the async destination manager. Streamlining the way job metadata is managed and passed throughout the codebase, improving performance (fewer iterations), maintainability, and clarity. Additionally, capturing job metadata for the partition ID, which is going to be required for non-blocking workspace partition migrations. ## Linear Ticket Resolves PIPE-2601 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 418c453 commit 167cafa

File tree

6 files changed

+126
-121
lines changed

6 files changed

+126
-121
lines changed

router/batchrouter/asyncdestinationmanager/common/common.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -99,24 +99,28 @@ type ImportParameters struct {
9999
}
100100

101101
type AsyncDestinationStruct struct {
102-
ImportingJobIDs []int64
103-
FailedJobIDs []int64
104-
Exists bool
105-
Size int
106-
CreatedAt time.Time
107-
FileName string
108-
Count int
109-
CanUpload bool
110-
UploadInProgress bool
111-
UploadMutex sync.RWMutex
112-
DestinationUploadURL string
113-
Destination *backendconfig.DestinationT
114-
Manager AsyncDestinationManager
115-
AttemptNums map[int64]int
116-
FirstAttemptedAts map[int64]time.Time
117-
OriginalJobParameters map[int64]stdjson.RawMessage
118-
PartFileNumber int
119-
SourceJobRunID string
102+
ImportingJobIDs []int64
103+
FailedJobIDs []int64
104+
Exists bool
105+
Size int
106+
CreatedAt time.Time
107+
FileName string
108+
Count int
109+
CanUpload bool
110+
UploadInProgress bool
111+
UploadMutex sync.RWMutex
112+
DestinationUploadURL string
113+
Destination *backendconfig.DestinationT
114+
Manager AsyncDestinationManager
115+
PartFileNumber int
116+
SourceJobRunID string
117+
118+
// Maps jobID to various metadata
119+
120+
AttemptNums map[int64]int
121+
FirstAttemptedAts map[int64]time.Time
122+
PartitionIDs map[int64]string
123+
JobParameters map[int64]stdjson.RawMessage
120124
}
121125

122126
type GetUploadStatsInput struct {

router/batchrouter/asyncdestinationmanager/sftp/sftp_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,14 @@ func TestSFTP(t *testing.T) {
174174
payload[id] = rawMessage
175175

176176
asyncDestination := common.AsyncDestinationStruct{
177-
ImportingJobIDs: []int64{1014, 1015, 1016, 1017},
178-
FileName: filePath,
179-
Destination: &destinations[0],
180-
Manager: manager,
181-
OriginalJobParameters: payload,
182-
CreatedAt: now,
183-
PartFileNumber: 1,
184-
SourceJobRunID: "someJobRunId_1",
177+
ImportingJobIDs: []int64{1014, 1015, 1016, 1017},
178+
FileName: filePath,
179+
Destination: &destinations[0],
180+
Manager: manager,
181+
JobParameters: payload,
182+
CreatedAt: now,
183+
PartFileNumber: 1,
184+
SourceJobRunID: "someJobRunId_1",
185185
}
186186
expected := common.AsyncUploadOutput{
187187
DestinationID: "destination_id_1",

router/batchrouter/handle_async.go

Lines changed: 49 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,9 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
360360
uploadResponse := brt.asyncDestinationStruct[destinationID].Manager.Upload(brt.asyncDestinationStruct[destinationID])
361361

362362
brt.setMultipleJobStatus(setMultipleJobStatusParams{
363-
AsyncOutput: uploadResponse,
364-
Attempted: true,
365-
AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums,
366-
FirstAttemptedAts: brt.asyncDestinationStruct[destinationID].FirstAttemptedAts,
367-
OriginalJobParameters: brt.asyncDestinationStruct[destinationID].OriginalJobParameters,
363+
asyncJobMetadata: newAsyncJobMetadataFromDestinationStruct(brt.asyncDestinationStruct[destinationID]),
364+
AsyncOutput: uploadResponse,
365+
Attempted: true,
368366
})
369367
if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 {
370368
brt.asyncDestinationStruct[destinationID].UploadInProgress = true
@@ -378,7 +376,7 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
378376
}
379377
}
380378

381-
func (brt *Handle) asyncStructSetup(sourceID, destinationID string, attemptNums map[int64]int, firstAttemptedAts map[int64]time.Time, originalJobParameters map[int64]stdjson.RawMessage) {
379+
func (brt *Handle) asyncStructSetup(sourceID, destinationID string, jobsList []*jobsdb.JobT) {
382380
localTmpDirName := fmt.Sprintf(`/%s/`, misc.RudderAsyncDestinationLogs)
383381
uuid := uuid.New()
384382

@@ -394,15 +392,16 @@ func (brt *Handle) asyncStructSetup(sourceID, destinationID string, attemptNums
394392
}
395393

396394
existingJobRunID := brt.asyncDestinationStruct[destinationID].SourceJobRunID
397-
newJobRunID := getFirstSourceJobRunID(originalJobParameters)
395+
asyncJobMetadata := newAsyncJobMetadata(jobsList)
396+
newJobRunID := getFirstSourceJobRunID(asyncJobMetadata.JobParameters)
398397
if newJobRunID != existingJobRunID {
399398
brt.asyncDestinationStruct[destinationID].PartFileNumber = 0
400399
}
401-
402400
brt.asyncDestinationStruct[destinationID].Exists = true
403-
brt.asyncDestinationStruct[destinationID].AttemptNums = attemptNums
404-
brt.asyncDestinationStruct[destinationID].FirstAttemptedAts = firstAttemptedAts
405-
brt.asyncDestinationStruct[destinationID].OriginalJobParameters = originalJobParameters
401+
brt.asyncDestinationStruct[destinationID].AttemptNums = asyncJobMetadata.AttemptNums
402+
brt.asyncDestinationStruct[destinationID].FirstAttemptedAts = asyncJobMetadata.FirstAttemptedAts
403+
brt.asyncDestinationStruct[destinationID].JobParameters = asyncJobMetadata.JobParameters
404+
brt.asyncDestinationStruct[destinationID].PartitionIDs = asyncJobMetadata.PartitionIDs
406405
brt.asyncDestinationStruct[destinationID].FileName = jsonPath
407406
brt.asyncDestinationStruct[destinationID].CreatedAt = brt.now()
408407
brt.asyncDestinationStruct[destinationID].SourceJobRunID = newJobRunID
@@ -418,33 +417,11 @@ func (brt *Handle) asyncStructCleanUp(destinationID string) {
418417
brt.asyncDestinationStruct[destinationID].Count = 0
419418
brt.asyncDestinationStruct[destinationID].CanUpload = false
420419
brt.asyncDestinationStruct[destinationID].DestinationUploadURL = ""
420+
421421
brt.asyncDestinationStruct[destinationID].AttemptNums = make(map[int64]int)
422422
brt.asyncDestinationStruct[destinationID].FirstAttemptedAts = make(map[int64]time.Time)
423-
brt.asyncDestinationStruct[destinationID].OriginalJobParameters = make(map[int64]stdjson.RawMessage)
424-
}
425-
426-
func getAttemptNumbers(jobs []*jobsdb.JobT) map[int64]int {
427-
attemptNums := make(map[int64]int)
428-
for _, job := range jobs {
429-
attemptNums[job.JobID] = job.LastJobStatus.AttemptNum
430-
}
431-
return attemptNums
432-
}
433-
434-
func getFirstAttemptAts(jobs []*jobsdb.JobT) map[int64]time.Time {
435-
firstAttemptedAts := make(map[int64]time.Time)
436-
for _, job := range jobs {
437-
firstAttemptedAts[job.JobID] = getFirstAttemptAtFromErrorResponse(job.LastJobStatus.ErrorResponse)
438-
}
439-
return firstAttemptedAts
440-
}
441-
442-
func getOriginalJobParameters(jobs []*jobsdb.JobT) map[int64]stdjson.RawMessage {
443-
originalJobParameters := make(map[int64]stdjson.RawMessage)
444-
for _, job := range jobs {
445-
originalJobParameters[job.JobID] = job.Parameters
446-
}
447-
return originalJobParameters
423+
brt.asyncDestinationStruct[destinationID].JobParameters = make(map[int64]stdjson.RawMessage)
424+
brt.asyncDestinationStruct[destinationID].PartitionIDs = make(map[int64]string)
448425
}
449426

450427
func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
@@ -459,11 +436,9 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
459436
}
460437

461438
brt.setMultipleJobStatus(setMultipleJobStatusParams{
462-
AsyncOutput: out,
463-
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
464-
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
465-
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
466-
JobsList: batchJobs.Jobs,
439+
asyncJobMetadata: newAsyncJobMetadata(batchJobs.Jobs),
440+
AsyncOutput: out,
441+
JobsList: batchJobs.Jobs,
467442
})
468443
return nil
469444
}
@@ -492,25 +467,24 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
492467
}
493468

494469
brt.setMultipleJobStatus(setMultipleJobStatusParams{
495-
AsyncOutput: out,
496-
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
497-
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
498-
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
499-
JobsList: batchJobs.Jobs,
470+
asyncJobMetadata: newAsyncJobMetadata(batchJobs.Jobs),
471+
AsyncOutput: out,
472+
JobsList: batchJobs.Jobs,
500473
})
501474
return nil
502475
}
503476
}
504477

478+
var skipUpdateStructJobMetadataMaps bool
505479
if !ok || !brt.asyncDestinationStruct[destinationID].Exists {
506480
if !ok {
507481
asyncStruct := &common.AsyncDestinationStruct{}
508482
asyncStruct.UploadMutex.Lock()
509483
defer asyncStruct.UploadMutex.Unlock()
510484
brt.asyncDestinationStruct[destinationID] = asyncStruct
511485
}
512-
513-
brt.asyncStructSetup(batchJobs.Connection.Source.ID, destinationID, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs))
486+
brt.asyncStructSetup(batchJobs.Connection.Source.ID, destinationID, batchJobs.Jobs)
487+
skipUpdateStructJobMetadataMaps = true
514488
}
515489

516490
file, err := os.OpenFile(brt.asyncDestinationStruct[destinationID].FileName, os.O_CREATE|os.O_WRONLY, 0o600)
@@ -565,28 +539,22 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
565539

566540
brt.setMultipleJobStatus(
567541
setMultipleJobStatusParams{
568-
AsyncOutput: out,
569-
AttemptNums: getAttemptNumbers(batchJobs.Jobs),
570-
FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs),
571-
OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs),
572-
JobsList: batchJobs.Jobs,
542+
asyncJobMetadata: newAsyncJobMetadata(batchJobs.Jobs),
543+
AsyncOutput: out,
544+
JobsList: batchJobs.Jobs,
573545
},
574546
)
575547

576548
return nil
577549
}
578550

579-
newAttemptNums := getAttemptNumbers(batchJobs.Jobs)
580-
for jobID, attemptNum := range newAttemptNums {
581-
brt.asyncDestinationStruct[destinationID].AttemptNums[jobID] = attemptNum
582-
}
583-
newFirstAttemptedAts := getFirstAttemptAts(batchJobs.Jobs)
584-
for jobID, firstAttemptedAt := range newFirstAttemptedAts {
585-
brt.asyncDestinationStruct[destinationID].FirstAttemptedAts[jobID] = firstAttemptedAt
586-
}
587-
newOriginalJobParameters := getOriginalJobParameters(batchJobs.Jobs)
588-
for jobID, originalJobParameter := range newOriginalJobParameters {
589-
brt.asyncDestinationStruct[destinationID].OriginalJobParameters[jobID] = originalJobParameter
551+
if !skipUpdateStructJobMetadataMaps { // update job metadata maps of asyncDestinationStruct
552+
for _, job := range batchJobs.Jobs {
553+
brt.asyncDestinationStruct[destinationID].AttemptNums[job.JobID] = job.LastJobStatus.AttemptNum
554+
brt.asyncDestinationStruct[destinationID].FirstAttemptedAts[job.JobID] = getFirstAttemptAtFromErrorResponse(job.LastJobStatus.ErrorResponse)
555+
brt.asyncDestinationStruct[destinationID].JobParameters[job.JobID] = job.Parameters
556+
brt.asyncDestinationStruct[destinationID].PartitionIDs[job.JobID] = job.PartitionID
557+
}
590558
}
591559
if overFlow {
592560
brt.asyncDestinationStruct[destinationID].CanUpload = true
@@ -702,10 +670,10 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
702670
var statusList []*jobsdb.JobStatusT
703671
jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails)
704672
if len(params.AsyncOutput.ImportingJobIDs) > 0 {
705-
for _, jobId := range params.AsyncOutput.ImportingJobIDs {
673+
for _, jobId := range lo.Uniq(params.AsyncOutput.ImportingJobIDs) {
706674
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
707675
DestinationID: params.AsyncOutput.DestinationID,
708-
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
676+
SourceID: gjson.GetBytes(params.JobParameters[jobId], "source_id").String(),
709677
}
710678
status := jobsdb.JobStatusT{
711679
JobID: jobId,
@@ -716,17 +684,17 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
716684
ErrorCode: "200",
717685
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload),
718686
Parameters: params.AsyncOutput.ImportingParameters,
719-
JobParameters: params.OriginalJobParameters[jobId],
687+
JobParameters: params.JobParameters[jobId],
720688
WorkspaceId: workspaceID,
721689
}
722690
statusList = append(statusList, &status)
723691
}
724692
}
725693
if len(params.AsyncOutput.SucceededJobIDs) > 0 {
726-
for _, jobId := range params.AsyncOutput.SucceededJobIDs {
694+
for _, jobId := range lo.Uniq(params.AsyncOutput.SucceededJobIDs) {
727695
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
728696
DestinationID: params.AsyncOutput.DestinationID,
729-
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
697+
SourceID: gjson.GetBytes(params.JobParameters[jobId], "source_id").String(),
730698
}
731699
status := jobsdb.JobStatusT{
732700
JobID: jobId,
@@ -737,18 +705,18 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
737705
ErrorCode: "200",
738706
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(params.AsyncOutput.SuccessResponse)),
739707
Parameters: routerutils.EmptyPayload,
740-
JobParameters: params.OriginalJobParameters[jobId],
708+
JobParameters: params.JobParameters[jobId],
741709
WorkspaceId: workspaceID,
742710
}
743711
statusList = append(statusList, &status)
744-
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
712+
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.JobParameters[jobId]))
745713
}
746714
}
747715
if len(params.AsyncOutput.FailedJobIDs) > 0 {
748-
for _, jobId := range params.AsyncOutput.FailedJobIDs {
716+
for _, jobId := range lo.Uniq(params.AsyncOutput.FailedJobIDs) {
749717
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
750718
DestinationID: params.AsyncOutput.DestinationID,
751-
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
719+
SourceID: gjson.GetBytes(params.JobParameters[jobId], "source_id").String(),
752720
}
753721
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.FailedReason)
754722
status := jobsdb.JobStatusT{
@@ -760,7 +728,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
760728
ErrorCode: "500",
761729
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", resp),
762730
Parameters: routerutils.EmptyPayload,
763-
JobParameters: params.OriginalJobParameters[jobId],
731+
JobParameters: params.JobParameters[jobId],
764732
WorkspaceId: workspaceID,
765733
}
766734
if params.Attempted {
@@ -769,16 +737,16 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
769737

770738
if brt.retryLimitReached(&status) {
771739
status.JobState = jobsdb.Aborted.State
772-
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
740+
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.JobParameters[jobId]))
773741
}
774742
statusList = append(statusList, &status)
775743
}
776744
}
777745
if len(params.AsyncOutput.AbortJobIDs) > 0 {
778-
for _, jobId := range params.AsyncOutput.AbortJobIDs {
746+
for _, jobId := range lo.Uniq(params.AsyncOutput.AbortJobIDs) {
779747
jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{
780748
DestinationID: params.AsyncOutput.DestinationID,
781-
SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(),
749+
SourceID: gjson.GetBytes(params.JobParameters[jobId], "source_id").String(),
782750
}
783751
resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.AbortReason)
784752
status := jobsdb.JobStatusT{
@@ -790,11 +758,11 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
790758
ErrorCode: "400",
791759
ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)),
792760
Parameters: routerutils.EmptyPayload,
793-
JobParameters: params.OriginalJobParameters[jobId],
761+
JobParameters: params.JobParameters[jobId],
794762
WorkspaceId: workspaceID,
795763
}
796764
statusList = append(statusList, &status)
797-
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId]))
765+
completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.JobParameters[jobId]))
798766
}
799767
}
800768

@@ -811,7 +779,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
811779

812780
reportMetrics := brt.getReportMetrics(getReportMetricsParams{
813781
StatusList: statusList,
814-
ParametersMap: params.OriginalJobParameters,
782+
ParametersMap: params.JobParameters,
815783
JobsList: params.JobsList,
816784
})
817785

@@ -845,7 +813,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
845813
if params.Attempted {
846814
var sourceID string
847815
if len(statusList) > 0 {
848-
sourceID = gjson.GetBytes(params.OriginalJobParameters[statusList[0].JobID], "source_id").String()
816+
sourceID = gjson.GetBytes(params.JobParameters[statusList[0].JobID], "source_id").String()
849817
}
850818
brt.recordAsyncDestinationDeliveryStatus(sourceID, params.AsyncOutput.DestinationID, statusList)
851819
}

router/batchrouter/handle_observability.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,14 @@ func (brt *Handle) emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID str
150150
}
151151

152152
// Get original job parameters for this job
153-
originalParams, exists := asyncDestStruct.OriginalJobParameters[status.JobID]
153+
jobParameters, exists := asyncDestStruct.JobParameters[status.JobID]
154154
if !exists {
155155
brt.logger.Debugn("Original job parameters not found for jobID: %d", logger.NewIntField("jobID", status.JobID))
156156
continue
157157
}
158158

159159
// Extract receivedAt from original job parameters
160-
receivedAtStr := gjson.GetBytes(originalParams, "received_at").String()
160+
receivedAtStr := gjson.GetBytes(jobParameters, "received_at").String()
161161
if receivedAtStr == "" {
162162
brt.logger.Debugn("ReceivedAt not found in job parameters for jobID: %d", logger.NewIntField("jobID", status.JobID))
163163
continue
@@ -171,7 +171,7 @@ func (brt *Handle) emitAsyncEventDeliveryTimeMetrics(sourceID, destinationID str
171171
}
172172

173173
// Extract source category from original job parameters
174-
sourceCategory := gjson.GetBytes(originalParams, "source_category").String()
174+
sourceCategory := gjson.GetBytes(jobParameters, "source_category").String()
175175

176176
// Create and emit the event_delivery_time metric
177177
eventDeliveryTimeStat := stats.Default.NewTaggedStat("event_delivery_time", stats.TimerType, map[string]string{

0 commit comments

Comments
 (0)