@@ -1062,12 +1062,13 @@ func contextMetadataFromBidiReadObject(req *storagepb.BidiReadObjectRequest) []s
10621062}
10631063
10641064type rangeSpec struct {
1065- readID int64
1066- writer io.Writer
1067- offset int64
1068- limit int64
1069- bytesWritten int64
1070- callback func (int64 , int64 , error )
1065+ readID int64
1066+ writer io.Writer
1067+ offset int64
1068+ limit int64
1069+ currentBytesWritten int64
1070+ totalBytesWritten int64
1071+ callback func (int64 , int64 , error )
10711072}
10721073
10731074func (c * grpcStorageClient ) NewMultiRangeDownloader (ctx context.Context , params * newMultiRangeDownloaderParams , opts ... storageOption ) (mr * MultiRangeDownloader , err error ) {
@@ -1202,7 +1203,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12021203 rr .mu .Lock ()
12031204 if len (rr .mp ) != 0 {
12041205 for key := range rr .mp {
1205- rr .mp [key ].callback (rr .mp [key ].offset , rr .mp [key ].limit , fmt .Errorf ("stream closed early" ))
1206+ rr .mp [key ].callback (rr .mp [key ].offset , rr .mp [key ].totalBytesWritten , fmt .Errorf ("stream closed early" ))
12061207 delete (rr .mp , key )
12071208 }
12081209 }
@@ -1295,21 +1296,22 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
12951296 }
12961297 _ , err = rr .mp [id ].writer .Write (val .GetChecksummedData ().GetContent ())
12971298 if err != nil {
1298- rr .mp [id ].callback (rr .mp [id ].offset , rr .mp [id ].limit , err )
1299+ rr .mp [id ].callback (rr .mp [id ].offset , rr .mp [id ].totalBytesWritten , err )
12991300 rr .activeTask --
13001301 delete (rr .mp , id )
13011302 } else {
13021303 rr .mp [id ] = rangeSpec {
1303- readID : rr .mp [id ].readID ,
1304- writer : rr .mp [id ].writer ,
1305- offset : rr .mp [id ].offset ,
1306- limit : rr .mp [id ].limit ,
1307- bytesWritten : rr .mp [id ].bytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1308- callback : rr .mp [id ].callback ,
1304+ readID : rr .mp [id ].readID ,
1305+ writer : rr .mp [id ].writer ,
1306+ offset : rr .mp [id ].offset ,
1307+ limit : rr .mp [id ].limit ,
1308+ currentBytesWritten : rr .mp [id ].currentBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1309+ totalBytesWritten : rr .mp [id ].totalBytesWritten + int64 (len (val .GetChecksummedData ().GetContent ())),
1310+ callback : rr .mp [id ].callback ,
13091311 }
13101312 }
13111313 if val .GetRangeEnd () {
1312- rr .mp [id ].callback (rr .mp [id ].offset , rr .mp [id ].limit , nil )
1314+ rr .mp [id ].callback (rr .mp [id ].offset , rr .mp [id ].totalBytesWritten , nil )
13131315 rr .activeTask --
13141316 delete (rr .mp , id )
13151317 }
@@ -1340,7 +1342,7 @@ func (c *grpcStorageClient) NewMultiRangeDownloader(ctx context.Context, params
13401342 if err != nil {
13411343 rr .mu .Lock ()
13421344 for key := range rr .mp {
1343- rr .mp [key ].callback (rr .mp [key ].offset , rr .mp [key ].limit , err )
1345+ rr .mp [key ].callback (rr .mp [key ].offset , rr .mp [key ].totalBytesWritten , err )
13441346 delete (rr .mp , key )
13451347 }
13461348 // In case we hit an permanent error, delete entries from map and remove active tasks.
@@ -1388,12 +1390,13 @@ func getActiveRange(r *gRPCBidiReader) []rangeSpec {
13881390 var activeRange []rangeSpec
13891391 for k , v := range r .mp {
13901392 activeRange = append (activeRange , rangeSpec {
1391- readID : k ,
1392- writer : v .writer ,
1393- offset : (v .offset + v .bytesWritten ),
1394- limit : v .limit - v .bytesWritten ,
1395- callback : v .callback ,
1396- bytesWritten : 0 ,
1393+ readID : k ,
1394+ writer : v .writer ,
1395+ offset : (v .offset + v .currentBytesWritten ),
1396+ limit : v .limit - v .currentBytesWritten ,
1397+ callback : v .callback ,
1398+ currentBytesWritten : 0 ,
1399+ totalBytesWritten : v .totalBytesWritten ,
13971400 })
13981401 r .mp [k ] = activeRange [len (activeRange )- 1 ]
13991402 }
@@ -1443,22 +1446,22 @@ func (mr *gRPCBidiReader) add(output io.Writer, offset, limit int64, callback fu
14431446 mr .mu .Unlock ()
14441447
14451448 if offset > objectSize {
1446- callback (offset , limit , fmt .Errorf ("offset larger than size of object: %v" , objectSize ))
1449+ callback (offset , 0 , fmt .Errorf ("offset larger than size of object: %v" , objectSize ))
14471450 return
14481451 }
14491452 if limit < 0 {
1450- callback (offset , limit , fmt .Errorf ("limit can't be negative" ))
1453+ callback (offset , 0 , fmt .Errorf ("limit can't be negative" ))
14511454 return
14521455 }
14531456 mr .mu .Lock ()
14541457 currentID := (* mr ).readID
14551458 (* mr ).readID ++
14561459 if ! mr .done {
1457- spec := rangeSpec {readID : currentID , writer : output , offset : offset , limit : limit , bytesWritten : 0 , callback : callback }
1460+ spec := rangeSpec {readID : currentID , writer : output , offset : offset , limit : limit , currentBytesWritten : 0 , totalBytesWritten : 0 , callback : callback }
14581461 mr .activeTask ++
14591462 mr .data <- []rangeSpec {spec }
14601463 } else {
1461- callback (offset , limit , fmt .Errorf ("stream is closed, can't add range" ))
1464+ callback (offset , 0 , fmt .Errorf ("stream is closed, can't add range" ))
14621465 }
14631466 mr .mu .Unlock ()
14641467}
0 commit comments