@schmidt-sebastian schmidt-sebastian commented Feb 3, 2021

This is my attempt to simplify some of the callback logic in BulkWriter, and it will hopefully allow us to add backoff handling for retries pretty easily.

The overall changes are:

  • There are no more batch queues. Instead, there is only ever a single batch. If this batch reaches capacity, it is sent.
  • To allow 'flush' to block, I am instead keeping a list of pending operations.
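
The two changes above can be sketched roughly as follows. This is a minimal, single-threaded illustration and not the PR's code: the class `SingleBatchSketch`, its fields, and its methods are hypothetical names, writes are plain strings, and the RPC is replaced by a stand-in that always succeeds immediately.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: all names here are illustrative, not taken from the PR.
class SingleBatchSketch {
  private final int maxBatchSize;
  // The one and only batch: the writes plus the future that reports each write's outcome.
  private List<String> currentBatch = new ArrayList<>();
  private List<CompletableFuture<Void>> currentFutures = new ArrayList<>();
  // Every operation that has been enqueued but not yet resolved.
  private final Set<CompletableFuture<Void>> pendingOperations = new LinkedHashSet<>();
  // Recorded only so the example can show what was sent.
  final List<List<String>> sentBatches = new ArrayList<>();

  SingleBatchSketch(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  /** Adds a write to the single batch and sends the batch once it reaches capacity. */
  CompletableFuture<Void> enqueue(String write) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    pendingOperations.add(result);
    currentBatch.add(write);
    currentFutures.add(result);
    if (currentBatch.size() >= maxBatchSize) {
      sendCurrentBatch();
    }
    return result;
  }

  /** Sends the current batch (if non-empty) and replaces it with a fresh one. */
  void sendCurrentBatch() {
    if (currentBatch.isEmpty()) {
      return;
    }
    List<String> batch = currentBatch;
    List<CompletableFuture<Void>> futures = currentFutures;
    currentBatch = new ArrayList<>();
    currentFutures = new ArrayList<>();
    sentBatches.add(batch);
    // Stand-in for the RPC: every write succeeds immediately.
    for (CompletableFuture<Void> future : futures) {
      pendingOperations.remove(future);
      future.complete(null);
    }
  }

  /** Resolves once every operation that was pending at call time has resolved. */
  CompletableFuture<Void> flush() {
    CompletableFuture<?>[] snapshot = pendingOperations.toArray(new CompletableFuture<?>[0]);
    sendCurrentBatch();
    return CompletableFuture.allOf(snapshot);
  }
}
```

With a capacity of 2, enqueueing `a`, `b`, `c` sends `[a, b]` as soon as `b` arrives, while `c` stays pending until `flush()` sends the remainder.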

One behavior change is that it is now possible that a retry will get scheduled after other operations in a flush. Let's say we have the calls:

  • Op1
  • Op2
  • Flush
  • Op3

And Op1 fails. The write order could now be: Op1, Op2, Op3, Op1. This breaks a single unit test, but I think it is sane behavior as it allows us to make progress while we wait for more responses.
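
One interleaving that produces this order can be traced with a small deterministic simulation. It is only a sketch under assumptions: `RetryOrderSketch` is a hypothetical name, the batch is a plain list, and the failure response for Op1 is assumed to arrive after Op3 has been enqueued.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one possible interleaving; not the PR's code.
class RetryOrderSketch {
  static List<String> simulate() {
    List<String> writeOrder = new ArrayList<>();
    List<String> currentBatch = new ArrayList<>();

    currentBatch.add("Op1");
    currentBatch.add("Op2");

    // flush(): the batch is sent and a fresh batch takes its place.
    List<String> inFlight = currentBatch;
    currentBatch = new ArrayList<>();
    writeOrder.addAll(inFlight);

    // Op3 is enqueued before the response for the first batch arrives.
    currentBatch.add("Op3");

    // The response reports that Op1 failed, so its retry joins the current batch behind Op3.
    currentBatch.add("Op1");

    // The current batch is sent.
    writeOrder.addAll(currentBatch);
    return writeOrder;
  }
}
```

`RetryOrderSketch.simulate()` returns `[Op1, Op2, Op3, Op1]`, matching the order described above.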

Port of googleapis/nodejs-firestore#1379

@product-auto-label product-auto-label bot added the api: firestore Issues related to the googleapis/java-firestore API. label Feb 3, 2021
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Feb 3, 2021
@schmidt-sebastian schmidt-sebastian force-pushed the mrschmidt/simplifyBulkWriter branch from eabb8f3 to 5ac45bf Compare February 3, 2021 19:05
codecov bot commented Feb 3, 2021

Codecov Report

Merging #523 (ee7a945) into master (0e6f3da) will decrease coverage by 0.27%.
The diff coverage is 100.00%.

@@             Coverage Diff              @@
##             master     #523      +/-   ##
============================================
- Coverage     74.09%   73.81%   -0.28%     
+ Complexity     1117     1080      -37     
============================================
  Files            66       67       +1     
  Lines          5883     5801      -82     
  Branches        723      706      -17     
============================================
- Hits           4359     4282      -77     
  Misses         1296     1296              
+ Partials        228      223       -5     
| Impacted Files | Coverage Δ | Complexity Δ |
|---|---|---|
| ...n/java/com/google/cloud/firestore/Transaction.java | 85.41% <ø> (ø) | 13.00 <0.00> (ø) |
| ...in/java/com/google/cloud/firestore/WriteBatch.java | 100.00% <ø> (ø) | 3.00 <0.00> (ø) |
| ...va/com/google/cloud/firestore/BulkCommitBatch.java | 100.00% <100.00%> (+6.94%) ⬆️ | 7.00 <4.00> (-11.00) |
| ...in/java/com/google/cloud/firestore/BulkWriter.java | 100.00% <100.00%> (+1.75%) ⬆️ | 42.00 <9.00> (-12.00) |
| ...om/google/cloud/firestore/BulkWriterOperation.java | 100.00% <100.00%> (ø) | 7.00 <7.00> (?) |
| ...java/com/google/cloud/firestore/UpdateBuilder.java | 94.97% <100.00%> (ø) | 59.00 <0.00> (ø) |
| ...a/com/google/cloud/firestore/BatchWriteResult.java | 0.00% <0.00%> (-100.00%) | 0.00% <0.00%> (-3.00%) |
| ...n/java/com/google/cloud/firestore/RateLimiter.java | 95.00% <0.00%> (-5.00%) | 14.00% <0.00%> (-1.00%) |
| ...oogle/cloud/firestore/v1/FirestoreAdminClient.java | 55.97% <0.00%> (-1.50%) | 38.00% <0.00%> (ø%) |
| ...a/com/google/cloud/firestore/DocumentSnapshot.java | 80.85% <0.00%> (-0.87%) | 39.00% <0.00%> (ø%) |
| ... and 8 more | | |

Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0e6f3da...ee7a945. Read the comment docs.

@schmidt-sebastian schmidt-sebastian changed the title Mrschmidt/simplify bulk writer chore: simplify BulkWriter Feb 3, 2021
@schmidt-sebastian schmidt-sebastian marked this pull request as ready for review February 3, 2021 19:09
@schmidt-sebastian schmidt-sebastian requested a review from a team as a code owner February 3, 2021 19:09
@schmidt-sebastian schmidt-sebastian requested review from a team and thebrianchen February 3, 2021 19:09
@thebrianchen thebrianchen left a comment

Thanks for porting this! Unfortunately, the unit tests flake when you run them until failure in IntelliJ. I suspect my old race conditions were masked by using MoreExecutors.directExecutor() instead of calling the executor by name, which forced the running thread to continue in the same context.
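
To illustrate why a direct executor can hide races, the sketch below reports which thread an executor actually uses. It relies only on the JDK: `DIRECT` is a hand-written stand-in that, like Guava's `MoreExecutors.directExecutor()`, runs the task on the calling thread. `ExecutorThreadSketch` and `threadUsedBy` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch; names are illustrative only.
class ExecutorThreadSketch {
  // Stand-in for a direct executor: the task runs synchronously on the calling thread.
  static final Executor DIRECT = Runnable::run;

  /** Returns the name of the thread on which the executor runs a submitted task. */
  static String threadUsedBy(Executor executor) {
    CompletableFuture<String> name = new CompletableFuture<>();
    executor.execute(() -> name.complete(Thread.currentThread().getName()));
    return name.join();
  }
}
```

With `DIRECT`, the callback runs on the caller's thread before `execute` returns, so callbacks never overlap with the code that scheduled them. With a pooled executor the callback runs on a worker thread, and unsynchronized shared state can then be touched concurrently.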

* Sends the current batch and resets {@link #bulkCommitBatch}.
*
* @param flush If provided, keeps re-sending operations until no more operations are enqueued.
* This allows retries to resolve as part of a {#link flush()} or {#link close()} call.

s/#link/@link

Contributor Author

Fixed

} else {
bulkWriterExecutor.schedule(
new Runnable() {
public void sendOperation(

This can be private.

Contributor Author

Fixed

private final Executor errorExecutor;
private final BulkWriter.WriteErrorCallback errorListener;

private int failedAttempts;

possible nit: Do we initialize variables in Java even if it's implicit?

Contributor Author

I couldn't find anything in the style guide, but it's a tad easier to read if we initialize here.
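
For reference, Java gives an `int` field the default value 0, so the two declarations in this sketch behave identically and the choice is purely about readability. `DefaultInitSketch` and its field names are hypothetical.

```java
// Hypothetical sketch; names are illustrative only.
class DefaultInitSketch {
  private int implicitAttempts;     // the language initializes this field to 0
  private int explicitAttempts = 0; // same value, stated for the reader

  int implicit() {
    return implicitAttempts;
  }

  int explicit() {
    return explicitAttempts;
  }
}
```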

* Represents a single write for BulkWriter, encapsulating operation dispatch and error handling.
*/
class BulkWriterOperation {
private final SettableApiFuture<WriteResult> future = SettableApiFuture.create();

optional: rename to operationFuture to avoid confusion with the local future variables in the error and success handlers (or maybe it's just me that gets confused).

Contributor Author

Done. Also renamed the other Futures to callbackFuture.

new ApiFutureCallback<Boolean>() {
@Override
public void onFailure(Throwable throwable) {
BulkWriterOperation.this.future.setException(throwable);

If you rename the instance future variable, can you remove the need to use BulkWriterOperation.this?

Contributor Author

Yes, cleaned up.
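
The naming issue can be shown in isolation. In the sketch below, a local variable named `future` inside an anonymous class shadows the outer field, so the field is only reachable through the qualified `Outer.this` form; giving the field and the local distinct names removes that need. Plain strings stand in for the futures, and both class names are hypothetical.

```java
import java.util.function.Supplier;

// Hypothetical sketch; strings stand in for the futures in the PR.
class ShadowedFieldSketch {
  private final String future = "field";

  Supplier<String> callback() {
    return new Supplier<String>() {
      @Override
      public String get() {
        String future = "local"; // shadows the outer field
        // The outer field must be qualified to get past the local variable.
        return future + "/" + ShadowedFieldSketch.this.future;
      }
    };
  }
}

class RenamedFieldSketch {
  private final String operationFuture = "field";

  Supplier<String> callback() {
    return new Supplier<String>() {
      @Override
      public String get() {
        String callbackFuture = "local";
        // Distinct names, so no qualification is needed.
        return callbackFuture + "/" + operationFuture;
      }
    };
  }
}
```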

GrpcStatusCode.of(Status.Code.ABORTED),
true));

// @Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);

Can't have BulkWriter mysteriously flaking everywhere again.

Contributor Author

This Rule made debugging so difficult. I kept commenting it in and out. But the end state should be "commented in" :)

public class BulkWriterTest {

@Rule public Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
private static final ApiFuture<GeneratedMessageV3> FAILED_FUTURE =

This is a very sleek cleanup.

import javax.annotation.Nullable;
import java.util.concurrent.Executor;

/** Used to represent a batch on the BatchQueue. */

Update class name?
My attempt: A batch that holds and schedules BulkWriter operations.

Contributor Author

Hm. This should still match the original description. The pendingOperations are the operations that are scheduled on this batch. They might be scheduled on another batch too, but the scheduling itself doesn't take place here. Let me know if I am confused :)

The main issue I had was that "BatchQueue" is no longer a term used in BulkWriter.

boolean isReadyToSend() {
return state == BatchState.READY_TO_SEND;
void enqueueOperation(BulkWriterOperation operation) {
boolean added = documents.add(operation.getDocumentReference());

Wow, that optimization to add then check 🤯

Contributor Author

This will save so much CPU that we can soon start mining Bitcoins in the SDK.
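
The "add then check" pattern relies on `Set.add` returning `true` only when the element was not already present, which makes a separate `contains()` lookup unnecessary. A minimal sketch, assuming documents are identified by a string path; `AddThenCheckSketch` and `tryAdd` are hypothetical names:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; names are illustrative only.
class AddThenCheckSketch {
  private final Set<String> documents = new HashSet<>();

  /** Returns true if the path was new, false if it was already in the batch. */
  boolean tryAdd(String documentPath) {
    // One hash lookup instead of contains() followed by add().
    return documents.add(documentPath);
  }
}
```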

@schmidt-sebastian schmidt-sebastian left a comment

Thanks for pushing me to fix the flakes. I think this uncovered an actual issue where the batch was sometimes modified by multiple threads (through the backoff handler as well as the user input).

@thebrianchen thebrianchen left a comment

Thanks for diving into this one, really appreciate it :)

@schmidt-sebastian schmidt-sebastian added kokoro:force-run Add this label to force Kokoro to re-run the tests. automerge Merge the pull request once unit tests and other checks pass. labels Feb 5, 2021
@yoshi-kokoro yoshi-kokoro removed the kokoro:force-run Add this label to force Kokoro to re-run the tests. label Feb 5, 2021
@gcf-merge-on-green gcf-merge-on-green bot merged commit 3ed79ae into master Feb 5, 2021
@gcf-merge-on-green gcf-merge-on-green bot deleted the mrschmidt/simplifyBulkWriter branch February 5, 2021 22:54
@gcf-merge-on-green gcf-merge-on-green bot removed the automerge Merge the pull request once unit tests and other checks pass. label Feb 5, 2021