Skip to content

Conversation

@prawilny
Copy link
Collaborator

@prawilny prawilny commented Sep 14, 2021

Note that ExecutorServiceRule is copypasted from 1.x client. The reuse will have to wait for another PR.
I will squash the commits before the merge.

edit: squashed all the commits (instead of splitting them sensibly as there are some dependencies between partial changes) so that commits implementing suggestions from reviews are more clearly visible.


This change is Reviewable

Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 7 of 9 files at r1, all commit messages.
Reviewable status: 7 of 9 files reviewed, 11 unresolved discussions (waiting on @dopiera and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 27 at r1 (raw file):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

As I commented elsewhere, please do not use wildcart imports. You can configure your IDE to replace them with specific usages when the code is formatted.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 43 at r1 (raw file):

  public MirroringAsyncConnection(
      Configuration conf, /* AsyncRegion */ Object o, String clusterId, User user)

Please add a comment explaining why are we taking Object instead of AsyncRegion and why we can safely ignore it here.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 73 at r1 (raw file):

  }

  private <T> T construct(String className, Object... params) {

Those method are copy-pasted from V1, can't we create a helper package in V1 with those methods and use them here and in V1 connection constructor? It'll be easier to move them to a separate base project later.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 102 at r1 (raw file):

  }

  // TODO: use default method after implementing MirroringAsyncTableBuilder

I've seen that it is common in the codebase to assign someone to every TODO, e.g. // TODO(aczajkowski): blah blah.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 39 at r1 (raw file):

public class MirroringAsyncTable<C extends ScanResultConsumerBase> implements AsyncTable<C> {
  AsyncTable<C> primaryTable;

private final?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 40 at r1 (raw file):

public class MirroringAsyncTable<C extends ScanResultConsumerBase> implements AsyncTable<C> {
  AsyncTable<C> primaryTable;
  AsyncTable<C> secondaryTable;

private final?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 41 at r1 (raw file):

  AsyncTable<C> primaryTable;
  AsyncTable<C> secondaryTable;
  VerificationContinuationFactory verificationContinuationFactory;

private final?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 90 at r1 (raw file):

