fix: retry ABORTED writes in bulkCommit #1222
Codecov Report
@@            Coverage Diff             @@
##           node10    #1222      +/-  ##
==========================================
- Coverage   98.42%   98.42%    -0.01%
==========================================
  Files          30       30
  Lines       18539    18626      +87
  Branches     1425     1437      +12
==========================================
+ Hits        18247    18332      +85
- Misses        289      291       +2
  Partials        3        3
dev/src/write-batch.ts
Outdated
>('batchWrite', retryRequest, tag, retryCodes);

// Map the results of the retried request back to the original response.
for (let i = 0; i < abortedIndexes.length; i++) {
I was hoping we could just extract a new BulkCommitBatch and send it off using the existing bulkCommit(). Is that feasible?
I played around with it. Switching it to a recursive bulkCommit() call adds a layer of complexity that the while-loop implementation doesn't have. The only repeated code that using bulkCommit() saves is the call to make the request -- the rest of the reduce/mapping code still needs to be there. Am I missing something here?
I am wondering if maybe this is the wrong layer - what would this look like if we made this change at the callsite of bulkCommit()? We could still have a single while loop, but it would encapsulate both sending the initial batch as well as the retry. I threw this together in this Gist: https://gist.github.com/schmidt-sebastian/abbedb8412976166fe23ef3d323d613b
There is some major code smell in this proposal as we need to somehow extract the failed writes from a completed WriteBatch. We need to find a clean way to do this. I am beginning to think more and more that re-using the WriteBatch wasn't the right decision - after all, we don't actually use much of the Batch functionality, we mainly just use it to parse input, and maybe we could just have a couple of shared helper functions/a shared helper class that does this.
You will probably think that it is not worth changing all of this just to make the code read a bit nicer. I am mostly with you on that, but I do think that we should try to mark the writes that do succeed as successful right away. I don't believe we can get that to work if WriteBatch.bulkCommit() retries under the hood and blocks.
A customer could rightfully expect that we signal a successful write immediately. It brings down the perceived latency of the BatchWrite endpoint and explains why a document shows up in the console or via other listeners. The only reason to artificially delay resolving the individual write Promise is the structure of our code, but we should keep in mind that we are setting an example here for 5 more implementations. We should invest the effort to get this right.
I personally need to do a lot more thinking here to come up with a clean design that is easy to implement (even if it ends up being a large refactor, it should ideally be something that the IDE can do for the most part).
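A minimal sketch of the "resolve successes immediately" idea discussed above. All names here (`Deferred`, `ResultLike`) are hypothetical stand-ins, not the actual BulkWriter API: each write gets its own deferred promise that resolves as soon as its result arrives, and only failed positions carry over into the retry round.

```typescript
// Hypothetical sketch: resolve each write's promise as soon as its result
// arrives, instead of holding all promises until retries complete.
class Deferred<T> {
  promise: Promise<T>;
  resolve!: (value: T) => void;
  reject!: (reason?: unknown) => void;
  constructor() {
    this.promise = new Promise<T>((res, rej) => {
      this.resolve = res;
      this.reject = rej;
    });
  }
}

interface ResultLike {
  statusCode: number; // gRPC codes: 0 = OK, 10 = ABORTED
}

const deferreds = [new Deferred<ResultLike>(), new Deferred<ResultLike>()];

// Simulated first batch response: write 0 succeeds, write 1 is ABORTED.
const firstResponse: ResultLike[] = [{statusCode: 0}, {statusCode: 10}];

const retryIndexes: number[] = [];
firstResponse.forEach((result, i) => {
  if (result.statusCode === 0) {
    deferreds[i].resolve(result); // the caller sees success right away
  } else {
    retryIndexes.push(i); // only this write goes into the retry batch
  }
});

console.log(retryIndexes); // [1]
```

The point of the sketch: the successful write's promise settles before any backoff or retry of its neighbors, which is what brings down perceived latency.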
  "DEADLINE_EXCEEDED",
  "INTERNAL",
  "UNAVAILABLE"
],
This change needs to be made here: https://source.corp.google.com/piper///depot/google3/google/firestore/v1/firestore_grpc_service_config.json and then propagated via our Open Source tooling.
done.
It still seems missing?
update: in cl/318124350.
Force-pushed from b8ce703 to bdfef3e.
Made a pass at moving the retry handling to BulkWriter. One source of complexity that I wish I could remove is mapping the subsequent batch retries back to the original index in resultsMap.
schmidt-sebastian
left a comment
Back to Brian to see if we can use the document path for index tracking.
schmidt-sebastian
left a comment
Thanks for the update!
dev/src/bulk-writer.ts
Outdated
for (let i = 0; i < results.length; i++) {
  const writeResult = results[i];
  if (this.shouldRetry(writeResult.status.code)) {
    indexesToRetry.push(i);
We need to reject the result here if the write fails with an error that is not retryable.
Done. Used your proposed simplification.
if (indexesToRetry.length === 0) {
  this.completedDeferred.resolve();
}
return indexesToRetry;
Can we combine the different error handling code paths here?
Something like:
if (error) {
  results = Array.from({length: this.opCount}, () => ({status: {code: error.code}}));
}
for (let i = 0; i < results.length; i++) {
  const writeResult = results[i];
  if (writeResult.status.code === Status.OK) {
    const originalIndex = originalIndexMap.get(i)!;
    this.resultsMap.get(originalIndex)!.resolve(writeResult);
  } else if (this.shouldRetry(writeResult.status.code)) {
    indexesToRetry.push(i);
  } else {
    this.resultsMap.get(originalIndexMap.get(i)!)!.reject(writeResult.status);
  }
}
if (indexesToRetry.length === 0) {
  this.completedDeferred.resolve();
}
return indexesToRetry;
WOW, this is some poetic refactoring right here 🔥 🔥 🔥
try {
  await this.backoff.backoffAndWait();
  const results = await this.writeBatch.bulkCommit();
  retryIndexes = this.processResults(originalIndexMap, results);
Can we rewrite this so that processResults doesn't take originalIndexMap? It seems like we could apply the retry indexes in this function. I haven't played with this, but I wonder if it would lead to a simplification.
It's possible to not pass in originalIndexMap to processResults -- the successful results are located at the indexes not in retryIndexes. From there we could resolve/reject the individual promises and reject the rest.
However, the main issue is that the resultsMap resolution logic would have to live in bulkCommit(), which then means that processResults() would become findRetryableIndexes(). From there, it would make more sense to have all the logic in bulkCommit(), which would make the method much longer.
With the new refactor, using originalIndexMap in processResults seems a lot cleaner. WDYT?
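A self-contained sketch of the index bookkeeping being discussed (the helper names and shapes here are invented for illustration, not the actual BulkWriter code): each retry round produces a smaller batch, so a map is rebuilt every round to translate a position in the retried batch back to the index of the original bulkCommit() call.

```typescript
// Hypothetical sketch: re-map positions of each retried batch back to the
// indexes of the original bulkCommit() request.
const RETRYABLE = new Set([10]); // e.g. gRPC ABORTED

// Round 1: five writes, identity mapping; indexes 1 and 3 fail retryably.
let originalIndexMap = new Map<number, number>(
  [0, 1, 2, 3, 4].map(i => [i, i] as [number, number])
);
const round1 = [0, 10, 0, 10, 0]; // status code per position

function processResults(
  map: Map<number, number>,
  codes: number[]
): number[] {
  const retry: number[] = [];
  codes.forEach((code, i) => {
    if (RETRYABLE.has(code)) {
      retry.push(i);
    }
    // else: resolve/reject the promise stored under map.get(i) here
  });
  return retry;
}

const retryIndexes = processResults(originalIndexMap, round1);

// Rebuild the map so position j of the next (smaller) batch points at the
// original index it came from.
originalIndexMap = new Map(
  retryIndexes.map((orig, j) => [j, originalIndexMap.get(orig)!] as [number, number])
);

// Round 2 has two writes; position 1 still maps back to original index 3.
console.log(originalIndexMap.get(1)); // 3
```

This is the "mapping the subsequent batch retries back to the original index" complexity mentioned earlier in the thread, isolated from the rest of the commit logic.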
dev/src/bulk-writer.ts
Outdated
while (retryAttempts < MAX_RETRY_ATTEMPTS) {
  let retryIndexes: number[] = [];
  try {
    await this.backoff.backoffAndWait();
Nit: Move this out of the try/catch block.
done.
  "DEADLINE_EXCEEDED",
  "INTERNAL",
  "UNAVAILABLE"
],
It still seems missing?
dev/src/write-batch.ts
Outdated
 * @param indexes List of operation indexes to keep
 * @private
 */
_sliceIndexes(indexes: number[]): void {
The Array slice() method returns a copy, which I think would be preferable here as well. Can we update this to create a new WriteBatch that is marked `_committed = false`? The reason I am asking is that I would ideally not expose a method that mucks with the internals of a WriteBatch, as the potential for future misuse is high.
Good point. I did not consider the future potential misuse part.
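A minimal illustration of the copy-instead-of-mutate suggestion above, using an invented `MiniBatch` stand-in rather than the real WriteBatch class: the method returns a fresh, uncommitted batch containing only the kept operations, leaving the committed batch untouched.

```typescript
// Hypothetical stand-in for WriteBatch, illustrating the review suggestion:
// like Array.prototype.slice(), return a copy rather than mutating in place.
class MiniBatch {
  constructor(
    readonly ops: ReadonlyArray<string>,
    readonly committed: boolean
  ) {}

  // Returns a NEW batch holding only the operations at `indexes`, marked
  // uncommitted so it can be sent again; `this` is left unchanged.
  retainIndexes(indexes: number[]): MiniBatch {
    const kept = indexes.map(i => this.ops[i]);
    return new MiniBatch(kept, /* committed= */ false);
  }
}

const batch = new MiniBatch(['set:a', 'set:b', 'set:c'], true);
const retryBatch = batch.retainIndexes([0, 2]);

console.log(retryBatch.ops);       // ['set:a', 'set:c']
console.log(retryBatch.committed); // false
console.log(batch.ops.length);     // 3 -- original batch is untouched
```

The design point is the one made in the review: no method on the public surface ever flips a committed batch's internal state, so there is nothing to misuse later.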
dev/test/bulk-writer.ts
Outdated
it('retries retryable batchWrite failures', async () => {
  setTimeoutHandler(setImmediate);
  let retryCount = 0;
s/retry/attempt
done.
dev/test/bulk-writer.ts
Outdated
  });
});

it('retries retryable batchWrite failures', async () => {
Is it possible to make these two tests more distinguishable?
Something like:
retries batchWrite when single operation fails vs retries batchWrite when entire RPC fails.
Furthermore, should we test what happens when we have both non-retryable and retryable writes?
Changed names. Also added a non-retryable error to the test for ABORTED retries.
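A sketch of the mixed-failure scenario the review asks for, reduced to its classification logic (the status-code constants are real gRPC codes; the rest is illustrative, not the actual test code): one response containing an OK, a retryable, and a non-retryable status, partitioned into the three outcomes.

```typescript
// Hypothetical sketch of a mixed batchWrite response: one OK write, one
// retryable failure (ABORTED), one non-retryable failure
// (FAILED_PRECONDITION), partitioned into resolve / retry / reject.
const OK = 0;
const FAILED_PRECONDITION = 9; // gRPC code, non-retryable
const ABORTED = 10;            // gRPC code, retryable per this PR
const retryableCodes = new Set([ABORTED]);

const response = [OK, ABORTED, FAILED_PRECONDITION];

const resolved: number[] = [];
const retried: number[] = [];
const rejected: number[] = [];
response.forEach((code, i) => {
  if (code === OK) {
    resolved.push(i);            // resolve this write's promise now
  } else if (retryableCodes.has(code)) {
    retried.push(i);             // goes into the next retry batch
  } else {
    rejected.push(i);            // surface the error to the caller at once
  }
});

console.log({resolved, retried, rejected});
```

A test built on this shape can then assert that the non-retryable write rejects immediately while the ABORTED write is retried until it succeeds.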
thebrianchen
left a comment
Still waiting on the google3 changes to be approved.
Moved to #1243 in order to merge against master.
With throttling enabled and the backend in its current state (no retries on the backend), this implementation successfully gets all ABORTED writes to eventually succeed.