@@ -360,11 +360,9 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
360360 uploadResponse := brt .asyncDestinationStruct [destinationID ].Manager .Upload (brt .asyncDestinationStruct [destinationID ])
361361
362362 brt .setMultipleJobStatus (setMultipleJobStatusParams {
363- AsyncOutput : uploadResponse ,
364- Attempted : true ,
365- AttemptNums : brt .asyncDestinationStruct [destinationID ].AttemptNums ,
366- FirstAttemptedAts : brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts ,
367- OriginalJobParameters : brt .asyncDestinationStruct [destinationID ].OriginalJobParameters ,
363+ asyncJobMetadata : newAsyncJobMetadataFromDestinationStruct (brt .asyncDestinationStruct [destinationID ]),
364+ AsyncOutput : uploadResponse ,
365+ Attempted : true ,
368366 })
369367 if uploadResponse .ImportingParameters != nil && len (uploadResponse .ImportingJobIDs ) > 0 {
370368 brt .asyncDestinationStruct [destinationID ].UploadInProgress = true
@@ -378,7 +376,7 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) {
378376 }
379377}
380378
381- func (brt * Handle ) asyncStructSetup (sourceID , destinationID string , attemptNums map [ int64 ] int , firstAttemptedAts map [ int64 ]time. Time , originalJobParameters map [ int64 ]stdjson. RawMessage ) {
379+ func (brt * Handle ) asyncStructSetup (sourceID , destinationID string , jobsList [] * jobsdb. JobT ) {
382380 localTmpDirName := fmt .Sprintf (`/%s/` , misc .RudderAsyncDestinationLogs )
383381 uuid := uuid .New ()
384382
@@ -394,15 +392,16 @@ func (brt *Handle) asyncStructSetup(sourceID, destinationID string, attemptNums
394392 }
395393
396394 existingJobRunID := brt .asyncDestinationStruct [destinationID ].SourceJobRunID
397- newJobRunID := getFirstSourceJobRunID (originalJobParameters )
395+ asyncJobMetadata := newAsyncJobMetadata (jobsList )
396+ newJobRunID := getFirstSourceJobRunID (asyncJobMetadata .JobParameters )
398397 if newJobRunID != existingJobRunID {
399398 brt .asyncDestinationStruct [destinationID ].PartFileNumber = 0
400399 }
401-
402400 brt .asyncDestinationStruct [destinationID ].Exists = true
403- brt .asyncDestinationStruct [destinationID ].AttemptNums = attemptNums
404- brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts = firstAttemptedAts
405- brt .asyncDestinationStruct [destinationID ].OriginalJobParameters = originalJobParameters
401+ brt .asyncDestinationStruct [destinationID ].AttemptNums = asyncJobMetadata .AttemptNums
402+ brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts = asyncJobMetadata .FirstAttemptedAts
403+ brt .asyncDestinationStruct [destinationID ].JobParameters = asyncJobMetadata .JobParameters
404+ brt .asyncDestinationStruct [destinationID ].PartitionIDs = asyncJobMetadata .PartitionIDs
406405 brt .asyncDestinationStruct [destinationID ].FileName = jsonPath
407406 brt .asyncDestinationStruct [destinationID ].CreatedAt = brt .now ()
408407 brt .asyncDestinationStruct [destinationID ].SourceJobRunID = newJobRunID
@@ -418,33 +417,11 @@ func (brt *Handle) asyncStructCleanUp(destinationID string) {
418417 brt .asyncDestinationStruct [destinationID ].Count = 0
419418 brt .asyncDestinationStruct [destinationID ].CanUpload = false
420419 brt .asyncDestinationStruct [destinationID ].DestinationUploadURL = ""
420+
421421 brt .asyncDestinationStruct [destinationID ].AttemptNums = make (map [int64 ]int )
422422 brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts = make (map [int64 ]time.Time )
423- brt .asyncDestinationStruct [destinationID ].OriginalJobParameters = make (map [int64 ]stdjson.RawMessage )
424- }
425-
426- func getAttemptNumbers (jobs []* jobsdb.JobT ) map [int64 ]int {
427- attemptNums := make (map [int64 ]int )
428- for _ , job := range jobs {
429- attemptNums [job .JobID ] = job .LastJobStatus .AttemptNum
430- }
431- return attemptNums
432- }
433-
434- func getFirstAttemptAts (jobs []* jobsdb.JobT ) map [int64 ]time.Time {
435- firstAttemptedAts := make (map [int64 ]time.Time )
436- for _ , job := range jobs {
437- firstAttemptedAts [job .JobID ] = getFirstAttemptAtFromErrorResponse (job .LastJobStatus .ErrorResponse )
438- }
439- return firstAttemptedAts
440- }
441-
442- func getOriginalJobParameters (jobs []* jobsdb.JobT ) map [int64 ]stdjson.RawMessage {
443- originalJobParameters := make (map [int64 ]stdjson.RawMessage )
444- for _ , job := range jobs {
445- originalJobParameters [job .JobID ] = job .Parameters
446- }
447- return originalJobParameters
423+ brt .asyncDestinationStruct [destinationID ].JobParameters = make (map [int64 ]stdjson.RawMessage )
424+ brt .asyncDestinationStruct [destinationID ].PartitionIDs = make (map [int64 ]string )
448425}
449426
450427func (brt * Handle ) sendJobsToStorage (batchJobs BatchedJobs ) error {
@@ -459,11 +436,9 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
459436 }
460437
461438 brt .setMultipleJobStatus (setMultipleJobStatusParams {
462- AsyncOutput : out ,
463- AttemptNums : getAttemptNumbers (batchJobs .Jobs ),
464- FirstAttemptedAts : getFirstAttemptAts (batchJobs .Jobs ),
465- OriginalJobParameters : getOriginalJobParameters (batchJobs .Jobs ),
466- JobsList : batchJobs .Jobs ,
439+ asyncJobMetadata : newAsyncJobMetadata (batchJobs .Jobs ),
440+ AsyncOutput : out ,
441+ JobsList : batchJobs .Jobs ,
467442 })
468443 return nil
469444 }
@@ -492,25 +467,24 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
492467 }
493468
494469 brt .setMultipleJobStatus (setMultipleJobStatusParams {
495- AsyncOutput : out ,
496- AttemptNums : getAttemptNumbers (batchJobs .Jobs ),
497- FirstAttemptedAts : getFirstAttemptAts (batchJobs .Jobs ),
498- OriginalJobParameters : getOriginalJobParameters (batchJobs .Jobs ),
499- JobsList : batchJobs .Jobs ,
470+ asyncJobMetadata : newAsyncJobMetadata (batchJobs .Jobs ),
471+ AsyncOutput : out ,
472+ JobsList : batchJobs .Jobs ,
500473 })
501474 return nil
502475 }
503476 }
504477
478+ var skipUpdateStructJobMetadataMaps bool
505479 if ! ok || ! brt .asyncDestinationStruct [destinationID ].Exists {
506480 if ! ok {
507481 asyncStruct := & common.AsyncDestinationStruct {}
508482 asyncStruct .UploadMutex .Lock ()
509483 defer asyncStruct .UploadMutex .Unlock ()
510484 brt .asyncDestinationStruct [destinationID ] = asyncStruct
511485 }
512-
513- brt . asyncStructSetup ( batchJobs . Connection . Source . ID , destinationID , getAttemptNumbers ( batchJobs . Jobs ), getFirstAttemptAts ( batchJobs . Jobs ), getOriginalJobParameters ( batchJobs . Jobs ))
486+ brt . asyncStructSetup ( batchJobs . Connection . Source . ID , destinationID , batchJobs . Jobs )
487+ skipUpdateStructJobMetadataMaps = true
514488 }
515489
516490 file , err := os .OpenFile (brt .asyncDestinationStruct [destinationID ].FileName , os .O_CREATE | os .O_WRONLY , 0o600 )
@@ -565,28 +539,22 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) error {
565539
566540 brt .setMultipleJobStatus (
567541 setMultipleJobStatusParams {
568- AsyncOutput : out ,
569- AttemptNums : getAttemptNumbers (batchJobs .Jobs ),
570- FirstAttemptedAts : getFirstAttemptAts (batchJobs .Jobs ),
571- OriginalJobParameters : getOriginalJobParameters (batchJobs .Jobs ),
572- JobsList : batchJobs .Jobs ,
542+ asyncJobMetadata : newAsyncJobMetadata (batchJobs .Jobs ),
543+ AsyncOutput : out ,
544+ JobsList : batchJobs .Jobs ,
573545 },
574546 )
575547
576548 return nil
577549 }
578550
579- newAttemptNums := getAttemptNumbers (batchJobs .Jobs )
580- for jobID , attemptNum := range newAttemptNums {
581- brt .asyncDestinationStruct [destinationID ].AttemptNums [jobID ] = attemptNum
582- }
583- newFirstAttemptedAts := getFirstAttemptAts (batchJobs .Jobs )
584- for jobID , firstAttemptedAt := range newFirstAttemptedAts {
585- brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts [jobID ] = firstAttemptedAt
586- }
587- newOriginalJobParameters := getOriginalJobParameters (batchJobs .Jobs )
588- for jobID , originalJobParameter := range newOriginalJobParameters {
589- brt .asyncDestinationStruct [destinationID ].OriginalJobParameters [jobID ] = originalJobParameter
551+ if ! skipUpdateStructJobMetadataMaps { // update job metadata maps of asyncDestinationStruct
552+ for _ , job := range batchJobs .Jobs {
553+ brt .asyncDestinationStruct [destinationID ].AttemptNums [job .JobID ] = job .LastJobStatus .AttemptNum
554+ brt .asyncDestinationStruct [destinationID ].FirstAttemptedAts [job .JobID ] = getFirstAttemptAtFromErrorResponse (job .LastJobStatus .ErrorResponse )
555+ brt .asyncDestinationStruct [destinationID ].JobParameters [job .JobID ] = job .Parameters
556+ brt .asyncDestinationStruct [destinationID ].PartitionIDs [job .JobID ] = job .PartitionID
557+ }
590558 }
591559 if overFlow {
592560 brt .asyncDestinationStruct [destinationID ].CanUpload = true
@@ -702,10 +670,10 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
702670 var statusList []* jobsdb.JobStatusT
703671 jobIDConnectionDetailsMap := make (map [int64 ]jobsdb.ConnectionDetails )
704672 if len (params .AsyncOutput .ImportingJobIDs ) > 0 {
705- for _ , jobId := range params .AsyncOutput .ImportingJobIDs {
673+ for _ , jobId := range lo . Uniq ( params .AsyncOutput .ImportingJobIDs ) {
706674 jobIDConnectionDetailsMap [jobId ] = jobsdb.ConnectionDetails {
707675 DestinationID : params .AsyncOutput .DestinationID ,
708- SourceID : gjson .GetBytes (params .OriginalJobParameters [jobId ], "source_id" ).String (),
676+ SourceID : gjson .GetBytes (params .JobParameters [jobId ], "source_id" ).String (),
709677 }
710678 status := jobsdb.JobStatusT {
711679 JobID : jobId ,
@@ -716,17 +684,17 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
716684 ErrorCode : "200" ,
717685 ErrorResponse : routerutils .EnhanceJsonWithTime (params .FirstAttemptedAts [jobId ], "firstAttemptedAt" , routerutils .EmptyPayload ),
718686 Parameters : params .AsyncOutput .ImportingParameters ,
719- JobParameters : params .OriginalJobParameters [jobId ],
687+ JobParameters : params .JobParameters [jobId ],
720688 WorkspaceId : workspaceID ,
721689 }
722690 statusList = append (statusList , & status )
723691 }
724692 }
725693 if len (params .AsyncOutput .SucceededJobIDs ) > 0 {
726- for _ , jobId := range params .AsyncOutput .SucceededJobIDs {
694+ for _ , jobId := range lo . Uniq ( params .AsyncOutput .SucceededJobIDs ) {
727695 jobIDConnectionDetailsMap [jobId ] = jobsdb.ConnectionDetails {
728696 DestinationID : params .AsyncOutput .DestinationID ,
729- SourceID : gjson .GetBytes (params .OriginalJobParameters [jobId ], "source_id" ).String (),
697+ SourceID : gjson .GetBytes (params .JobParameters [jobId ], "source_id" ).String (),
730698 }
731699 status := jobsdb.JobStatusT {
732700 JobID : jobId ,
@@ -737,18 +705,18 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
737705 ErrorCode : "200" ,
738706 ErrorResponse : routerutils .EnhanceJsonWithTime (params .FirstAttemptedAts [jobId ], "firstAttemptedAt" , stdjson .RawMessage (params .AsyncOutput .SuccessResponse )),
739707 Parameters : routerutils .EmptyPayload ,
740- JobParameters : params .OriginalJobParameters [jobId ],
708+ JobParameters : params .JobParameters [jobId ],
741709 WorkspaceId : workspaceID ,
742710 }
743711 statusList = append (statusList , & status )
744- completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .OriginalJobParameters [jobId ]))
712+ completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .JobParameters [jobId ]))
745713 }
746714 }
747715 if len (params .AsyncOutput .FailedJobIDs ) > 0 {
748- for _ , jobId := range params .AsyncOutput .FailedJobIDs {
716+ for _ , jobId := range lo . Uniq ( params .AsyncOutput .FailedJobIDs ) {
749717 jobIDConnectionDetailsMap [jobId ] = jobsdb.ConnectionDetails {
750718 DestinationID : params .AsyncOutput .DestinationID ,
751- SourceID : gjson .GetBytes (params .OriginalJobParameters [jobId ], "source_id" ).String (),
719+ SourceID : gjson .GetBytes (params .JobParameters [jobId ], "source_id" ).String (),
752720 }
753721 resp := misc .UpdateJSONWithNewKeyVal (routerutils .EmptyPayload , "error" , params .AsyncOutput .FailedReason )
754722 status := jobsdb.JobStatusT {
@@ -760,7 +728,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
760728 ErrorCode : "500" ,
761729 ErrorResponse : routerutils .EnhanceJsonWithTime (params .FirstAttemptedAts [jobId ], "firstAttemptedAt" , resp ),
762730 Parameters : routerutils .EmptyPayload ,
763- JobParameters : params .OriginalJobParameters [jobId ],
731+ JobParameters : params .JobParameters [jobId ],
764732 WorkspaceId : workspaceID ,
765733 }
766734 if params .Attempted {
@@ -769,16 +737,16 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
769737
770738 if brt .retryLimitReached (& status ) {
771739 status .JobState = jobsdb .Aborted .State
772- completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .OriginalJobParameters [jobId ]))
740+ completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .JobParameters [jobId ]))
773741 }
774742 statusList = append (statusList , & status )
775743 }
776744 }
777745 if len (params .AsyncOutput .AbortJobIDs ) > 0 {
778- for _ , jobId := range params .AsyncOutput .AbortJobIDs {
746+ for _ , jobId := range lo . Uniq ( params .AsyncOutput .AbortJobIDs ) {
779747 jobIDConnectionDetailsMap [jobId ] = jobsdb.ConnectionDetails {
780748 DestinationID : params .AsyncOutput .DestinationID ,
781- SourceID : gjson .GetBytes (params .OriginalJobParameters [jobId ], "source_id" ).String (),
749+ SourceID : gjson .GetBytes (params .JobParameters [jobId ], "source_id" ).String (),
782750 }
783751 resp := misc .UpdateJSONWithNewKeyVal (routerutils .EmptyPayload , "error" , params .AsyncOutput .AbortReason )
784752 status := jobsdb.JobStatusT {
@@ -790,11 +758,11 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
790758 ErrorCode : "400" ,
791759 ErrorResponse : routerutils .EnhanceJsonWithTime (params .FirstAttemptedAts [jobId ], "firstAttemptedAt" , stdjson .RawMessage (resp )),
792760 Parameters : routerutils .EmptyPayload ,
793- JobParameters : params .OriginalJobParameters [jobId ],
761+ JobParameters : params .JobParameters [jobId ],
794762 WorkspaceId : workspaceID ,
795763 }
796764 statusList = append (statusList , & status )
797- completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .OriginalJobParameters [jobId ]))
765+ completedJobsList = append (completedJobsList , brt .createFakeJob (jobId , params .JobParameters [jobId ]))
798766 }
799767 }
800768
@@ -811,7 +779,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
811779
812780 reportMetrics := brt .getReportMetrics (getReportMetricsParams {
813781 StatusList : statusList ,
814- ParametersMap : params .OriginalJobParameters ,
782+ ParametersMap : params .JobParameters ,
815783 JobsList : params .JobsList ,
816784 })
817785
@@ -845,7 +813,7 @@ func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) {
845813 if params .Attempted {
846814 var sourceID string
847815 if len (statusList ) > 0 {
848- sourceID = gjson .GetBytes (params .OriginalJobParameters [statusList [0 ].JobID ], "source_id" ).String ()
816+ sourceID = gjson .GetBytes (params .JobParameters [statusList [0 ].JobID ], "source_id" ).String ()
849817 }
850818 brt .recordAsyncDestinationDeliveryStatus (sourceID , params .AsyncOutput .DestinationID , statusList )
851819 }
0 commit comments