        .exists(get)
        .handleAsync(
            (result, err) -> {

I think that you can easily extract ifology and handling an error into a (static?) method taking the future to be completed and callable to be run in non-error case.

this.primaryTable.exists(get).handleAsync(primaryOperationResultCallback(future, () -> { ... }));

This also applies to all other method in this file.

Moreover, maybe it would be possible to extract more common components from those methods. In fact it would be great if we could create a method that could be called like this:

primaryOperationResultCallbackForReads(
  (result) -> new RequestResourceDescription(result),
  () -> { return this.secondaryTable.exists(get) },
  (result) -> this.verificationContinuationFactory.exists(get, result)
)

and something similar for writes.

This would abstract away resolving the future, which is the most repetitive part of this code.

Also, maybe then it would be nice to abstract creating and returning the future away by creating yet another method taking primary future and some callables as parameters, and use it everywhere.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 90 at r1 (raw file):

        .exists(get)
        .handleAsync(
            (result, err) -> {

Nitpick: err -> error, I personally prefer to use full words in code (except int i in for loop and Exception e in catch block :D)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

package com.google.cloud.bigtable.mirroring.hbase2_x.utils.futures;

import static net.javacrumbs.futureconverter.java8guava.FutureConverter.*;

Google Java Style Guide forbids using wildcard imports ( https://google.github.io/styleguide/javaguide.html#s3.3-import-statements ). I wonder why wasn't this caught by the linter, did you run it (it runs when tests are performed)?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/org/apache/hadoop/hbase/client/TestRegistry.java, line 16 at r1 (raw file):

 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

Add a comment explaining why this class is in an unexpected package.

Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 7 of 9 files reviewed, 12 unresolved discussions (waiting on @dopiera and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 104 at r1 (raw file):

              return null;
            },
            executorService);

We can use MoreExecutors.directExecutor() here. It'll run the callback on the thread that completed the future. It will be consistent with other places in our codebase where an executor for callback is provided. Those callbacks are light-weight thus there shouldn't be any problem with using it.

Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 7 of 9 files reviewed, 13 unresolved discussions (waiting on @dopiera and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 94 at r1 (raw file):

                future.completeExceptionally(err);
              } else {
                scheduleVerificationAndRequestWithFlowControl(

This method is blocking


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 242 at r1 (raw file):

RequestScheduling.scheduleVerificationAndRequestWithFlowControl

This method is blocking, we should use something else.

@prawilny prawilny force-pushed the ac/MirroringAsyncConfiguration branch from 1e04353 to b315c2c Compare September 16, 2021 12:25
@prawilny
Copy link
Collaborator Author

Oh, didn't notice the review before.
Just force-pushed a version rebased onto new ac/AsyncMirroringConfiguration (which itself got rebased onto new ac/delaySecondaryRequest).

@prawilny
Copy link
Collaborator Author

Rebased once again in order to fix an error fixed in the previous branch.

Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 5 of 9 files reviewed, 13 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 27 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

As I commented elsewhere, please do not use wildcart imports. You can configure your IDE to replace them with specific usages when the code is formatted.

Oh, I did it in some automagical way (I feel like it was either Ctrl + Shift + L or mvn com.coveo:fmt-maven-plugin:format).
Edit: alt+enter did that, it turns out
Will fix it (already fixed the IDE configuration)!


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 43 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Please add a comment explaining why are we taking Object instead of AsyncRegion and why we can safely ignore it here.

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 73 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Those method are copy-pasted from V1, can't we create a helper package in V1 with those methods and use them here and in V1 connection constructor? It'll be easier to move them to a separate base project later.

I thought we wanted not to have arbitrary class creation with any kind of public interface.
Will do later.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 102 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I've seen that it is common in the codebase to assign someone to every TODO, e.g. // TODO(aczajkowski): blah blah.

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 39 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

private final?

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 40 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

private final?

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 41 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

private final?

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 90 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Nitpick: err -> error, I personally prefer to use full words in code (except int i in for loop and Exception e in catch block :D)

I wanted to check what Marek would say about playing gophers :)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 90 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I think that you can easily extract ifology and handling an error into a (static?) method taking the future to be completed and callable to be run in non-error case.

this.primaryTable.exists(get).handleAsync(primaryOperationResultCallback(future, () -> { ... }));

This also applies to all other method in this file.

Moreover, maybe it would be possible to extract more common components from those methods. In fact it would be great if we could create a method that could be called like this:

primaryOperationResultCallbackForReads(
  (result) -> new RequestResourceDescription(result),
  () -> { return this.secondaryTable.exists(get) },
  (result) -> this.verificationContinuationFactory.exists(get, result)
)

and something similar for writes.

This would abstract away resolving the future, which is the most repetitive part of this code.

Also, maybe then it would be nice to abstract creating and returning the future away by creating yet another method taking primary future and some callables as parameters, and use it everywhere.

Will do


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 94 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

This method is blocking

Yeah, only after rereading the code I realized we were waiting for the resources synchronously.
Will do.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 242 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
RequestScheduling.scheduleVerificationAndRequestWithFlowControl

This method is blocking, we should use something else.

Right, as said higher up I'll fix it


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Google Java Style Guide forbids using wildcard imports ( https://google.github.io/styleguide/javaguide.html#s3.3-import-statements ). I wonder why wasn't this caught by the linter, did you run it (it runs when tests are performed)?

I think it quite often complains on CI about me forgetting to run it so I have no idea why it wasn't caught.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/org/apache/hadoop/hbase/client/TestRegistry.java, line 16 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Add a comment explaining why this class is in an unexpected package.

Done.

@mwalkiewicz mwalkiewicz reopened this Sep 16, 2021
Copy link

@dopiera dopiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know you're still working on this (making flow control async), but I just made a first pass and left some comments.

Reviewable status: 1 of 9 files reviewed, 14 unresolved discussions (waiting on @mwalkiewicz and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 242 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Right, as said higher up I'll fix it

nit: "control flow" is about which statement gets executed next (i.e. loops, ifs, etc.). What you want here is "flow control", which is how we control in influx of incoming requests.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

I think it quite often complains on CI about me forgetting to run it so I have no idea why it wasn't caught.

I'm hesitant about using this library - it has 12 uses and it was last updated in 2018. Isn't implementing these converters trivial enough to just do it by ourselves?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/ExecutorServiceRule.java, line 25 at r4 (raw file):

import org.junit.rules.ExternalResource;

// Copypaste from hbase1_x

It's OK if you leave that for later, but I'm afraid we'll all forget - would you mind creating an issue for every instance of such a TODO and refer them in relevant places?

Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 1 of 9 files reviewed, 14 unresolved discussions (waiting on @dopiera and @mwalkiewicz)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 27 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Oh, I did it in some automagical way (I feel like it was either Ctrl + Shift + L or mvn com.coveo:fmt-maven-plugin:format).
Edit: alt+enter did that, it turns out
Will fix it (already fixed the IDE configuration)!

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 90 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Will do

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 94 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Yeah, only after rereading the code I realized we were waiting for the resources synchronously.
Will do.

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 104 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

We can use MoreExecutors.directExecutor() here. It'll run the callback on the thread that completed the future. It will be consistent with other places in our codebase where an executor for callback is provided. Those callbacks are light-weight thus there shouldn't be any problem with using it.

As discussed f2f, I decided to try going with handle(), thenAcceptBoth(), ... without "Async" suffix which seem to run on the thread calling complete() (and some other "completion method" - see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html).
Thus I just deleted this.executorService from MirroringAsyncTable for now.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 242 at r1 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

nit: "control flow" is about which statement gets executed next (i.e. loops, ifs, etc.). What you want here is "flow control", which is how we control in influx of incoming requests.

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

I'm hesitant about using this library - it has 12 uses and it was last updated in 2018. Isn't implementing these converters trivial enough to just do it by ourselves?

I was sure I read it in guava docs but I can''t find it now. Without that argument it really looks like we should rewrite this. I'll do it (in another PR).


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/ExecutorServiceRule.java, line 25 at r4 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

It's OK if you leave that for later, but I'm afraid we'll all forget - would you mind creating an issue for every instance of such a TODO and refer them in relevant places?

After the rewrite MirroringAsyncTable no longer has ExecutorService member (the continuations are run on the thread which completed the future - see the comments above), so the tests don't need this class, so I just removed it and an issue is no longer needed :)
Also, right now Mateusz is working on common package - maybe he will finish by the time MirroringAsyncTable starts needing this mock.
I'll create issues for such TODOs in future, too.

@prawilny prawilny force-pushed the ac/MirroringAsyncConfiguration branch from 0af7f65 to 195f8ca Compare September 20, 2021 13:20
Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r2, 1 of 7 files at r4, 2 of 4 files at r5, all commit messages.
Reviewable status: 5 of 9 files reviewed, 9 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 43 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Done.

I think that it should be said explicitly that it is not needed because we are using create async connection which will pass it to constructors of underlying connections.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 73 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

I thought we wanted not to have arbitrary class creation with any kind of public interface.
Will do later.

Later means in different PR or in different commit?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 131 at r5 (raw file):

  }

  private <T> CompletableFuture<T> getWithVerificationAndFlowControl(

nit: we have writeWithFlowControl, let's make this method readWithVerificationAndFlowControl.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 145 at r5 (raw file):

            RequestResourcesDescription resourcesDescription =
                resourcesDescriptionCreator.apply(primaryResult);
            CompletableFuture<FlowController.ResourceReservation> resourceFuture =

nit: resourceFuture -> resourceReservationFuture? I had hard time to understand that it is when reading the code.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 150 at r5 (raw file):

            reserveControlFlowResourcesThenScheduleSecondary(
                    resourceFuture,
                    primaryFuture,

We are passing this completed future to conform to the API that allows to make primary request and take resources reservation in parallel.
If we instead dropped support for taking the reservation in parallel we could make this method call more obvious (I initially thought that it is a bug that you pass an already completed future), and we wouldn't need to use acceptBoth that you said might have some caveats afair.

@dopiera do you think that acquiring resources in parallel to the primary operation is really necessary? It'll take either <1ms if resources are available or way longer than the primary operation if resources they are not, there seems to be almost no latency gain.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 187 at r5 (raw file):

  }

  private <T> CompletableFuture<T> reserveControlFlowResourcesThenScheduleSecondary(

ControlFlow -> FlowControl


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 205 at r5 (raw file):

        .exceptionally(
            t -> {
              if (!reservationFuture.cancel(true)) {

This is copied from V1, can be moved to some static function named cancelReservationFuture somewhere, maybe in ResourceReservation?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

I was sure I read it in guava docs but I can''t find it now. Without that argument it really looks like we should rewrite this. I'll do it (in another PR).

FYI: this lib is mentioned here https://guava.dev/releases/23.0/api/docs/com/google/common/util/concurrent/FluentFuture.html

@prawilny
Copy link
Collaborator Author

Rebased onto new ac/MirroringAsyncConfiguration cutting down on size of the patch by ca. 700 lines.
Sorry for forgetting that github tracks PRs to branches by name yesterday.

Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 11 files reviewed, 7 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 43 at r1 (raw file):

we are using create async connection which will pass it to constructors of underlying connections
Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 73 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Later means in different PR or in different commit?

Different PR (if despite your comment pointing to guava docs recommending this package from yesterday, we eventually decide to rewrite this)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 131 at r5 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

nit: we have writeWithFlowControl, let's make this method readWithVerificationAndFlowControl.

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 145 at r5 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

nit: resourceFuture -> resourceReservationFuture? I had hard time to understand that it is when reading the code.

Done


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 187 at r5 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

ControlFlow -> FlowControl

Done.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 205 at r5 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

This is copied from V1, can be moved to some static function named cancelReservationFuture somewhere, maybe in ResourceReservation?

Moved into FlowController's static function (it only uses java.util.concurrentFuture's interface so it handles both Listenable and Completable future families)

@prawilny
Copy link
Collaborator Author

prawilny commented Sep 21, 2021

Rebased again after adding a fix in MirroringAsyncConfig.
The verifications passes: https://github.com/Unoperate/java-bigtable-hbase-1/actions/runs/1257073168

Copy link

@dopiera dopiera left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 3 of 11 files reviewed, 11 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/flowcontrol/FlowController.java, line 47 at r9 (raw file):

  }

  public static void releaseResourceWhenExceptionThrown(

Wouldn't cancelRequest be a better name?

In general, I think there's something fishy when the method needs to describe the context, where it should be used. If we added cancellation one day, calling this method would be perfectly OK.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/flowcontrol/FlowController.java, line 51 at r9 (raw file):

resourceReservationFuture.get().release();

Would you mind adding a comment explaining that cancellation might fail because the resources might have already been allocated by the time we called cancel(true)and in such a case, we should release them because the user will never have the chance.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java, line 68 at r9 (raw file):

  }

  public MirroringAsyncConnection(Configuration conf, ExecutorService pool, User user)

Please add a description of this constructor, especially the part about the pool, which will be created if a null is passed.


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 131 at r9 (raw file):

resourcesDescriptionCreator

This is always passed the same argument - RequestResourcesDescription::new. Do we really need this parameter?


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 141 at r9 (raw file):

} else {

In such situations I think it's better to return from the function in the first branch and not have the second branch at all - instead the statements from the else branch follow the single-branched if.

It's an acquired taste - I used to argue for the opposite in the past, but I now think the code does get more readable that way, not least because there is less indentation :)


bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/utils/futures/FutureConverter.java, line 19 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

FYI: this lib is mentioned here https://guava.dev/releases/23.0/api/docs/com/google/common/util/concurrent/FluentFuture.html

Please add a comment explaining that it's here temporarily.

@prawilny prawilny changed the base branch from ac/MirroringAsyncConfiguration to ac/ExtractBatchHelpersToUtils September 21, 2021 13:31
@prawilny
Copy link
Collaborator Author

Changed base base and rebased again, so that rebase will be easy for the batch() PR.

@prawilny
Copy link
Collaborator Author

Rebased

Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed (commit messages unreviewed), 1 unresolved discussion (waiting on @dopiera)

@prawilny prawilny merged commit a66fec7 into master Sep 27, 2021
@prawilny prawilny deleted the ac/v2-template-pr branch September 29, 2021 15:15
dopiera added a commit that referenced this pull request May 11, 2022
* Initial implementation of hbase1.x Mirroring Client (#1)

* Initial implementation of hbase1.x Mirroring Client

* MirroringTable and AsyncTableWrapper (#4)

* MirroringTable and AsyncTableWrapper

- Initial implementation of MirroringTable.
- Wrapper around HBase 1.x Table enabling scheduling asynchronous operations.
- Asynchronous verification of get/exists results.

* Add missing licence header to TestResultComparator (#6)

* Setup running tests in GitHub Actions (#7)

* Asynchronous ResultScanner (#5)

* Asynchronous ResultScanner

* Implement faillog.Appender (#2)

`faillog.Appender` is the lowermost component of the infrastructure for
dumping failed mutations in the `MirroringClient`. `faillog/README.md`
explains the design decisions in a bit more detail.

`faillog.Appender` essentially logs arbitrary data asynchronously,
through a separate thread reading from a bounded buffer.

* ListenableReferenceCounter (#8)

* Count references in asynchronous tasks before closing Table/Scanner.

* Flow controller (#10)

* Flow Controller
* Flow Control strategy based on single requests queue

* Mirroring table: writes (#12)

* Add missing condition in result comparator (#15)

* Add more mirroring config opitons (#16)

* MismatchDetector implementation.
* FlowControllerStrategy implementation.
* Maximal number of outstanding requests used by FlowControllerStrategy.
* Primary/Secondary connection implementation option now accepts
  "default" value which can be used when default HBase Connection
  implementation should be used by MirroringConnection.

* Make AsyncTableWrapper ListenableCloseable (#20)

Implements our standard interface for objects that can run callbacks
after asynchronous close to simplify reference counting of
MirroringTable.

* MirroringConnection: use new config options (#17)

* MirroringResultScanner improvements (#18)

* Count references to MirroringResultScanner

Current implementation of MirroringResultScanner doesn't count
verificaiton requests that it has scheduled and allows to close
instances while verificaiton requests are in-flight. This causes lost
verifications.

This PR fixes this issue by counting references to MirroringResultScanner
instances when scheduling verification requests. Moreover, ListenableClosable
interface is implemented for consistency with other classes that use this
scheme, because now the MirroringResultScanner instances will be closed
asynchronously, when all scheduled requests have finished.

* MirroringTable: count references to scanners and secondary wrapper (#21)

Current MirroringTable implementaion does not count its references held
by MirroringResultScanners and SecondaryAsyncWrapper, thus
MirroringConnection consideres it closed before we are sure that all
asynchronous operations have completed.

This PR adds correct reference counting of MirroringTable based on work
done in previously merged PRs.

* Result Scanner - ensure verification ordering (#22)

Current implementation assumes that next() operations on primary and
secondary scanners are called in the same order and uses this assumption
to match results for verification.

However, next()s on secondary database are called asynchronusly and
their order is not defined, which causes invalid mismatch reports.

This PR fixes this problem by placing data to be verified - results of
next()s called on primary scanner - and details of next() call - number
of requested elements - call on a queue. Each asynchronous call to
next() is synchonized and pops a single element from that queue.
Appropriate next is called based on number of requested elements.
Then results of that request and results from the queue are verified.
This ensures that results of next()s passed to verification are
correctly matched and ordered.

* Integration tests (#23)

HBase 1.x integration tests

* Add trace logging. (#24)

* Estimate memory overhead in RequestResourceDescription (#25)

* Tests: extract executor service to TestRule (#26)

Extract executor service utilities into TestRule to facilitate code
reuse in other test classes.

* Integration tests: read configuration from xml files

* MirroringBufferedMutator

* MirroringBufferedMutator: integration tests (#9)

* Fix error introduced in rebase (#11)

* Obtain resource reservation before scheduling secondary calls (#4)

Fixes a bug when secondary database request Future was created before obtaining resources from FlowController.

* Integration tests - MirroringTable operations (#10)

* MirroringAsyncConfiguration (#5)

Add configuration class to be used by MirroringAsyncConnection.

* SecondaryWriteErrorConsumer in MirroringTable (#15)

Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes.

* Use Put to implement Increment and Append (#16)

* refactor: extract functions using reflection into package utils.reflection

* refactor: extract BatchHelpers into utils

Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client.

* feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client

Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable.

* refactor: extract FlowController's request cancellation into a method

* fix: Increment ITs fail with Bigtable as primary (#21)

We were setting timerange on Increment objects used in integration tests
without any reason and Bigtable doesn't support this operation. Setting
timerange in ITs was removed.

* fix: RequestScheduling should handle rejected resource reservations (#24)

Custom FlowControlerStrategy implementations might, contrary to the
default implementation, resolve reservation requests with exception,
what we should handle by not performing the action that had to acquire
the resources.

* feat: Add OpenCensus tracing and metrics. (#14)

* fix: make BatchHelpers skip verification of empty read results

BatchHelpers provides error handling of batch() when there may be some partial results.
Before the commit, matching successful reads were redundantly verified if there were none of them.
This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays.

* refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics

BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API.

* refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation

This change is split off from commit introducing MirroringAsyncTable#batch()

* feat: implement batch() in MirroringAsyncTable

Implementation of MirroringAsyncTable's batch() and
MirroringAsyncTable's methods such as get(List<Get) and put(List<Put>) using it.

* feat: implement failed mutations log (#19)

Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually.

Each failure is logged as a separate line, which makes it compatible with solutions like logstash.

* refactor: split SplitBatchResponse (#40)

SplitBatchResponse was refactored into two parts: splitting into
reads/writes and failed/successful. This makes the code simpler and
easier to maintain.

* refactor: extract helper methods in tests (#48)

* refactor: remove redundant writeWithControlFlow argument

* feat: copy HBase operations' input lists (#57)

* refactor: remove redundant field from MirroringConnection (#55)

* feat: verification with sampling (#28)

* fix: mirror Increment/Append in batch() using Put. (#47)

* refactor: Move HBaseOperation into WriteOperationInfo (#68)

* refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69)

* fix: integration tests - fix build (#70)

* fix: count references to batch operations (#63)

* fix: close underlying connections when MirroringConnection is closed (#49)

* refactor: fix IDE warnings in MirroringAsyncTable test (#64)

* fix: integration tests - check if write errors were reported (#71)

* feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65)

* fix: do not call callbacks with lock held (#53)

* refactor: use AccumulatedExceptions where appropriate (#54)

* fix: fix key used in verification sampling ITs (#77)

* feat: use faillog for handling write errors (#66)

* refactor: add utilities for Futures (FutureUtils)

* feat: defer closing connections until async operations complete (#37)

Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations.

* feat: implement AsyncTableBuilder (#42)

* feat: implement MirroringAsyncTable#checkAndMutate (#43)

* fix: Implement single Append and Increment as Put (#38)

* refactor: simplify SecondaryWriteErrorConsumer API (#78)

* feat: concurrent writes in MirroringTable (#79)

* test: fix failing concurrent write test (#120)

* refactor: renames and moves in RequestScheduling (#87)

* wip: handover session comments

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
mwalkiewicz added a commit that referenced this pull request May 18, 2022
* Initial implementation of hbase1.x Mirroring Client (#1)

* Initial implementation of hbase1.x Mirroring Client

* MirroringTable and AsyncTableWrapper (#4)

* MirroringTable and AsyncTableWrapper

- Initial implementation of MirroringTable.
- Wrapper around HBase 1.x Table enabling scheduling asynchronous operations.
- Asynchronous verification of get/exists results.

* Add missing licence header to TestResultComparator (#6)

* Setup running tests in GitHub Actions (#7)

* Asynchronous ResultScanner (#5)

* Asynchronous ResultScanner

* Implement faillog.Appender (#2)

`faillog.Appender` is the lowermost component of the infrastructure for
dumping failed mutations in the `MirroringClient`. `faillog/README.md`
explains the design decisions in a bit more detail.

`faillog.Appender` essentially logs arbitrary data asynchronously,
through a separate thread reading from a bounded buffer.

* ListenableReferenceCounter (#8)

* Count references in asynchronous tasks before closing Table/Scanner.

* Flow controller (#10)

* Flow Controller
* Flow Control strategy based on single requests queue

* Mirroring table: writes (#12)

* Add missing condition in result comparator (#15)

* Add more mirroring config opitons (#16)

* MismatchDetector implementation.
* FlowControllerStrategy implementation.
* Maximal number of outstanding requests used by FlowControllerStrategy.
* Primary/Secondary connection implementation option now accepts
  "default" value which can be used when default HBase Connection
  implementation should be used by MirroringConnection.

* Make AsyncTableWrapper ListenableCloseable (#20)

Implements our standard interface for objects that can run callbacks
after asynchronous close to simplify reference counting of
MirroringTable.

* MirroringConnection: use new config options (#17)

* MirroringResultScanner improvements (#18)

* Count references to MirroringResultScanner

Current implementation of MirroringResultScanner doesn't count
verificaiton requests that it has scheduled and allows to close
instances while verificaiton requests are in-flight. This causes lost
verifications.

This PR fixes this issue by counting references to MirroringResultScanner
instances when scheduling verification requests. Moreover, ListenableClosable
interface is implemented for consistency with other classes that use this
scheme, because now the MirroringResultScanner instances will be closed
asynchronously, when all scheduled requests have finished.

* MirroringTable: count references to scanners and secondary wrapper (#21)

Current MirroringTable implementaion does not count its references held
by MirroringResultScanners and SecondaryAsyncWrapper, thus
MirroringConnection consideres it closed before we are sure that all
asynchronous operations have completed.

This PR adds correct reference counting of MirroringTable based on work
done in previously merged PRs.

* Result Scanner - ensure verification ordering (#22)

Current implementation assumes that next() operations on primary and
secondary scanners are called in the same order and uses this assumption
to match results for verification.

However, next()s on secondary database are called asynchronusly and
their order is not defined, which causes invalid mismatch reports.

This PR fixes this problem by placing data to be verified - results of
next()s called on primary scanner - and details of next() call - number
of requested elements - call on a queue. Each asynchronous call to
next() is synchonized and pops a single element from that queue.
Appropriate next is called based on number of requested elements.
Then results of that request and results from the queue are verified.
This ensures that results of next()s passed to verification are
correctly matched and ordered.

* Integration tests (#23)

HBase 1.x integration tests

* Add trace logging. (#24)

* Estimate memory overhead in RequestResourceDescription (#25)

* Tests: extract executor service to TestRule (#26)

Extract executor service utilities into TestRule to facilitate code
reuse in other test classes.

* Integration tests: read configuration from xml files

* MirroringBufferedMutator

* MirroringBufferedMutator: integration tests (#9)

* Fix error introduced in rebase (#11)

* Obtain resource reservation before scheduling secondary calls (#4)

Fixes a bug when secondary database request Future was created before obtaining resources from FlowController.

* Integration tests - MirroringTable operations (#10)

* MirroringAsyncConfiguration (#5)

Add configuration class to be used by MirroringAsyncConnection.

* SecondaryWriteErrorConsumer in MirroringTable (#15)

Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes.

* Use Put to implement Increment and Append (#16)

* refactor: extract functions using reflection into package utils.reflection

* refactor: extract BatchHelpers into utils

Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client.

* feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client

Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable.

* refactor: extract FlowController's request cancellation into a method

* fix: Increment ITs fail with Bigtable as primary (#21)

We were setting timerange on Increment objects used in integration tests
without any reason and Bigtable doesn't support this operation. Setting
timerange in ITs was removed.

* fix: RequestScheduling should handle rejected resource reservations (#24)

Custom FlowControlerStrategy implementations might, contrary to the
default implementation, resolve reservation requests with exception,
what we should handle by not performing the action that had to acquire
the resources.

* feat: Add OpenCensus tracing and metrics. (#14)

* fix: make BatchHelpers skip verification of empty read results

BatchHelpers provides error handling of batch() when there may be some partial results.
Before the commit, matching successful reads were redundantly verified if there were none of them.
This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays.

* refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics

BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API.

* refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation

This change is split off from commit introducing MirroringAsyncTable#batch()

* feat: implement batch() in MirroringAsyncTable

Implementation of MirroringAsyncTable's batch() and
MirroringAsyncTable's methods such as get(List<Get) and put(List<Put>) using it.

* feat: implement failed mutations log (#19)

Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually.

Each failure is logged as a separate line, which makes it compatible with solutions like logstash.

* refactor: split SplitBatchResponse (#40)

SplitBatchResponse was refactored into two parts: splitting into
reads/writes and failed/successful. This makes the code simpler and
easier to maintain.

* refactor: extract helper methods in tests (#48)

* refactor: remove redundant writeWithControlFlow argument

* feat: copy HBase operations' input lists (#57)

* refactor: remove redundant field from MirroringConnection (#55)

* feat: verification with sampling (#28)

* fix: mirror Increment/Append in batch() using Put. (#47)

* refactor: Move HBaseOperation into WriteOperationInfo (#68)

* refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69)

* fix: integration tests - fix build (#70)

* fix: count references to batch operations (#63)

* fix: close underlying connections when MirroringConnection is closed (#49)

* refactor: fix IDE warnings in MirroringAsyncTable test (#64)

* fix: integration tests - check if write errors were reported (#71)

* feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65)

* fix: do not call callbacks with lock held (#53)

* refactor: use AccumulatedExceptions where appropriate (#54)

* fix: fix key used in verification sampling ITs (#77)

* feat: use faillog for handling write errors (#66)

* refactor: add utilities for Futures (FutureUtils)

* feat: defer closing connections until async operations complete (#37)

Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations.

* feat: implement AsyncTableBuilder (#42)

* feat: implement MirroringAsyncTable#checkAndMutate (#43)

* fix: Implement single Append and Increment as Put (#38)

* refactor: simplify SecondaryWriteErrorConsumer API (#78)

* feat: concurrent writes in MirroringTable (#79)

* test: fix failing concurrent write test (#120)

* refactor: renames and moves in RequestScheduling (#87)

* wip: handover session comments

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants