chore: simplify BulkWriter #523
eabb8f3 to 5ac45bf
Codecov Report
@@ Coverage Diff @@
## master #523 +/- ##
============================================
- Coverage 74.09% 73.81% -0.28%
+ Complexity 1117 1080 -37
============================================
Files 66 67 +1
Lines 5883 5801 -82
Branches 723 706 -17
============================================
- Hits 4359 4282 -77
Misses 1296 1296
+ Partials 228 223 -5
Thanks for porting this! Unfortunately, the unit tests flake when you run them until failure on IntelliJ. I suspect this might be because my old race conditions were covered by using MoreExecutors.directExecutor() instead of calling the executor by name, which forced the running thread to continue in the same context.
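To make the executor point concrete, here is a minimal sketch of why a direct executor can mask race conditions. It uses only the JDK: `Runnable::run` is a stand-in for `MoreExecutors.directExecutor()` (both run the task inline on the calling thread), and `ExecutorContextSketch` and `threadUsedBy` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class ExecutorContextSketch {
  /** Runs one task on the executor and returns the name of the thread that ran it. */
  static String threadUsedBy(Executor executor) {
    AtomicReference<String> threadName = new AtomicReference<>();
    CountDownLatch done = new CountDownLatch(1);
    executor.execute(
        () -> {
          threadName.set(Thread.currentThread().getName());
          done.countDown();
        });
    try {
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return threadName.get();
  }

  public static void main(String[] args) {
    // Stand-in for a direct executor: the task runs inline on the caller's thread.
    Executor direct = Runnable::run;
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      String caller = Thread.currentThread().getName();
      System.out.println("direct runs on caller: " + caller.equals(threadUsedBy(direct)));
      System.out.println("pool runs on caller: " + caller.equals(threadUsedBy(pool)));
    } finally {
      pool.shutdown();
    }
  }
}
```

With the direct executor, callbacks never leave the calling thread, so code that is not thread-safe can still pass. Once callbacks move to a real executor, the same code may interleave with user calls.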
 * Sends the current batch and resets {@link #bulkCommitBatch}.
 *
 * @param flush If provided, keeps re-sending operations until no more operations are enqueued.
 * This allows retries to resolve as part of a {#link flush()} or {#link close()} call.
s/#link/@link
Fixed
} else {
bulkWriterExecutor.schedule(
new Runnable() {
public void sendOperation(
This can be private.
Fixed
private final Executor errorExecutor;
private final BulkWriter.WriteErrorCallback errorListener;

private int failedAttempts;
possible nit: Do we initialize variables in java even if it's implicit?
I couldn't find anything in the style guide, but it's a tad easier to read if we initialize here.
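For reference, a small sketch of the two forms being discussed. Java initializes `int` fields to 0, so both fields below start with the same value and the explicit initializer only affects readability. `FieldDefaultSketch` and its members are hypothetical names used for illustration.

```java
final class FieldDefaultSketch {
  // Implicit: the language guarantees that int fields start at 0.
  private int implicitAttempts;

  // Explicit: same initial value, stated for the reader.
  private int explicitAttempts = 0;

  int implicitAttempts() {
    return implicitAttempts;
  }

  int explicitAttempts() {
    return explicitAttempts;
  }

  public static void main(String[] args) {
    FieldDefaultSketch sketch = new FieldDefaultSketch();
    System.out.println(sketch.implicitAttempts() == sketch.explicitAttempts());
  }
}
```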
 * Represents a single write for BulkWriter, encapsulating operation dispatch and error handling.
 */
class BulkWriterOperation {
  private final SettableApiFuture<WriteResult> future = SettableApiFuture.create();
optional: rename to operationFuture to avoid confusion with the local future variables in the error and success handlers (or maybe it's just me that gets confused).
Done. Also renamed the other Futures to callbackFuture.
new ApiFutureCallback<Boolean>() {
  @Override
  public void onFailure(Throwable throwable) {
    BulkWriterOperation.this.future.setException(throwable);
If you rename the instance future variable, can you remove the need to use BulkWriterOperation.this?
Yes, cleaned up.
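A rough sketch of why the rename removes the qualified `this`. It assumes the handler declares a local named differently from the field, and it uses the JDK's `CompletableFuture` and `Consumer` as stand-ins for `SettableApiFuture` and `ApiFutureCallback`. `QualifiedThisSketch` is a hypothetical class, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class QualifiedThisSketch {
  // Renamed from "future" so that locals in the handlers no longer shadow it.
  private final CompletableFuture<String> operationFuture = new CompletableFuture<>();

  Consumer<Throwable> failureHandler() {
    return new Consumer<Throwable>() {
      @Override
      public void accept(Throwable throwable) {
        // A local with a distinct name. If both the local and the field were called
        // "future", the field would need QualifiedThisSketch.this.future here.
        CompletableFuture<Void> callbackFuture = new CompletableFuture<>();
        callbackFuture.complete(null);

        // The unqualified name resolves to the field of the enclosing instance.
        operationFuture.completeExceptionally(throwable);
      }
    };
  }

  CompletableFuture<String> operationFuture() {
    return operationFuture;
  }

  public static void main(String[] args) {
    QualifiedThisSketch operation = new QualifiedThisSketch();
    operation.failureHandler().accept(new IllegalStateException("write failed"));
    System.out.println(operation.operationFuture().isCompletedExceptionally());
  }
}
```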
GrpcStatusCode.of(Status.Code.ABORTED),
true));

// @Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
Can't have BulkWriter mysteriously flaking everywhere again.
This Rule made debugging so difficult. I kept commenting it in and out. But the end state should be "commented in" :)
public class BulkWriterTest {

  @Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
  private static final ApiFuture<GeneratedMessageV3> FAILED_FUTURE =
This is a very sleek cleanup.
import javax.annotation.Nullable;
import java.util.concurrent.Executor;

/** Used to represent a batch on the BatchQueue. */
Update class name?
My attempt: A batch that holds and schedules BulkWriter operations.
Hm. This should still match the original description. The pendingOperations are the operations that are scheduled on this batch. They might be scheduled on another batch too, but the scheduling itself doesn't take place here. Let me know if I am confused :)
The main issue I had was that "BatchQueue" is no longer a term used in BulkWriter.
boolean isReadyToSend() {
  return state == BatchState.READY_TO_SEND;
void enqueueOperation(BulkWriterOperation operation) {
  boolean added = documents.add(operation.getDocumentReference());
Wow, that optimization to add then check 🤯
This will save so much CPU that we can soon start mining Bitcoins in the SDK.
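The "add then check" pattern relies on `Set.add` returning `false` when the element is already present, so a single call replaces a `contains` lookup followed by an `add`. The sketch below assumes documents are identified by a string path, and `AddThenCheckSketch` and `tryEnqueue` are hypothetical names. What the PR does when `added` is `false` is not shown in the hunk.

```java
import java.util.HashSet;
import java.util.Set;

final class AddThenCheckSketch {
  private final Set<String> documents = new HashSet<>();

  /** Returns true if the document was newly added, false if it was already in the batch. */
  boolean tryEnqueue(String documentPath) {
    // One hash lookup instead of contains() followed by add().
    return documents.add(documentPath);
  }

  public static void main(String[] args) {
    AddThenCheckSketch batch = new AddThenCheckSketch();
    System.out.println(batch.tryEnqueue("coll/doc1"));
    System.out.println(batch.tryEnqueue("coll/doc1"));
  }
}
```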
schmidt-sebastian
left a comment
Thanks for pushing me to fix the flakes. I think this uncovered an actual issue where the batch was sometimes modified by multiple threads (through the backoff handler as well as the user input).
thebrianchen
left a comment
Thanks for diving into this one, really appreciate it :)
…java-firestore into mrschmidt/simplifyBulkWriter
This is my attempt to simplify some of the callback logic in BulkWriter, and will hopefully allow us to add backoff handling for retries pretty easily.
The overall changes are:
One behavior change is that it is now possible that a retry will get scheduled after other operations in a flush. Let's say we have the calls:
And Op1 fails. The write order could now be: Op1, Op2, Op3, Op1. This breaks a single unit test, but I think it is sane behavior as it allows us to make progress while we wait for more responses.
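The reordering can be illustrated with a deliberately simplified, synchronous model: a failed operation goes to the back of the pending queue instead of blocking the operations behind it. This is only a sketch under that assumption. The real BulkWriter sends batches asynchronously, and `RetryOrderSketch`, `writeOrder`, and `failOnce` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RetryOrderSketch {
  /** Returns the order in which operations are sent when each op in failOnce fails one time. */
  static List<String> writeOrder(List<String> operations, Set<String> failOnce) {
    Deque<String> pending = new ArrayDeque<>(operations);
    Set<String> alreadyFailed = new HashSet<>();
    List<String> sent = new ArrayList<>();
    while (!pending.isEmpty()) {
      String operation = pending.removeFirst();
      sent.add(operation);
      // A first failure re-enqueues the operation behind everything still pending.
      if (failOnce.contains(operation) && alreadyFailed.add(operation)) {
        pending.addLast(operation);
      }
    }
    return sent;
  }

  public static void main(String[] args) {
    List<String> order =
        writeOrder(Arrays.asList("Op1", "Op2", "Op3"), Collections.singleton("Op1"));
    System.out.println(order); // prints [Op1, Op2, Op3, Op1]
  }
}
```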
Port of googleapis/nodejs-firestore#1379