@@ -443,6 +443,31 @@ func (w *gRPCWriter) writeLoop(ctx context.Context) error {
443443 defer cancel ()
444444 w .streamSender .connect (ctx , bscs , w .settings .gax ... )
445445
446+ // Drain any initial completions (like QueryWriteStatus results).
447+ Loop:
448+ for {
449+ select {
450+ case c , ok := <- completions :
451+ if ! ok {
452+ return w .streamSender .err ()
453+ }
454+ w .handleCompletion (c )
455+ default :
456+ break Loop
457+ }
458+ }
459+
460+ if w .bufFlushedIdx > 0 {
461+ copy (w .buf , w .buf [w .bufFlushedIdx :])
462+ w .buf = w .buf [:len (w .buf )- w .bufFlushedIdx ]
463+ w .bufBaseOffset += int64 (w .bufFlushedIdx )
464+ w .bufUnsentIdx -= w .bufFlushedIdx
465+ if w .bufUnsentIdx < 0 {
466+ w .bufUnsentIdx = 0
467+ }
468+ w .bufFlushedIdx = - 1
469+ }
470+
446471 // Send any full quantum in w.buf, possibly including a flush
447472 if err := w .withCommandRetryDeadline (func () error {
448473 sentOffset , ok := w .sendBufferToTarget (chcs , w .buf , w .bufBaseOffset , cap (w .buf ),
@@ -591,6 +616,26 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
591616 return nil
592617 }
593618
619+ if ! c .hasStarted {
620+ c .initialOffset = w .bufBaseOffset + int64 (len (w .buf ))
621+ c .hasStarted = true
622+ } else {
623+ // Retrying this command; check if server has persisted some bytes of this command's payload.
624+ bytesPersisted := w .bufBaseOffset - c .initialOffset
625+ if bytesPersisted > 0 {
626+ if int64 (len (c .p )) < bytesPersisted {
627+ bytesPersisted = int64 (len (c .p ))
628+ }
629+ c .p = c .p [bytesPersisted :]
630+ c .initialOffset = w .bufBaseOffset
631+
632+ if len (c .p ) == 0 {
633+ c .markDone ()
634+ return nil
635+ }
636+ }
637+ }
638+
594639 // Zero-Copy send.
595640 if w .forceOneShot {
596641 err := c .zeroCopyWrite (w , cs )
@@ -650,6 +695,7 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
650695 w .buf = w .buf [:wblen + toNextWriteQuantum ]
651696 copied := copy (w .buf [wblen :], c .p )
652697 c .p = c .p [copied :]
698+ c .initialOffset += int64 (copied )
653699 firstFullBufFromCmd := cap (w .buf ) - len (w .buf )
654700
655701 sending := w .buf [w .bufUnsentIdx :]
@@ -674,6 +720,7 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
674720 trim = len (c .p )
675721 }
676722 c .p = c .p [trim :]
723+ c .initialOffset += int64 (trim )
677724 cmdBaseOffset = bufTail
678725 }
679726 offset := cmdBaseOffset
0 commit comments