
Commit 26d75bc

fix(storage): Fix takeover response handling. (#13239)
A takeover response can return a write handle. When it does, a retry should present that handle; otherwise the retry will attempt a fresh takeover. After a new takeover, the client can't tell whether a partial write past the last known flush offset was performed by itself or by some other writer, so it can't know whether retrying from that offset is safe, and the server will return an error if offsets are repeated. The write handle lets the client present enough information for the server to correctly identify replays on the same write session, avoiding an error when the client resends bytes that are already durable.
1 parent e8bcc9b commit 26d75bc
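To make the retry rule in the message concrete, here is a minimal, self-contained Go sketch. Every name in it (session, recordTakeoverResponse, retryIsSafe) is illustrative, not the library's actual API:

// Minimal sketch of the retry rule described above; illustrative names only.
package main

import "fmt"

// session models one append-stream write session on the client side.
type session struct {
	// handle is the opaque token a takeover response may return. Presenting
	// it on a retry lets the server recognize repeated offsets as replays
	// from this session rather than rejecting them as an error.
	handle []byte
}

// recordTakeoverResponse captures a returned handle so that a retry resumes
// this session instead of attempting another takeover.
func (s *session) recordTakeoverResponse(handle []byte) {
	if len(handle) > 0 {
		s.handle = append([]byte(nil), handle...)
	}
}

// retryIsSafe reports whether resending from the last known flush offset is
// safe: without a handle, a partial write past that offset could belong to
// another writer, so replaying those bytes would be rejected.
func (s *session) retryIsSafe() bool {
	return len(s.handle) > 0
}

func main() {
	var s session
	s.recordTakeoverResponse([]byte("opaque-server-handle"))
	fmt.Println("safe to retry from last flush offset:", s.retryIsSafe())
}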

File tree

2 files changed: +82 −6 lines changed

storage/grpc_writer.go

Lines changed: 9 additions & 6 deletions
@@ -1169,8 +1169,8 @@ func (s *gRPCAppendBidiWriteBufferSender) handleStream(stream storagepb.Storage_

 type gRPCAppendTakeoverBidiWriteBufferSender struct {
 	gRPCAppendBidiWriteBufferSender
-	takeoverReported  bool
-	setTakeoverOffset func(int64)
+	takeoverReported         bool
+	handleTakeoverCompletion func(gRPCBidiWriteCompletion)
 }

 func writeObjectSpecAsAppendObjectSpec(s *storagepb.WriteObjectSpec, gen int64) *storagepb.AppendObjectSpec {

@@ -1197,8 +1197,11 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
 			objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
 			finalizeOnClose: w.finalizeOnClose,
 		},
-		takeoverReported:  false,
-		setTakeoverOffset: w.setTakeoverOffset,
+		takeoverReported:         false,
+		handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
+			w.handleCompletion(c)
+			w.setTakeoverOffset(c.flushOffset)
+		},
 	}
 }

@@ -1238,9 +1241,9 @@ func (s *gRPCAppendTakeoverBidiWriteBufferSender) connect(ctx context.Context, c
 			return
 		}

-		s.setTakeoverOffset(c.flushOffset)
+		s.maybeUpdateFirstMessage(resp)
 		s.takeoverReported = true
-		cs.completions <- *c
+		s.handleTakeoverCompletion(*c)
 	}

 	go s.handleStream(stream, cs, firstSend)
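The net effect of these hunks: completion handling is bundled into one closure that both runs w.handleCompletion (retaining state from the takeover response) and publishes the takeover offset, and the sender now calls maybeUpdateFirstMessage with the response before reporting the takeover. The body of maybeUpdateFirstMessage is not part of this diff; the following is only a hedged sketch of the idea, with stand-in types rather than library types:

// Hypothetical sketch only; maybeUpdateFirstMessage's body is not shown in
// this diff. The idea: fold whatever the takeover response returned (e.g. a
// write handle) into the first message sent on any re-established stream,
// so a retry resumes this write session instead of starting a new takeover.
// takeoverResponse and takeoverSender are stand-in types, not library types.
type takeoverResponse struct{ writeHandle []byte }

type takeoverSender struct {
	firstMessage struct{ writeHandle []byte }
}

func (s *takeoverSender) maybeUpdateFirstMessage(resp *takeoverResponse) {
	if len(resp.writeHandle) > 0 {
		s.firstMessage.writeHandle = resp.writeHandle
	}
}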

storage/retry_conformance_test.go

Lines changed: 73 additions & 0 deletions
@@ -731,6 +731,79 @@ var methods = map[string][]retryFunc{
 				return fmt.Errorf("Reader.Read: %w", err)
 			}

+			gotMd5 := md5.Sum(content)
+			expectedMd5 := md5.Sum(toWrite)
+			if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
+				return fmt.Errorf("content mismatch, got %v bytes (md5: %v), want %v bytes (md5: %v)",
+					len(content), gotMd5, len(toWrite), expectedMd5)
+			}
+			return nil
+		},
+		// Appendable upload using a takeover.
+		func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
+			bucketName := fmt.Sprintf("%s-appendable", bucketIDs.New())
+			b := c.Bucket(bucketName)
+			if err := b.Create(ctx, projectID, nil); err != nil {
+				return err
+			}
+			defer b.Delete(ctx)
+
+			obj := b.Object(objectIDs.New())
+			if preconditions {
+				obj = obj.If(Conditions{DoesNotExist: true})
+			}
+
+			// Force multiple messages per chunk, and multiple chunks in the object.
+			chunkSize := 2 * maxPerMessageWriteSize
+			toWrite := generateRandomBytes(chunkSize * 3)
+
+			objW := obj.NewWriter(ctx)
+			objW.Append = true
+			objW.ChunkSize = chunkSize
+			if _, err := objW.Write(toWrite[0:maxPerMessageWriteSize]); err != nil {
+				return fmt.Errorf("Writer.Write: %w", err)
+			}
+			// Close this writer, which will create the appendable unfinalized object
+			// (there was not enough in Write to trigger a send).
+			if err := objW.Close(); err != nil {
+				return fmt.Errorf("Creation Writer.Close: %v", err)
+			}
+
+			generation := int64(0)
+			if preconditions {
+				generation = objW.Attrs().Generation
+			}
+			objT := b.Object(obj.ObjectName()).Generation(generation)
+			w, l, err := objT.NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{ChunkSize: chunkSize})
+			if err != nil {
+				return fmt.Errorf("NewWriterFromAppendableObject: %v", err)
+			}
+			if l != int64(maxPerMessageWriteSize) {
+				return fmt.Errorf("NewWriterFromAppendableObject unexpected len: got %v, want %v", l, maxPerMessageWriteSize)
+			}
+
+			if _, err := w.Write(toWrite[maxPerMessageWriteSize:]); err != nil {
+				return fmt.Errorf("Writer.Write: %v", err)
+			}
+			if err := w.Close(); err != nil {
+				return fmt.Errorf("Writer.Close: %v", err)
+			}
+
+			if w.Attrs() == nil {
+				return fmt.Errorf("Writer.Attrs: expected attrs for written object, got nil")
+			}
+
+			// Don't reuse obj, in case preconditions were set on the write request.
+			r, err := b.Object(obj.ObjectName()).NewReader(ctx)
+			if err != nil {
+				return fmt.Errorf("obj.NewReader: %v", err)
+			}
+			defer r.Close()
+			content, err := io.ReadAll(r)
+			if err != nil {
+				return fmt.Errorf("Reader.Read: %v", err)
+			}
+
 			gotMd5 := md5.Sum(content)
 			expectedMd5 := md5.Sum(toWrite)
 			if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
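For reference, here is the takeover flow the new test exercises, condensed into application-shaped code. This is a sketch, not a complete program: the bucket and object names are placeholders, and client, gen, chunkSize, and data are assumed to come from surrounding code.

// Condensed from the test above: resume an unfinalized appendable object.
func resumeAppend(ctx context.Context, client *storage.Client, gen int64, chunkSize int, data []byte) error {
	obj := client.Bucket("my-bucket").Object("my-object").Generation(gen)
	w, offset, err := obj.NewWriterFromAppendableObject(ctx, &storage.AppendableWriterOpts{ChunkSize: chunkSize})
	if err != nil {
		return err
	}
	// offset reports how many bytes the unfinalized object already holds;
	// append only the remainder.
	if _, err := w.Write(data[offset:]); err != nil {
		return err
	}
	return w.Close()
}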
