fix(storage): Fix takeover response handling. #13239
Conversation
Summary of Changes (Gemini Code Assist): This pull request addresses a critical issue in storage write operations where a takeover response might provide a write handle. Previously, this handle was not properly utilized during retries, leading to potential errors due to incorrect identification of write sessions. The changes ensure that the client correctly processes and uses this handle, allowing for safe and idempotent retries after a takeover, thereby improving the robustness of appendable uploads.
Code Review
This pull request fixes an issue with takeover response handling in resumable writes by ensuring the write handle from the server is used in subsequent retries. The core logic change in grpc_writer.go is correct and well-targeted. A new conformance test has been added to verify this fix. My review focuses on the new test code in retry_conformance_test.go, where I've identified several opportunities to improve maintainability and consistency. Specifically, I've suggested refactoring to reduce code duplication, improving the structure of the long test function for better readability, and making error handling more consistent and robust. The changes effectively address the described bug.
if err := b.Create(ctx, projectID, nil); err != nil {
	return err
}
For better error diagnostics and consistency with other error handling in this function, consider wrapping this error with additional context.
Suggested change:

 if err := b.Create(ctx, projectID, nil); err != nil {
-	return err
+	return fmt.Errorf("failed to create bucket: %w", err)
 }
// Close this writer, which will create the appendable unfinalized object
// (there was not enough in Write to trigger a send).
if err := objW.Close(); err != nil {
	return fmt.Errorf("Creation Writer.Close: %v", err)
To enable proper error unwrapping using errors.Is or errors.As, it's better to use the %w format verb instead of %v when wrapping errors. This function is inconsistent, as it uses %w on line 764 but uses %v here and in several other places (e.g., lines 779, 786, 789, 800, 804). Using %w consistently is a modern Go best practice.
Suggested change:

-	return fmt.Errorf("Creation Writer.Close: %v", err)
+	return fmt.Errorf("Creation Writer.Close: %w", err)
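For illustration only (this snippet is not part of the PR), here is a minimal standalone program showing why the wrapping verb matters: errors.Is can match a sentinel through an error built with %w, but not through one flattened with %v.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	// %w records io.EOF in the error chain, so errors.Is can still find it.
	wrapped := fmt.Errorf("Creation Writer.Close: %w", io.EOF)
	fmt.Println(errors.Is(wrapped, io.EOF)) // true

	// %v only formats the text; the chain back to io.EOF is lost.
	flattened := fmt.Errorf("Creation Writer.Close: %v", io.EOF)
	fmt.Println(errors.Is(flattened, io.EOF)) // false
}
```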
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
	bucketName := fmt.Sprintf("%s-appendable", bucketIDs.New())
	b := c.Bucket(bucketName)
	if err := b.Create(ctx, projectID, nil); err != nil {
		return err
	}
	defer b.Delete(ctx)

	obj := b.Object(objectIDs.New())
	if preconditions {
		obj = obj.If(Conditions{DoesNotExist: true})
	}

	// Force multiple messages per chunk, and multiple chunks in the object.
	chunkSize := 2 * maxPerMessageWriteSize
	toWrite := generateRandomBytes(chunkSize * 3)

	objW := obj.NewWriter(ctx)
	objW.Append = true
	objW.ChunkSize = chunkSize
	if _, err := objW.Write(toWrite[0:maxPerMessageWriteSize]); err != nil {
		return fmt.Errorf("Writer.Write: %w", err)
	}
	// Close this writer, which will create the appendable unfinalized object
	// (there was not enough in Write to trigger a send).
	if err := objW.Close(); err != nil {
		return fmt.Errorf("Creation Writer.Close: %v", err)
	}

	generation := int64(0)
	if preconditions {
		generation = objW.Attrs().Generation
	}
	objT := b.Object(obj.ObjectName()).Generation(generation)
	w, l, err := objT.NewWriterFromAppendableObject(ctx, &AppendableWriterOpts{ChunkSize: chunkSize})
	if err != nil {
		return fmt.Errorf("NewWriterFromAppendableObject: %v", err)
	}
	if l != int64(maxPerMessageWriteSize) {
		return fmt.Errorf("NewWriterFromAppendableObject unexpected len: got %v, want %v", l, maxPerMessageWriteSize)
	}

	if _, err := w.Write(toWrite[maxPerMessageWriteSize:]); err != nil {
		return fmt.Errorf("Writer.Write: %v", err)
	}
	if err := w.Close(); err != nil {
		return fmt.Errorf("Writer.Close: %v", err)
	}

	if w.Attrs() == nil {
		return fmt.Errorf("Writer.Attrs: expected attrs for written object, got nil")
	}

	// Don't reuse obj, in case preconditions were set on the write request.
	r, err := b.Object(obj.ObjectName()).NewReader(ctx)
	defer r.Close()
	if err != nil {
		return fmt.Errorf("obj.NewReader: %v", err)
	}
	content, err := io.ReadAll(r)
	if err != nil {
		return fmt.Errorf("Reader.Read: %v", err)
	}

	gotMd5 := md5.Sum(content)
	expectedMd5 := md5.Sum(toWrite)
	if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
This new test function is quite long (over 60 lines) and contains multiple logical steps, which can make it difficult to understand and maintain. Consider refactoring it by extracting distinct stages of the test into smaller, well-named helper functions. For example, you could have separate helpers for:
- Setting up and creating the initial appendable object.
- Performing the takeover write.
This would make the main test function a much clearer, high-level description of the test scenario.
gotMd5 := md5.Sum(content)
expectedMd5 := md5.Sum(toWrite)
if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
This block of code for verifying content is duplicated from the test case above (lines 734-741). To improve maintainability and reduce code duplication, consider extracting this logic into a shared helper function. For example:
func verifyContent(content, toWrite []byte) error {
	gotMd5 := md5.Sum(content)
	expectedMd5 := md5.Sum(toWrite)
	if d := cmp.Diff(gotMd5, expectedMd5); d != "" {
		return fmt.Errorf("content mismatch, got %v bytes (md5: %v), want %v bytes (md5: %v)",
			len(content), gotMd5, len(toWrite), expectedMd5)
	}
	return nil
}

Then both tests could simply call return verifyContent(content, toWrite) at the end.
The retry conformance tests will start to pass once the testbench supports single-threaded takeovers, in v0.56.0. The MRD timeout is a preexisting flake, I think.
tritone left a comment
This looks good to me.
Does the intersection of how the write is split up and the retry test scenarios cover a retry both on the initial takeover request and midway through the takeover write? It might be good to note explicitly how you chose how much data to write at each step.
A takeover response can return a handle. In that case, a retry should present that handle. Otherwise it will attempt a takeover when it retries. After a new takeover, the client can't tell if a partial write after the last known flush offset was performed by itself, or by some other writer. Therefore the client can't tell if it's safe to retry from the last known flush offset, and the server will return an error if it repeats offsets. The write handle allows the client to present enough information to the server to correctly identify replays on the same write session, avoiding an error when the client repeats bytes that are already durable.
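As a rough sketch of that decision, using hypothetical request types rather than the actual grpc_writer.go or Cloud Storage gRPC types: when the takeover response supplied a handle, a retry presents it and resumes from the flushed offset; without one, the retry has to start a fresh takeover, and the server may reject offsets it has already seen.

```go
package main

import "fmt"

// Hypothetical request shape for illustration only; the real client uses the
// Cloud Storage gRPC protos, not these types.
type appendRequest struct {
	WriteHandle []byte // identifies the existing write session, if known
	Takeover    bool   // true means "start a new takeover" instead
	Offset      int64
}

// buildRetry sketches the choice the fix makes: reuse the handle returned by
// the takeover response so the retried request is recognized as the same session.
func buildRetry(handleFromTakeover []byte, flushedOffset int64) appendRequest {
	if len(handleFromTakeover) > 0 {
		// Same session: the server can tell that repeated bytes are replays
		// of data it already has, so resending from flushedOffset is safe.
		return appendRequest{WriteHandle: handleFromTakeover, Offset: flushedOffset}
	}
	// No handle: the retry must take over again, and the server can't
	// attribute bytes after the flushed offset to this writer.
	return appendRequest{Takeover: true, Offset: flushedOffset}
}

func main() {
	fmt.Printf("%+v\n", buildRetry([]byte("handle-from-takeover"), 1024))
	fmt.Printf("%+v\n", buildRetry(nil, 1024))
}
```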
Force-pushed from 7a11e36 to f9ddb31.
The test found a real bug (yay!). On rare occasions on my workstation, the takeover writer issues its first write at an earlier offset than the one returned by the server. The real service treats this as an error, but we don't model the full state machine in the testbench and it falls into retry handling, which silently drops these bytes. This probably means that there is a race in the spot where we determine the takeover offset for future writes. I'll figure that out before merging this PR.
OK, the bug is that we don't guarantee the writer has fully handled the completion from the takeover response before returning control to the caller code. The writer only has a fully up-to-date view of the correct buffer offsets once the completion put on the channel during takeover (https://github.com/googleapis/google-cloud-go/pull/13239/files#diff-2b5c89d316046e2b33163778a826af79ec60ff23d70a93cd0d4fa1daa2a494e8R1244) is processed by the writeLoop (https://github.com/googleapis/google-cloud-go/pull/13239/files#diff-2b5c89d316046e2b33163778a826af79ec60ff23d70a93cd0d4fa1daa2a494e8R447-R451), but the caller can get control back before that has happened. We should guarantee that we've handled this completion before returning control to the caller. It's a small fix, I think, so I'll just include it in this PR.
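A minimal sketch of that kind of synchronization, with hypothetical names rather than the real grpc_writer.go types: the takeover path enqueues a completion for the write loop and then blocks on a done channel until the loop has applied it, so the caller never observes stale buffer offsets.

```go
package main

import "fmt"

// completion is a hypothetical stand-in for the item put on the writer's
// channel during takeover; done is closed once the loop has applied it.
type completion struct {
	offset int64
	done   chan struct{}
}

func writeLoop(completions <-chan completion, state *int64) {
	for c := range completions {
		*state = c.offset // update the writer's view of durable bytes
		close(c.done)     // only now is it safe for the caller to proceed
	}
}

func takeover(completions chan<- completion, serverOffset int64) {
	c := completion{offset: serverOffset, done: make(chan struct{})}
	completions <- c
	<-c.done // block until the write loop has processed the completion
}

func main() {
	var flushed int64
	completions := make(chan completion)
	go writeLoop(completions, &flushed)

	takeover(completions, 4096) // returns only after flushed == 4096
	fmt.Println("takeover offset applied:", flushed)
}
```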
tritone left a comment
nice!
🤖 I have created a release *beep* *boop*

## [1.57.1](storage/v1.57.0...storage/v1.57.1) (2025-10-28)

### Bug Fixes

* **storage:** Takeover idempotence. ([#13230](#13230)) ([cc5d2a1](cc5d2a1))
* **storage:** Copy metadata when using Copier with grpc ([#12919](#12919)) ([57a2e80](57a2e80))
* **storage:** Fix takeover response handling. ([#13239](#13239)) ([26d75bc](26d75bc))
* **storage:** Remove default timeout for gRPC operations ([#13022](#13022)) ([b94c3ba](b94c3ba))
* **storage:** Skip download of file outside of target dir ([#12945](#12945)) ([6259aee](6259aee))
* **storage:** Upgrade gRPC service registration func ([8fffca2](8fffca2))

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com>
Co-authored-by: Brenna N Epp <brennae@google.com>
The initial takeover response wasn't being checked for a redirect. googleapis#13239 addressed successful responses. This addresses error responses.