Skip to content

Commit 04b6c63

Browse files
committed
fix(storage): fix race condition during retries in gRPC writer (#14649)
1 parent 20b37d6 commit 04b6c63

3 files changed

Lines changed: 112 additions & 1 deletion

File tree

storage/grpc_writer.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,31 @@ func (w *gRPCWriter) writeLoop(ctx context.Context) error {
443443
defer cancel()
444444
w.streamSender.connect(ctx, bscs, w.settings.gax...)
445445

446+
// Drain any initial completions (like QueryWriteStatus results).
447+
Loop:
448+
for {
449+
select {
450+
case c, ok := <-completions:
451+
if !ok {
452+
return w.streamSender.err()
453+
}
454+
w.handleCompletion(c)
455+
default:
456+
break Loop
457+
}
458+
}
459+
460+
if w.bufFlushedIdx > 0 {
461+
copy(w.buf, w.buf[w.bufFlushedIdx:])
462+
w.buf = w.buf[:len(w.buf)-w.bufFlushedIdx]
463+
w.bufBaseOffset += int64(w.bufFlushedIdx)
464+
w.bufUnsentIdx -= w.bufFlushedIdx
465+
if w.bufUnsentIdx < 0 {
466+
w.bufUnsentIdx = 0
467+
}
468+
w.bufFlushedIdx = -1
469+
}
470+
446471
// Send any full quantum in w.buf, possibly including a flush
447472
if err := w.withCommandRetryDeadline(func() error {
448473
sentOffset, ok := w.sendBufferToTarget(chcs, w.buf, w.bufBaseOffset, cap(w.buf),
@@ -591,6 +616,26 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
591616
return nil
592617
}
593618

619+
if !c.hasStarted {
620+
c.initialOffset = w.bufBaseOffset + int64(len(w.buf))
621+
c.hasStarted = true
622+
} else {
623+
// Retrying this command; check if server has persisted some bytes of this command's payload.
624+
bytesPersisted := w.bufBaseOffset - c.initialOffset
625+
if bytesPersisted > 0 {
626+
if int64(len(c.p)) < bytesPersisted {
627+
bytesPersisted = int64(len(c.p))
628+
}
629+
c.p = c.p[bytesPersisted:]
630+
c.initialOffset = w.bufBaseOffset
631+
632+
if len(c.p) == 0 {
633+
c.markDone()
634+
return nil
635+
}
636+
}
637+
}
638+
594639
// Zero-Copy send.
595640
if w.forceOneShot {
596641
err := c.zeroCopyWrite(w, cs)
@@ -650,6 +695,7 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
650695
w.buf = w.buf[:wblen+toNextWriteQuantum]
651696
copied := copy(w.buf[wblen:], c.p)
652697
c.p = c.p[copied:]
698+
c.initialOffset += int64(copied)
653699
firstFullBufFromCmd := cap(w.buf) - len(w.buf)
654700

655701
sending := w.buf[w.bufUnsentIdx:]
@@ -674,6 +720,7 @@ func (c *gRPCWriterCommandWrite) handle(w *gRPCWriter, cs gRPCWriterCommandHandl
674720
trim = len(c.p)
675721
}
676722
c.p = c.p[trim:]
723+
c.initialOffset += int64(trim)
677724
cmdBaseOffset = bufTail
678725
}
679726
offset := cmdBaseOffset

storage/internal/test/conformance/retry_tests.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,19 @@
258258
},
259259
{
260260
"instructions": ["return-503-after-8192K", "return-408"]
261+
},
262+
{
263+
"instructions": [ "return-503-after-4080K"]
264+
},
265+
{
266+
"instructions": [
267+
"return-503-after-1000K",
268+
"return-503-after-1024K",
269+
"return-503-after-1025K",
270+
"return-503-after-4000K",
271+
"return-503-after-4096K",
272+
"return-503-after-4097K"
273+
]
261274
}
262275
],
263276
"methods": [

storage/retry_conformance_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ import (
4343
const (
4444
projectID = "my-project-id"
4545
serviceAccountEmail = "my-sevice-account@my-project-id.iam.gserviceaccount.com"
46-
MiB = 1 << 10 << 10
46+
MiB = 1 << 20
47+
KiB = 1 << 10
4748
)
4849

4950
var (
@@ -782,6 +783,56 @@ var methods = map[string][]retryFunc{
782783
}
783784
return nil
784785
},
786+
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
787+
obj := c.Bucket(fs.bucket.Name).Object(objectIDs.New())
788+
if preconditions {
789+
obj = obj.If(Conditions{DoesNotExist: true})
790+
}
791+
w := obj.NewWriter(ctx)
792+
w.ChunkSize = 4 * MiB
793+
data := generateRandomBytes(32 * MiB)
794+
writeSize := 2 * MiB
795+
// Write 2MiB at a time to use buffers and not user's buffer.
796+
for i := 0; i < 16; i++ {
797+
if _, err := w.Write(data[i*writeSize : (i+1)*writeSize]); err != nil {
798+
return fmt.Errorf("writing object: %v", err)
799+
}
800+
}
801+
if err := w.Close(); err != nil {
802+
return fmt.Errorf("closing object: %v", err)
803+
}
804+
return nil
805+
},
806+
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
807+
obj := c.Bucket(fs.bucket.Name).Object(objectIDs.New())
808+
if preconditions {
809+
obj = obj.If(Conditions{DoesNotExist: true})
810+
}
811+
w := obj.NewWriter(ctx)
812+
w.ChunkSize = 4 * MiB
813+
data := generateRandomBytes(32 * MiB)
814+
totalWriteSize := 0
815+
times := 0
816+
for totalWriteSize < len(data) {
817+
times++
818+
// Write smaller messages and larger messages to exercise different paths.
819+
writeSize := 5 * KiB
820+
if times%2 == 0 {
821+
writeSize = 6 * MiB
822+
}
823+
if totalWriteSize+writeSize > len(data) {
824+
writeSize = len(data) - totalWriteSize
825+
}
826+
if _, err := w.Write(data[totalWriteSize : totalWriteSize+writeSize]); err != nil {
827+
return fmt.Errorf("writing object: %v", err)
828+
}
829+
totalWriteSize += writeSize
830+
}
831+
if err := w.Close(); err != nil {
832+
return fmt.Errorf("closing object: %v", err)
833+
}
834+
return nil
835+
},
785836
},
786837
"storage.appendable.upload": {
787838
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {

0 commit comments

Comments
 (0)