@@ -39,6 +39,7 @@ type gRPCAppendBidiWriteBufferSender struct {
3939 objectChecksums * storagepb.ObjectChecksums
4040
4141 forceFirstMessage bool
42+ progress func (int64 )
4243 flushOffset int64
4344
4445 // Fields used to report responses from the receive side of the stream
@@ -62,6 +63,7 @@ func (w *gRPCWriter) newGRPCAppendBidiWriteBufferSender() (*gRPCAppendBidiWriteB
6263 },
6364 objectChecksums : toProtoChecksums (w .sendCRC32C , w .attrs ),
6465 forceFirstMessage : true ,
66+ progress : w .progress ,
6567 }
6668 return s , nil
6769}
@@ -246,26 +248,35 @@ func (s *gRPCAppendBidiWriteBufferSender) sendOnConnectedStream(buf []byte, offs
246248 if s .recvErr != io .EOF {
247249 return nil , s .recvErr
248250 }
251+ if obj .GetSize () > s .flushOffset {
252+ s .flushOffset = obj .GetSize ()
253+ s .progress (s .flushOffset )
254+ }
249255 return
250256 }
251257
252258 if flush {
253259 // We don't necessarily expect multiple responses for a single flush, but
254260 // this allows the server to send multiple responses if it wants to.
255- for s .flushOffset < offset + int64 (len (buf )) {
261+ flushOffset := s .flushOffset
262+ for flushOffset < offset + int64 (len (buf )) {
256263 resp , ok := <- s .recvs
257264 if ! ok {
258265 return nil , s .recvErr
259266 }
260267 pSize := resp .GetPersistedSize ()
261268 rSize := resp .GetResource ().GetSize ()
262- if s . flushOffset < pSize {
263- s . flushOffset = pSize
269+ if flushOffset < pSize {
270+ flushOffset = pSize
264271 }
265- if s . flushOffset < rSize {
266- s . flushOffset = rSize
272+ if flushOffset < rSize {
273+ flushOffset = rSize
267274 }
268275 }
276+ if s .flushOffset < flushOffset {
277+ s .flushOffset = flushOffset
278+ s .progress (s .flushOffset )
279+ }
269280 }
270281
271282 return
0 commit comments