@mwalkiewicz mwalkiewicz commented Oct 14, 2021

This change is Reviewable

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-table/1 branch 3 times, most recently from e426956 to 6950f05 Compare October 15, 2021 10:58
Collaborator

@prawilny prawilny left a comment

When I started reading the code I felt like it was a bit unsound/unnatural. After I read it, my opinion changed for the better. Especially concurrentBatch() looks quite nice in my eyes (apart from using Exception[] in place of an AtomicReference).
I still feel like it could get better, though. But the ideas I can think of for doing that seem a bit wishful (dropping Java 7 and using lambdas, the Future approach discussed at the standup).
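
To illustrate the Exception[] remark, here is a minimal sketch of the two holder styles. It assumes Java 8+ for the lambdas, and the class and method names are hypothetical, not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration; not the code from MirroringTable.
class ErrorHolderSketch {
  // One-element array used as a mutable holder that a callback can write to.
  static Exception captureWithArray() {
    final Exception[] holder = new Exception[1];
    Runnable callback = () -> holder[0] = new IllegalStateException("failed");
    callback.run();
    return holder[0];
  }

  // Same idea with AtomicReference, which additionally guarantees visibility across threads.
  static Exception captureWithAtomicReference() {
    final AtomicReference<Exception> holder = new AtomicReference<>();
    Runnable callback = () -> holder.set(new IllegalStateException("failed"));
    callback.run();
    return holder.get();
  }
}
```

Both variants behave the same when the callback runs on the calling thread; the AtomicReference form states the intent more directly when another thread sets the error.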

The status I use is "comment" rather than "request changes" in order not to block the merge on days I'm not in the office.

Reviewed 11 of 11 files at r1.
Reviewable status: 9 of 11 files reviewed, 9 unresolved discussions (waiting on @mwalkiewicz and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 514 at r1 (raw file):

  @Override
  public void mutateRow(final RowMutations rowMutations) throws IOException {

Not done concurrently.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 793 at r1 (raw file):

      batchWithSpan(Collections.singletonList(operation), results);
    } catch (RetriesExhaustedWithDetailsException e) {
      if (e.getCause(0) instanceof IOException) {

nitpick: I'd create an intermediate variable here.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 896 at r1 (raw file):

          public ListenableFuture<Void> get() {
            // We are scheduling secondary batch to run concurrently.
            ListenableFuture<Void> secondaryOperationEnded =

There's a stupid thing we cannot fix: get() is used to receive an object from the Supplier, and get() is also used to wait synchronously for a Future. I nearly got burnt and had to double-check that you didn't accidentally make the calls sequential.
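
The clash can be shown with a small sketch. It uses the JDK's Supplier and CompletableFuture as stand-ins (the PR itself uses ListenableFuture), and all names here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical illustration of the two unrelated get() calls.
class GetVersusGetSketch {
  // The Supplier only creates the asynchronous task when asked.
  static Supplier<CompletableFuture<String>> secondaryWrite() {
    return () -> CompletableFuture.supplyAsync(() -> "secondary done");
  }

  static String runBoth() {
    // Supplier.get(): does not wait, it just hands back the Future.
    CompletableFuture<String> secondary = secondaryWrite().get();
    // The primary operation runs while the secondary one may still be in flight.
    String primary = "primary done";
    try {
      // Future.get(): blocks until the secondary operation has finished.
      return primary + ", " + secondary.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Calling the Future's get() before starting the primary operation would silently make the two calls sequential, which is the mistake the comment warns about.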


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java, line 77 at r1 (raw file):

        reservation = reservationRequest.get();
      }
    } catch (InterruptedException | ExecutionException e) {

IMO it's not clearer (but maybe I'm just used to the old place of this chunk of code).


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestHelpers.java, line 227 at r1 (raw file):

  }

  public static <T> ArrayList<T> asArrayList(T... entries) {

Nit: it's unused and you can fit it into one line (I think): return new ArrayList<T>(Arrays.asList(entries))
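
For reference, the suggested one-liner as a complete method. This is a sketch in a hypothetical class; the @SafeVarargs annotation is an addition to silence the generic varargs warning.

```java
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical container class; the real helper lives in TestHelpers.
class TestHelpersSketch {
  // Arrays.asList returns a fixed-size view, so it is copied into a resizable ArrayList.
  @SafeVarargs
  static <T> ArrayList<T> asArrayList(T... entries) {
    return new ArrayList<T>(Arrays.asList(entries));
  }
}
```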


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 752 at r1 (raw file):

      mirroringTable.batch(requests, results);
      fail("should have thrown");
    } catch (IOException ignored) {

nit: I like asserting that I catch the exception I expect (rather than something wrapped somehow).
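
A minimal sketch of that kind of check, without any test framework. The failing method is a hypothetical stand-in for the mocked batch call, and it assumes the code under test rethrows the same exception instance.

```java
import java.io.IOException;

// Hypothetical illustration; not the code from TestMirroringTable.
class ExpectedExceptionSketch {
  // Stand-in for the operation under test: always fails with the given exception.
  static void failingBatch(IOException toThrow) throws IOException {
    throw toThrow;
  }

  // Returns true only if exactly the expected exception instance was thrown.
  static boolean throwsExactly(IOException expected) {
    try {
      failingBatch(expected);
      return false; // nothing was thrown
    } catch (IOException actual) {
      return actual == expected;
    }
  }
}
```

If the code under test wraps the original exception, the comparison would have to look at getCause() instead.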


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1274 at r1 (raw file):

        Arrays.asList(put1, put2, put3, put4, delete1, delete2, delete3, delete4);

    //           |  p1  |  p2  |  p3  |  p4  |  d1  |  d2  |  d3  |  d4

It's a bit unclear to me how primary or secondary can have 'x' in Put test when there is no 'x' in respective error. In other words: why is "{p,s}. error" needed?

Also, I don't understand where 'y' in secondary comes from.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1332 at r1 (raw file):

    } catch (IOException ignored) {
    }

nit: I'd check (assertThat().isEqualTo()) one or two results to make sure that it works as expected.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1362 at r2 (raw file):

              oneWaited.set(true);
            }
            bothRun.get(3, TimeUnit.SECONDS);

Nit: I'd add a single line comment explaining that this get() blocks the first thread. It took me a while to understand what's going on.
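
The general pattern behind such a test can be sketched as follows. This version swaps in a CountDownLatch instead of the futures used in the actual test, and every name in it is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of checking that two operations overlap in time.
class ConcurrencyCheckSketch {
  // Returns true if both tasks were running at the same time.
  static boolean bothRanConcurrently() {
    final CountDownLatch bothStarted = new CountDownLatch(2);
    Callable<Boolean> task =
        () -> {
          bothStarted.countDown();
          // Blocks this thread until the other task has started too (or times out).
          return bothStarted.await(3, TimeUnit.SECONDS);
        };
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<Boolean> first = executor.submit(task);
      Future<Boolean> second = executor.submit(task);
      return first.get() && second.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

If the two tasks were run one after another, the first would time out while waiting and the method would return false.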

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-table/1 branch from 6950f05 to bad5ff6 Compare October 15, 2021 11:45
Author

@mwalkiewicz mwalkiewicz left a comment

Reviewable status: 8 of 11 files reviewed, 5 unresolved discussions (waiting on @mwalkiewicz and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 514 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Not done concurrently.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 793 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

nitpick: I'd create an intermediate variable here.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java, line 77 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

IMO it's not clearer (but maybe I'm just used to the old place of this chunk of code).

Exceptions that are caught in this handler are not thrown by Futures.addCallback, and I had to think about whether that is the case or not. Thus I've decided to make it obvious that those exceptions can only be thrown when obtaining a reservation.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 752 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

nit: I like asserting that I catch the exception I expect (rather than something wrapped somehow).

And I expect IOException here :D Unfortunately, the exceptions thrown by the mocked batch are not the same as the ones passed to the mockBatch method, and I do not think it is that important.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1274 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

It's a bit unclear to me how primary or secondary can have 'x' in Put test when there is no 'x' in respective error. In other words: why is "{p,s}. error" needed?

Also, I don't understand where 'y' in secondary comes from.

'y' -> typo, should be 'x'

Yup, you are right, the lower part of the table wasn't clear. I've replaced it with a verbose comment explaining the expected behavior.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1332 at r1 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

nit: I'd check (assertThat().isEqualTo()) one or two results to make sure that it works as expected.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1362 at r2 (raw file):

Previously, prawilny (Adam Czajkowski) wrote…

Nit: I'd add a single line comment explaining that this get() blocks the first thread. It took me a while to understand what's going on.

Done.

Collaborator

@prawilny prawilny left a comment

Reviewed 3 of 3 files at r3, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @mwalkiewicz)

@dopiera dopiera left a comment

Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @mwalkiewicz)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 114 at r3 (raw file):

  private final ReadSampler readSampler;
  private final boolean performWritesConcurrently;

Perhaps this is not the right place for this comment, but I think that we should start holding options rather than having so many parameters.
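
One possible shape for such an options holder is sketched below. Only performWritesConcurrently comes from the code under review; the class name, the builder, and readSamplingRate are hypothetical.

```java
// Hypothetical options holder; not the API that was eventually implemented.
class MirroringOptionsSketch {
  final boolean performWritesConcurrently;
  final int readSamplingRate; // hypothetical second option, for illustration only

  private MirroringOptionsSketch(Builder builder) {
    this.performWritesConcurrently = builder.performWritesConcurrently;
    this.readSamplingRate = builder.readSamplingRate;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private boolean performWritesConcurrently = false;
    private int readSamplingRate = 100;

    Builder setPerformWritesConcurrently(boolean value) {
      this.performWritesConcurrently = value;
      return this;
    }

    Builder setReadSamplingRate(int value) {
      this.readSamplingRate = value;
      return this;
    }

    MirroringOptionsSketch build() {
      return new MirroringOptionsSketch(this);
    }
  }
}
```

A constructor would then take a single options argument, and adding a new setting would not change any constructor signature.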


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 785 at r3 (raw file):

        throw (IOException) exception;
      }
      throw new IOException(exception);

Shall we log an unexpected exception type here?
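
What the question proposes could look roughly like this. The sketch uses java.util.logging as a stand-in for the project's own logger, and the helper name is hypothetical.

```java
import java.io.IOException;
import java.util.logging.Logger;

// Hypothetical helper; not the code from MirroringTable.
class RethrowSketch {
  private static final Logger LOG = Logger.getLogger(RethrowSketch.class.getName());

  // Returns the throwable itself if it already is an IOException, otherwise logs and wraps it.
  static IOException asIOException(Throwable exception) {
    if (exception instanceof IOException) {
      return (IOException) exception;
    }
    LOG.warning("Unexpected exception type: " + exception.getClass().getName());
    return new IOException(exception);
  }
}
```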


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 793 at r3 (raw file):

BatchOperation

I think it would be much simpler if you simply passed a @Nullable callback and called one of the relevant versions in batchWithSpan.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 820 at r3 (raw file):

    Log.trace("[%s] batch(operations=%s, results)", this.getName(), operations);

    final Object[] internalPrimaryResults = new Object[results.length];

I understand why we need this extra variable and the copy, but could you please add a comment explaining this?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 842 at r3 (raw file):

  }

  private boolean canBatchBePerformedConcurrently(List<? extends Row> operations) {

Reads shouldn't really be a problem. I'm guessing the problem with reads is that it's tricky for us to schedule verification without knowing the results from the primary.

In an ideal world (with an async client), we could split the batch into a "sync" and "async" batch and schedule those in parallel, but we don't have that luxury in HBase 1.x. In the real world, we could still plug read verification here. Here's how:

  • on every operation, create a CompletableFuture<Result> primaryOpCompleted
  • pass that primaryOpCompleted all the way down to scheduleVerificationAndRequestWithFlowControl()
  • instead of scheduling the verification as a continuation of the secondary database's operation, schedule it as a continuation of all_of() on both futures
  • in HBase 1.x, simply complete primaryOpCompleted right after the primary operation finishes

WDYT?

If this is doable and you decide to do it, please do it in a separate PR and base this one on top of it.
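
The proposal above can be sketched with CompletableFuture.allOf(). This is only an illustration under simplifying assumptions: results are plain strings, verification is an equality check, and all names except primaryOpCompleted are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of verification as a continuation of both operations.
class VerificationSketch {
  // Completes with true when both results are available and equal.
  static CompletableFuture<Boolean> scheduleVerification(
      CompletableFuture<String> primaryOpCompleted,
      CompletableFuture<String> secondaryOpCompleted) {
    return CompletableFuture.allOf(primaryOpCompleted, secondaryOpCompleted)
        // Both futures are complete here, so join() does not block.
        .thenApply(ignored -> primaryOpCompleted.join().equals(secondaryOpCompleted.join()));
  }
}
```

In a synchronous client, the caller would complete primaryOpCompleted by hand as soon as the blocking primary call returns, which is what the last bullet describes.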


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 868 at r3 (raw file):

  }

  private void concurrentBatch(

In the sequential version we used to:

  • perform the primary operation
  • grab a reservation from the flow controller
  • kick off the secondary operation
  • return

Here, we are doing it in the opposite order:

  • grab a reservation from the flow controller
  • kick off the secondary operation
  • perform the primary operation
  • return

I doubt it really matters, but in order to properly resemble what is going on in the sequential version we should:

  • kick off getting the reservation from the flow controller
  • register the secondary operation as a continuation to getting the reservation
  • perform the primary operation
  • wait until the flow control grants the reservation
  • return

With all that said, I'm not sure it's worth the effort. It seems that we'd need the secondary DB to just barely keep up to make a difference. WDYT?
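
The third ordering can be sketched as follows. It assumes Java 8+ and models the flow controller as a Supplier of a reservation future; the names are hypothetical and error handling is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of the proposed ordering; not the code from concurrentBatch().
class ConcurrentWriteOrderingSketch {
  // Returns a future that completes when the secondary operation has finished.
  static CompletableFuture<Void> write(
      Supplier<CompletableFuture<Void>> requestReservation,
      Runnable primaryOperation,
      Runnable secondaryOperation) {
    // Kick off getting the reservation without waiting for it.
    CompletableFuture<Void> reservation = requestReservation.get();
    // Register the secondary operation as a continuation of the reservation.
    CompletableFuture<Void> secondaryDone = reservation.thenRunAsync(secondaryOperation);
    // Perform the primary operation on the calling thread.
    primaryOperation.run();
    // Wait until the flow controller has granted the reservation, then return.
    reservation.join();
    return secondaryDone;
  }
}
```

With this shape, a slow flow controller delays the return of the call but not the start of the primary operation.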


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 901 at r3 (raw file):

createBatchVerificationCallback(

This is now with the assumption that we're not making any read operations here. If we did, I'm guessing we'd have to use createBatchVerificationCallback(), right? Would it be hard to plug it in here?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java, line 123 at r3 (raw file):

  /**
   * When set to {@code true} writes to primary and secondary databases will be performed

Please indicate what the default is (i.e. false).


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java, line 61 at r3 (raw file):

  }

  public static <T> ListenableFuture<Void> scheduleRequestAndVerificationWithFlowControl(

Why do we need this change? From what I gather, you did some renames (for the better - I agree) and moved Futures.addCallback() out of try catch, which is a no-op because it doesn't throw anything.

As much as I like the renames, I think this should go to a different PR if it's not needed.

@dopiera dopiera left a comment

And here's a crazy idea - would it make things easier, if we used the primary table through AsyncTableWrapper with a DirectExecutor? Here's what I have in mind: every event returns a Supplier<Future> so depending on whether it's sequential/parallel sync/async we can define the order and control flow between those operations in abstraction of what they are. WDYT?
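
A rough sketch of that idea, using CompletableFuture in place of the project's futures. The combinator names are hypothetical; the point is only that ordering is decided by how the suppliers are combined.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical combinators over operations expressed as Supplier<Future>.
class OperationOrderingSketch {
  // Starts the second operation only after the first one has completed.
  static CompletableFuture<Void> sequential(
      Supplier<CompletableFuture<Void>> first, Supplier<CompletableFuture<Void>> second) {
    return first.get().thenCompose(ignored -> second.get());
  }

  // Starts both operations immediately and completes when both have completed.
  static CompletableFuture<Void> parallel(
      Supplier<CompletableFuture<Void>> first, Supplier<CompletableFuture<Void>> second) {
    CompletableFuture<Void> firstDone = first.get();
    CompletableFuture<Void> secondDone = second.get();
    return CompletableFuture.allOf(firstDone, secondDone);
  }
}
```

A synchronous operation fits the same interface by doing its work inside the supplier and returning an already completed future.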

Reviewable status: all files reviewed, 10 unresolved discussions (waiting on @mwalkiewicz)

Author

@mwalkiewicz mwalkiewicz left a comment

Reviewable status: all files reviewed, 9 unresolved discussions (waiting on @dopiera and @mwalkiewicz)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 114 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

Perhaps this is not the right place for this comment, but I think that we should start holding options rather than having so many parameters.

Agreed, I've created an issue.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 793 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…
BatchOperation

I think it would be much simpler if you simply passed a @Nullable callback and called one of the relevant versions in batchWithSpan.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 820 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

I understand why we need this extra variable and the copy, but could you please add a comment explaining this?

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 901 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…
createBatchVerificationCallback(

This is now with the assumption that we're not making any read operations here. If we did, I'm guessing we'd have to use createBatchVerificationCallback(), right? Would it be hard to plug it in here?

createBatchVerificationCallback uses the fact


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java, line 123 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

Please indicate what the default is (i.e. false).

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java, line 61 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

Why do we need this change? From what I gather, you did some renames (for the better - I agree) and moved Futures.addCallback() out of try catch, which is a no-op because it doesn't throw anything.

As much as I like the renames, I think this should go to a different PR if it's not needed.

I've moved the rename of this function and the relocation of Futures.addCallback() to a separate PR. I'm leaving the rename in MirroringTable (scheduleWriteWithControlFlow -> scheduleSequentialWriteOperation) in this PR because it is related to it.

@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-table/1 branch from bad5ff6 to 5fcff49 Compare October 19, 2021 09:32
@mwalkiewicz mwalkiewicz force-pushed the mw/concurrent-writes-table/1 branch from 5fcff49 to 50b7d22 Compare October 19, 2021 09:37
@dopiera dopiera left a comment

Looks good, except you didn't respond to one of my comments and I think you rebased incorrectly (lots of unrelated changes).

Reviewable status: 0 of 25 files reviewed, 5 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)

@mwalkiewicz mwalkiewicz changed the base branch from mw/secondary-write-error-consumer-api-simplification/1 to master October 20, 2021 12:36
Author

@mwalkiewicz mwalkiewicz left a comment

Reviewable status: 0 of 25 files reviewed, 5 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java, line 785 at r3 (raw file):

Previously, dopiera (Marek Dopiera) wrote…

Shall we log an unexpected exception type here?

I'm not really sure that other exception types are unexpected; the return type of getCause doesn't rule them out.

@dopiera dopiera left a comment

Reviewable status: 0 of 25 files reviewed, all discussions resolved (waiting on @dopiera and @prawilny)

@mwalkiewicz mwalkiewicz merged commit 5360c2e into master Oct 22, 2021
dopiera pushed a commit that referenced this pull request May 11, 2022
dopiera added a commit that referenced this pull request May 11, 2022
* Initial implementation of hbase1.x Mirroring Client (#1)

* Initial implementation of hbase1.x Mirroring Client

* MirroringTable and AsyncTableWrapper (#4)

* MirroringTable and AsyncTableWrapper

- Initial implementation of MirroringTable.
- Wrapper around HBase 1.x Table enabling scheduling asynchronous operations.
- Asynchronous verification of get/exists results.

* Add missing licence header to TestResultComparator (#6)

* Setup running tests in GitHub Actions (#7)

* Asynchronous ResultScanner (#5)

* Asynchronous ResultScanner

* Implement faillog.Appender (#2)

`faillog.Appender` is the lowermost component of the infrastructure for
dumping failed mutations in the `MirroringClient`. `faillog/README.md`
explains the design decisions in a bit more detail.

`faillog.Appender` essentially logs arbitrary data asynchronously,
through a separate thread reading from a bounded buffer.

* ListenableReferenceCounter (#8)

* Count references in asynchronous tasks before closing Table/Scanner.

* Flow controller (#10)

* Flow Controller
* Flow Control strategy based on single requests queue

* Mirroring table: writes (#12)

* Add missing condition in result comparator (#15)

* Add more mirroring config options (#16)

* MismatchDetector implementation.
* FlowControllerStrategy implementation.
* Maximal number of outstanding requests used by FlowControllerStrategy.
* Primary/Secondary connection implementation option now accepts
  "default" value which can be used when default HBase Connection
  implementation should be used by MirroringConnection.

* Make AsyncTableWrapper ListenableCloseable (#20)

Implements our standard interface for objects that can run callbacks
after asynchronous close to simplify reference counting of
MirroringTable.

* MirroringConnection: use new config options (#17)

* MirroringResultScanner improvements (#18)

* Count references to MirroringResultScanner

Current implementation of MirroringResultScanner doesn't count
verification requests that it has scheduled and allows instances to be
closed while verification requests are in-flight. This causes lost
verifications.

This PR fixes this issue by counting references to MirroringResultScanner
instances when scheduling verification requests. Moreover, ListenableCloseable
interface is implemented for consistency with other classes that use this
scheme, because now the MirroringResultScanner instances will be closed
asynchronously, when all scheduled requests have finished.

* MirroringTable: count references to scanners and secondary wrapper (#21)

Current MirroringTable implementation does not count its references held
by MirroringResultScanners and SecondaryAsyncWrapper, thus
MirroringConnection considers it closed before we are sure that all
asynchronous operations have completed.

This PR adds correct reference counting of MirroringTable based on work
done in previously merged PRs.

* Result Scanner - ensure verification ordering (#22)

Current implementation assumes that next() operations on primary and
secondary scanners are called in the same order and uses this assumption
to match results for verification.

However, next()s on secondary database are called asynchronously and
their order is not defined, which causes invalid mismatch reports.

This PR fixes this problem by placing data to be verified - results of
next()s called on primary scanner - and details of the next() call - number
of requested elements - on a queue. Each asynchronous call to
next() is synchronized and pops a single element from that queue.
The appropriate next() is called based on the number of requested elements.
Then results of that request and results from the queue are verified.
This ensures that results of next()s passed to verification are
correctly matched and ordered.

* Integration tests (#23)

HBase 1.x integration tests

* Add trace logging. (#24)

* Estimate memory overhead in RequestResourceDescription (#25)

* Tests: extract executor service to TestRule (#26)

Extract executor service utilities into TestRule to facilitate code
reuse in other test classes.

* Integration tests: read configuration from xml files

* MirroringBufferedMutator

* MirroringBufferedMutator: integration tests (#9)

* Fix error introduced in rebase (#11)

* Obtain resource reservation before scheduling secondary calls (#4)

Fixes a bug where the secondary database request Future was created before obtaining resources from the FlowController.

* Integration tests - MirroringTable operations (#10)

* MirroringAsyncConfiguration (#5)

Add configuration class to be used by MirroringAsyncConnection.

* SecondaryWriteErrorConsumer in MirroringTable (#15)

Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes.

* Use Put to implement Increment and Append (#16)

* refactor: extract functions using reflection into package utils.reflection

* refactor: extract BatchHelpers into utils

Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client.

* feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client

Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable.

* refactor: extract FlowController's request cancellation into a method

* fix: Increment ITs fail with Bigtable as primary (#21)

We were setting timerange on Increment objects used in integration tests
without any reason and Bigtable doesn't support this operation. Setting
timerange in ITs was removed.

* fix: RequestScheduling should handle rejected resource reservations (#24)

Custom FlowControllerStrategy implementations might, contrary to the
default implementation, resolve reservation requests with an exception,
which we should handle by not performing the action that had to acquire
the resources.

* feat: Add OpenCensus tracing and metrics. (#14)

* fix: make BatchHelpers skip verification of empty read results

BatchHelpers provides error handling of batch() when there may be some partial results.
Before this commit, verification of matching successful reads was performed redundantly even when there were none.
This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays.

* refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics

BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API.

* refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation

This change is split off from commit introducing MirroringAsyncTable#batch()

* feat: implement batch() in MirroringAsyncTable

Implementation of MirroringAsyncTable's batch() and
MirroringAsyncTable's methods such as get(List<Get>) and put(List<Put>) using it.

* feat: implement failed mutations log (#19)

Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually.

Each failure is logged as a separate line, which makes it compatible with solutions like logstash.

* refactor: split SplitBatchResponse (#40)

SplitBatchResponse was refactored into two parts: splitting into
reads/writes and failed/successful. This makes the code simpler and
easier to maintain.

* refactor: extract helper methods in tests (#48)

* refactor: remove redundant writeWithControlFlow argument

* feat: copy HBase operations' input lists (#57)

* refactor: remove redundant field from MirroringConnection (#55)

* feat: verification with sampling (#28)

* fix: mirror Increment/Append in batch() using Put. (#47)

* refactor: Move HBaseOperation into WriteOperationInfo (#68)

* refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69)

* fix: integration tests - fix build (#70)

* fix: count references to batch operations (#63)

* fix: close underlying connections when MirroringConnection is closed (#49)

* refactor: fix IDE warnings in MirroringAsyncTable test (#64)

* fix: integration tests - check if write errors were reported (#71)

* feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65)

* fix: do not call callbacks with lock held (#53)

* refactor: use AccumulatedExceptions where appropriate (#54)

* fix: fix key used in verification sampling ITs (#77)

* feat: use faillog for handling write errors (#66)

* refactor: add utilities for Futures (FutureUtils)

* feat: defer closing connections until async operations complete (#37)

Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations.

* feat: implement AsyncTableBuilder (#42)

* feat: implement MirroringAsyncTable#checkAndMutate (#43)

* fix: Implement single Append and Increment as Put (#38)

* refactor: simplify SecondaryWriteErrorConsumer API (#78)

* feat: concurrent writes in MirroringTable (#79)

* test: fix failing concurrent write test (#120)

* refactor: renames and moves in RequestScheduling (#87)

* wip: handover session comments

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
mwalkiewicz added a commit that referenced this pull request May 18, 2022
* Initial implementation of hbase1.x Mirroring Client (#1)

* Initial implementation of hbase1.x Mirroring Client

* MirroringTable and AsyncTableWrapper (#4)

* MirroringTable and AsyncTableWrapper

- Initial implementation of MirroringTable.
- Wrapper around HBase 1.x Table enabling scheduling asynchronous operations.
- Asynchronous verification of get/exists results.

* Add missing licence header to TestResultComparator (#6)

* Setup running tests in GitHub Actions (#7)

* Asynchronous ResultScanner (#5)

* Asynchronous ResultScanner

* Implement faillog.Appender (#2)

`faillog.Appender` is the lowermost component of the infrastructure for
dumping failed mutations in the `MirroringClient`. `faillog/README.md`
explains the design decisions in a bit more detail.

`faillog.Appender` essentially logs arbitrary data asynchronously,
through a separate thread reading from a bounded buffer.

* ListenableReferenceCounter (#8)

* Count references in asynchronous tasks before closing Table/Scanner.

* Flow controller (#10)

* Flow Controller
* Flow Control strategy based on single requests queue

* Mirroring table: writes (#12)

* Add missing condition in result comparator (#15)

* Add more mirroring config opitons (#16)

* MismatchDetector implementation.
* FlowControllerStrategy implementation.
* Maximal number of outstanding requests used by FlowControllerStrategy.
* Primary/Secondary connection implementation option now accepts
  "default" value which can be used when default HBase Connection
  implementation should be used by MirroringConnection.

* Make AsyncTableWrapper ListenableCloseable (#20)

Implements our standard interface for objects that can run callbacks
after asynchronous close to simplify reference counting of
MirroringTable.

* MirroringConnection: use new config options (#17)

* MirroringResultScanner improvements (#18)

* Count references to MirroringResultScanner

Current implementation of MirroringResultScanner doesn't count
verificaiton requests that it has scheduled and allows to close
instances while verificaiton requests are in-flight. This causes lost
verifications.

This PR fixes this issue by counting references to MirroringResultScanner
instances when scheduling verification requests. Moreover, ListenableClosable
interface is implemented for consistency with other classes that use this
scheme, because now the MirroringResultScanner instances will be closed
asynchronously, when all scheduled requests have finished.

* MirroringTable: count references to scanners and secondary wrapper (#21)

The current MirroringTable implementation does not count the references
held by MirroringResultScanners and SecondaryAsyncWrapper, so
MirroringConnection considers it closed before we are sure that all
asynchronous operations have completed.

This PR adds correct reference counting of MirroringTable based on work
done in previously merged PRs.

* Result Scanner - ensure verification ordering (#22)

Current implementation assumes that next() operations on primary and
secondary scanners are called in the same order and uses this assumption
to match results for verification.

However, next() calls on the secondary database are made asynchronously
and their order is not defined, which causes invalid mismatch reports.

This PR fixes the problem by placing on a queue the data to be verified
(the results of each next() called on the primary scanner) together with
the details of that call (the number of requested elements). Each
asynchronous call to next() is synchronized and pops a single element
from that queue. The appropriate next() variant is called based on the
number of requested elements. Then the results of that request and the
results from the queue are verified. This ensures that the results of
next() calls passed to verification are correctly matched and ordered.
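
A simplified sketch of the queue-based matching described above. It is not the MirroringResultScanner code: `OrderedScanVerifier` and `PrimaryCall` are hypothetical names, rows are plain strings rather than HBase `Result` objects, and the secondary scanner is modelled as an `Iterator`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch; rows are Strings and the secondary scanner is an Iterator. */
class OrderedScanVerifier {
  /** What one primary next() call requested and returned. */
  private static final class PrimaryCall {
    final int requested;
    final List<String> rows;

    PrimaryCall(int requested, List<String> rows) {
      this.requested = requested;
      this.rows = rows;
    }
  }

  private final Queue<PrimaryCall> pending = new ArrayDeque<>();
  private final Iterator<String> secondaryScanner;
  private final List<String> mismatches = new ArrayList<>();

  OrderedScanVerifier(Iterator<String> secondaryScanner) {
    this.secondaryScanner = secondaryScanner;
  }

  /** Called after each primary next(): remember what was asked for and what came back. */
  synchronized void onPrimaryNext(int requested, List<String> rows) {
    pending.add(new PrimaryCall(requested, new ArrayList<>(rows)));
  }

  /**
   * Called by each asynchronous secondary task. The lock covers both the pop and the
   * secondary read, so the oldest primary call is always paired with the next secondary rows.
   */
  synchronized void verifyNext() {
    PrimaryCall call = pending.poll();
    if (call == null) {
      return;
    }
    List<String> secondaryRows = new ArrayList<>();
    while (secondaryRows.size() < call.requested && secondaryScanner.hasNext()) {
      secondaryRows.add(secondaryScanner.next());
    }
    if (!secondaryRows.equals(call.rows)) {
      mismatches.add("expected " + call.rows + " but got " + secondaryRows);
    }
  }

  synchronized List<String> mismatches() {
    return new ArrayList<>(mismatches);
  }
}
```

Because popping from the queue and reading from the secondary happen under the same lock, the order in which the asynchronous tasks are actually executed no longer affects the pairing.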

* Integration tests (#23)

HBase 1.x integration tests

* Add trace logging. (#24)

* Estimate memory overhead in RequestResourceDescription (#25)

* Tests: extract executor service to TestRule (#26)

Extract executor service utilities into TestRule to facilitate code
reuse in other test classes.

* Integration tests: read configuration from xml files

* MirroringBufferedMutator

* MirroringBufferedMutator: integration tests (#9)

* Fix error introduced in rebase (#11)

* Obtain resource reservation before scheduling secondary calls (#4)

Fixes a bug where the secondary database request Future was created before obtaining resources from FlowController.

* Integration tests - MirroringTable operations (#10)

* MirroringAsyncConfiguration (#5)

Add configuration class to be used by MirroringAsyncConnection.

* SecondaryWriteErrorConsumer in MirroringTable (#15)

Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes.

* Use Put to implement Increment and Append (#16)

* refactor: extract functions using reflection into package utils.reflection

* refactor: extract BatchHelpers into utils

Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client.

* feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client

Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable.

* refactor: extract FlowController's request cancellation into a method

* fix: Increment ITs fail with Bigtable as primary (#21)

We were setting a timerange on the Increment objects used in integration
tests for no reason, and Bigtable doesn't support this operation. The
timerange is no longer set in the ITs.

* fix: RequestScheduling should handle rejected resource reservations (#24)

Custom FlowControllerStrategy implementations might, contrary to the
default implementation, resolve reservation requests with an exception.
We should handle this by not performing the action that needed to
acquire the resources.
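
One way to picture this handling, as a sketch only: `ReservationAwareScheduler` and `runIfReserved` are hypothetical names, and the reservation is modelled as a `CompletableFuture<Runnable>` whose value releases the resources. The real RequestScheduling code and FlowController types may look different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch; the reservation's value is a callback that releases the resources. */
class ReservationAwareScheduler {
  /** Runs the action only if the reservation succeeds; a rejection fails the result instead. */
  static <T> CompletableFuture<T> runIfReserved(
      CompletableFuture<Runnable> reservation, Supplier<CompletableFuture<T>> action) {
    CompletableFuture<T> result = new CompletableFuture<>();
    reservation.whenComplete((release, reservationError) -> {
      if (reservationError != null) {
        // Reservation rejected: the action must not run and nothing needs releasing.
        result.completeExceptionally(reservationError);
        return;
      }
      CompletableFuture<T> started;
      try {
        started = action.get();
      } catch (RuntimeException e) {
        // The action could not even be started: release and report the failure.
        release.run();
        result.completeExceptionally(e);
        return;
      }
      started.whenComplete((value, actionError) -> {
        release.run(); // always give the resources back once the action is done
        if (actionError != null) {
          result.completeExceptionally(actionError);
        } else {
          result.complete(value);
        }
      });
    });
    return result;
  }
}
```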

* feat: Add OpenCensus tracing and metrics. (#14)

* fix: make BatchHelpers skip verification of empty read results

BatchHelpers provides error handling of batch() when there may be some partial results.
Before this commit, verification was redundantly run even when there were no matching successful reads.
This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays.

* refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics

BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API.

* refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation

This change is split off from commit introducing MirroringAsyncTable#batch()

* feat: implement batch() in MirroringAsyncTable

Implementation of MirroringAsyncTable's batch() and of the
MirroringAsyncTable methods built on it, such as get(List<Get>) and put(List<Put>).

* feat: implement failed mutations log (#19)

Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually.

Each failure is logged as a separate line, which makes it compatible with solutions like logstash.
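
To illustrate the one-JSON-object-per-line layout, here is a minimal sketch. The field names (`table`, `rowKey`, `error`), the Base64 encoding of the row key and the class `FailedMutationLine` are assumptions for the example; the actual log schema and serializer are not shown in this message.

```java
import java.util.Base64;

/** Hypothetical sketch of a one-line JSON log entry; the field names are assumptions. */
class FailedMutationLine {
  /** Formats one failure as a single JSON object without any raw line breaks. */
  static String format(String table, byte[] rowKey, String error) {
    return "{\"table\":\"" + escape(table) + "\","
        + "\"rowKey\":\"" + Base64.getEncoder().encodeToString(rowKey) + "\","
        + "\"error\":\"" + escape(error) + "\"}";
  }

  /** Escapes quotes, backslashes and control characters so the entry stays on one line. */
  static String escape(String s) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      switch (c) {
        case '"':
          sb.append("\\\"");
          break;
        case '\\':
          sb.append("\\\\");
          break;
        case '\n':
          sb.append("\\n");
          break;
        case '\r':
          sb.append("\\r");
          break;
        case '\t':
          sb.append("\\t");
          break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.toString();
  }
}
```

Keeping each entry free of raw newlines is what lets line-oriented tools treat every line as a complete record.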

* refactor: split SplitBatchResponse (#40)

SplitBatchResponse was refactored into two parts: splitting into
reads/writes and failed/successful. This makes the code simpler and
easier to maintain.

* refactor: extract helper methods in tests (#48)

* refactor: remove redundant writeWithControlFlow argument

* feat: copy HBase operations' input lists (#57)

* refactor: remove redundant field from MirroringConnection (#55)

* feat: verification with sampling (#28)

* fix: mirror Increment/Append in batch() using Put. (#47)

* refactor: Move HBaseOperation into WriteOperationInfo (#68)

* refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69)

* fix: integration tests - fix build (#70)

* fix: count references to batch operations (#63)

* fix: close underlying connections when MirroringConnection is closed (#49)

* refactor: fix IDE warnings in MirroringAsyncTable test (#64)

* fix: integration tests - check if write errors were reported (#71)

* feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65)

* fix: do not call callbacks with lock held (#53)

* refactor: use AccumulatedExceptions where appropriate (#54)

* fix: fix key used in verification sampling ITs (#77)

* feat: use faillog for handling write errors (#66)

* refactor: add utilities for Futures (FutureUtils)

* feat: defer closing connections until async operations complete (#37)

Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations.
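
The deferred close can be sketched as a reference counter that starts at one (the owner's own reference) and runs the real close when it drops to zero. `DeferredCloser` is a hypothetical name for this illustration; the project's ListenableReferenceCounter may differ in API and details.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of deferring close until all pending operations finish. */
class DeferredCloser {
  // Starts at 1: the owner itself holds a reference until close() is called.
  private final AtomicInteger references = new AtomicInteger(1);
  private final AtomicBoolean closeRequested = new AtomicBoolean(false);
  private final Runnable closeUnderlying;

  DeferredCloser(Runnable closeUnderlying) {
    this.closeUnderlying = closeUnderlying;
  }

  /** Call before scheduling an asynchronous operation. */
  void operationStarted() {
    references.incrementAndGet();
  }

  /** Call when the asynchronous operation completes, successfully or not. */
  void operationFinished() {
    release();
  }

  /** Drops the owner's reference; the underlying close runs once nothing is pending. */
  void close() {
    if (closeRequested.compareAndSet(false, true)) {
      release();
    }
  }

  private void release() {
    if (references.decrementAndGet() == 0) {
      closeUnderlying.run();
    }
  }
}
```

The sketch assumes callers do not start new operations after `close()`; a production version would have to reject or guard against that.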

* feat: implement AsyncTableBuilder (#42)

* feat: implement MirroringAsyncTable#checkAndMutate (#43)

* fix: Implement single Append and Increment as Put (#38)

* refactor: simplify SecondaryWriteErrorConsumer API (#78)

* feat: concurrent writes in MirroringTable (#79)

* test: fix failing concurrent write test (#120)

* refactor: renames and moves in RequestScheduling (#87)

* wip: handover session comments

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>