@prawilny prawilny commented Nov 1, 2021

I created a single PR because the last two points in #121 would require a lot of rebasing should they need to be rewritten.
Warning: there's quite a lot of copy-pasted code, with only ~5 lines of changes, in the tests in TestMirroringTable (the tests are there rather than in TestMirroringResultScanner because this way significantly less boilerplate is needed).



@prawilny prawilny requested a review from mwalkiewicz November 1, 2021 13:42
@prawilny prawilny linked an issue Nov 1, 2021 that may be closed by this pull request
@mwalkiewicz mwalkiewicz left a comment

Reviewed 2 of 16 files at r1, all commit messages.
Reviewable status: 2 of 16 files reviewed, 5 unresolved discussions (waiting on @mwalkiewicz and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 237 at r1 (raw file):

    final int permitsNeeded = orderEnforcer.getNextPermits();
    FutureCallback<T> orderEnforcedScannerNext =

This trick is neat, but I think that it is not very obvious and can be made more straightforward and more readable with a plain queue and a mutex.

I think that it'd be enough to create a queue of entries to be verified (AsyncScannerVerificationPayload and AsyncScannerExceptionWithContext produced by AsyncResultScannerWrapper#performNext(); that method should then return nulls, I think). In this callback we take a mutex, pop an element from the queue, and perform verification on that element. That would give us ordered pairs of primary and secondary results as well as synchronization, and it would be explicit about the purpose.
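
A minimal sketch of the queue-and-mutex idea, in plain Java. The names `PendingVerification`, `OrderedVerifier`, `enqueue` and `verifyNext` are hypothetical and do not come from the PR, and the real payload types are replaced by a pair of strings. It only illustrates that, if every callback pops the head of a shared queue under a lock, entries are verified in the order they were enqueued regardless of which callback fires first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a primary/secondary result pair awaiting verification. */
final class PendingVerification {
  final String primary;
  final String secondary;

  PendingVerification(String primary, String secondary) {
    this.primary = primary;
    this.secondary = secondary;
  }
}

/** Verifies entries strictly in enqueue order, whichever callback happens to run. */
final class OrderedVerifier {
  private final Object lock = new Object();
  private final Queue<PendingVerification> pending = new ArrayDeque<>();
  private final List<String> mismatches = new ArrayList<>();

  /** Called by the producer, in the order the reads were issued. */
  void enqueue(PendingVerification entry) {
    synchronized (lock) {
      pending.add(entry);
    }
  }

  /** Called from each completion callback: lock, pop the oldest entry, verify it. */
  void verifyNext() {
    synchronized (lock) {
      PendingVerification entry = pending.poll();
      if (entry == null) {
        return; // nothing left to verify
      }
      if (!entry.primary.equals(entry.secondary)) {
        mismatches.add(entry.primary + " != " + entry.secondary);
      }
    }
  }

  /** Returns a copy of the mismatches recorded so far. */
  List<String> mismatches() {
    synchronized (lock) {
      return new ArrayList<>(mismatches);
    }
  }
}
```

Because the lock covers both the pop and the comparison, two callbacks can never verify the same entry or interleave their comparisons.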


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 277 at r1 (raw file):

  }

  public class LastUnmatched {

I do not yet know how the algorithm in this class works, but I see something now:

  • I do not like the primaryPreviouslyUnmatched name; I do not think that the name says what it does;
  • Comparison logic is now split between two files, can we make LastUnmatched just a struct with two deques, and move all the logic into static methods in MismatchDetector?
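
One way to read the second point, as a rough sketch only: the holder becomes a plain struct with two deques, and all comparison logic sits in static methods of a single class. `UnmatchedBuffers`, `ScanComparison` and `recordAndMatch` are hypothetical names, strings stand in for HBase `Result` objects, and the matching rule here is an assumption made for illustration, not the PR's algorithm.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain data holder: two deques and no comparison logic. */
final class UnmatchedBuffers {
  final Deque<String> primary = new ArrayDeque<>();
  final Deque<String> secondary = new ArrayDeque<>();
}

/** All comparison logic lives here, in static methods. */
final class ScanComparison {
  private ScanComparison() {}

  /**
   * Compares one primary/secondary pair. Results that differ are first looked up in the
   * opposite buffer; if not found there, they are buffered for a later match.
   *
   * @return the number of results matched by this call (0, 1 or 2)
   */
  static int recordAndMatch(UnmatchedBuffers buffers, String primaryResult, String secondaryResult) {
    if (primaryResult.equals(secondaryResult)) {
      return 1;
    }
    int matched = 0;
    if (buffers.secondary.remove(primaryResult)) {
      matched++;
    } else {
      buffers.primary.addLast(primaryResult);
    }
    if (buffers.primary.remove(secondaryResult)) {
      matched++;
    } else {
      buffers.secondary.addLast(secondaryResult);
    }
    return matched;
  }
}
```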

bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 383 at r1 (raw file):

    }

    public <T> void waitForTurn(int permits) {

why <T>?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/verification/DefaultMismatchDetector.java, line 168 at r1 (raw file):

      Scan scan,
      HBaseOperation operation) {
    int primaryIdx = 0, secondaryIdx = 0;

Split declarations into separate lines. https://google.github.io/styleguide/javaguide.html#s4.8.2-variable-declarations


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1434 at r1 (raw file):

  @Test
  public void testUnmatchedScannerResultQueuesAreFlushedWhenResultScannerIsClosed()

I'd also test if everything works as expected when:

  • MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS == 0;
  • primary and secondary return the same values at first, then primary returns a single value not in the secondary, then values match, and later primary returns exclusive result once again.
  • number of buffered records exceeds the threshold
  • number of buffered records exceeds the threshold, but later they somehow start to return the same values again (I don't know if this describes any sane real-life scenario, but I'd like to know what will happen, as a sanity check)
  • scanner first encounters a result missing from primary, and after several reads a result missing from secondary.
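
To make the first and third scenarios concrete, here is a small self-contained sketch of a bounded buffer of unmatched results. It is not the PR's implementation: `BoundedMismatchBuffer` is a hypothetical class, `limit` only plays the role of MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS, and evicting the oldest entry as a reported mismatch on overflow is one assumed policy among several possible ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical bounded buffer of unmatched scan results (strings stand in for Result). */
final class BoundedMismatchBuffer {
  private final int limit;
  private final Deque<String> primaryUnmatched = new ArrayDeque<>();
  private final Deque<String> secondaryUnmatched = new ArrayDeque<>();
  private final List<String> reported = new ArrayList<>();

  BoundedMismatchBuffer(int limit) {
    this.limit = limit;
  }

  /** Handles one pair of results read from the primary and secondary scanners. */
  void onResults(String primary, String secondary) {
    if (primary.equals(secondary)) {
      return;
    }
    if (!secondaryUnmatched.remove(primary)) {
      buffer(primaryUnmatched, primary);
    }
    if (!primaryUnmatched.remove(secondary)) {
      buffer(secondaryUnmatched, secondary);
    }
  }

  /** Assumed overflow policy: the oldest buffered entries are reported as mismatches. */
  private void buffer(Deque<String> queue, String result) {
    queue.addLast(result);
    while (queue.size() > limit) {
      reported.add(queue.removeFirst());
    }
  }

  /** Flushes everything still unmatched, as happens when the scanner is closed. */
  void close() {
    reported.addAll(primaryUnmatched);
    reported.addAll(secondaryUnmatched);
    primaryUnmatched.clear();
    secondaryUnmatched.clear();
  }

  List<String> reported() {
    return new ArrayList<>(reported);
  }
}
```

With `limit == 0` every unmatched result is reported immediately; with a larger limit, results that arrive shifted by one read cancel out and nothing is reported.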

@prawilny prawilny force-pushed the ac/MismatchDetector/2 branch 2 times, most recently from 5d5f1b6 to 901a0a5 Compare November 5, 2021 10:01
@prawilny prawilny force-pushed the ac/MismatchDetector/3 branch 2 times, most recently from ce72708 to 8d16430 Compare November 9, 2021 17:41
@prawilny prawilny force-pushed the ac/MismatchDetector/2 branch 5 times, most recently from 286fe15 to 3403c8f Compare November 9, 2021 18:53
@prawilny prawilny force-pushed the ac/MismatchDetector/3 branch from 8d16430 to d23b5ff Compare November 10, 2021 17:35

@prawilny prawilny left a comment

Dismissed @mwalkiewicz from 3 discussions.
Reviewable status: 0 of 22 files reviewed, 2 unresolved discussions (waiting on @mwalkiewicz)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 237 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

This trick is neat, but I think that it is not very obvious and can be made more straightforward and more readable with a plain queue and a mutex.

I think that it'd be enough to create a queue of entries to be verified (AsyncScannerVerificationPayload and AsyncScannerExceptionWithContext produced by AsyncResultScannerWrapper#performNext(); that method should then return nulls, I think). In this callback we take a mutex, pop an element from the queue, and perform verification on that element. That would give us ordered pairs of primary and secondary results as well as synchronization, and it would be explicit about the purpose.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 277 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I do not yet know how the algorithm in this class works, but I see something now:

  • I do not like the primaryPreviouslyUnmatched name; I do not think that the name says what it does;
  • Comparison logic is now split between two files, can we make LastUnmatched just a struct with two deques, and move all the logic into static methods in MismatchDetector?

Done (but in a bit different way).
The code is much simpler now.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 383 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

why <T>?

Leftover from some earlier version which accepted some future as an argument.
I removed OrderEnforcer, so it no longer matters.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/verification/DefaultMismatchDetector.java, line 168 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Split declarations into separate lines. https://google.github.io/styleguide/javaguide.html#s4.8.2-variable-declarations

Removed this function.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1434 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I'd also test if everything works as expected when:

  • MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS == 0;
  • primary and secondary return the same values at first, then primary returns a single value not in the secondary, then values match, and later primary returns exclusive result once again.
  • number of buffered records exceeds the threshold
  • number of buffered records exceeds the threshold, but later they somehow start to return the same values again (I don't know if this describes any sane real-life scenario, but I'd like to know what will happen, as a sanity check)
  • scanner first encounters a result missing from primary, and after several reads a result missing from secondary.

Done.

@prawilny prawilny requested a review from mwalkiewicz November 10, 2021 17:39
}

@Test
public void testUnmatchedScannerResultQueuesAreFlushedWhenResultScannerIsClosed()

I'd move all tests related to ScannerResultVerifier to a separate file.

.recordReadMismatches(any(HBaseOperation.class), eq(1));
waitForMirroringScanner(mirroringScanner);

verify(verifier, times(1)).verify(any(Result[].class), any(Result[].class));

Is it possible to verify that the mismatch of expected1 and expected2 was reported?

assertThat(mirroringScanner.next()).isEqualTo(expected7);
waitForMirroringScanner(mirroringScanner);

verify(verifier, times(7)).verify(any(Result[].class), any(Result[].class));

Same here, I'd like to know which mismatches exactly were reported.

This applies to all other tests as well.
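
As an illustration of asserting on which mismatches were reported rather than only on how many calls were made, here is a dependency-free sketch using a hand-written recording test double. `MismatchReporter`, `RecordingMismatchReporter` and `PairwiseComparer` are hypothetical names invented for this example and are not the PR's API. In the real tests, which use Mockito, the same effect could presumably be achieved with an `ArgumentCaptor` or by passing concrete arguments to `verify(...)` instead of `any(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical reporting interface, standing in for the detector used by the scanner. */
interface MismatchReporter {
  void scannerMismatch(String primaryRow, String secondaryRow);
}

/** Test double that records every reported pair so a test can assert on the exact contents. */
final class RecordingMismatchReporter implements MismatchReporter {
  final List<List<String>> calls = new ArrayList<>();

  @Override
  public void scannerMismatch(String primaryRow, String secondaryRow) {
    calls.add(Arrays.asList(primaryRow, secondaryRow));
  }
}

/** Hypothetical code under test: reports each position where the two arrays differ. */
final class PairwiseComparer {
  private PairwiseComparer() {}

  static void compare(String[] primary, String[] secondary, MismatchReporter reporter) {
    int length = Math.min(primary.length, secondary.length);
    for (int i = 0; i < length; i++) {
      if (!primary[i].equals(secondary[i])) {
        reporter.scannerMismatch(primary[i], secondary[i]);
      }
    }
  }
}
```

A test can then compare `calls` against the exact expected list of pairs, which fails if the wrong rows are reported even when the call count happens to be right.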


private void addNotNull(Deque<Result> mismatchBuffer, Result[] results) {
for (Result result : results) {
if (result == null) break;

Please do not use ifs without braces; it is error-prone.
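
For reference, a braced version of the loop might look like the sketch below. Only the first lines of `addNotNull` appear in this thread, so the `buffer.add(result)` call is an assumption about the rest of the method, and a generic type parameter stands in for HBase's `Result` to keep the example self-contained.

```java
import java.util.Deque;

final class BracedIfExample {
  private BracedIfExample() {}

  /** Adds the leading non-null entries to the buffer and stops at the first null. */
  static <T> void addNotNull(Deque<T> buffer, T[] results) {
    for (T result : results) {
      if (result == null) {
        break;
      }
      // Assumed continuation of the method; not shown in the review snippet.
      buffer.add(result);
    }
  }
}
```

With braces, a statement added later next to `break` cannot silently end up outside the condition.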

}

private Set<Result> matchingResults() {
Set<Result> results = new HashSet<>(this.primaryMismatchBuffer);

I think that this Set can be calculated on the fly instead of rebuilding it every time.
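
A rough sketch of what computing the set on the fly could look like: the intersection is updated whenever an element is added or removed, so no query has to rebuild it. `IncrementalIntersection` and its methods are hypothetical, and the sketch assumes each result appears at most once per side, which may not hold for the real buffers.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Keeps the set of elements present on both sides up to date on every change. */
final class IncrementalIntersection<T> {
  private final Set<T> primary = new HashSet<>();
  private final Set<T> secondary = new HashSet<>();
  private final Set<T> matching = new HashSet<>();

  void addPrimary(T result) {
    primary.add(result);
    if (secondary.contains(result)) {
      matching.add(result);
    }
  }

  void addSecondary(T result) {
    secondary.add(result);
    if (primary.contains(result)) {
      matching.add(result);
    }
  }

  /** Removes a result from both sides, for example once its match has been handled. */
  void remove(T result) {
    primary.remove(result);
    secondary.remove(result);
    matching.remove(result);
  }

  /** Read-only view; constant-time to obtain, no rebuilding. */
  Set<T> matching() {
    return Collections.unmodifiableSet(matching);
  }
}
```

The trade-off is extra bookkeeping on every insert and removal in exchange for a cheap lookup.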

@prawilny prawilny force-pushed the ac/MismatchDetector/3 branch from 7def679 to 5806722 Compare November 26, 2021 16:41
@prawilny prawilny changed the base branch from ac/MismatchDetector/2 to master November 26, 2021 16:41
@mwalkiewicz mwalkiewicz force-pushed the ac/MismatchDetector/3 branch from af32443 to df464fd Compare November 29, 2021 11:09

@mwalkiewicz mwalkiewicz left a comment

Reviewed 5 of 22 files at r3, 20 of 23 files at r5, 3 of 3 files at r6, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @mwalkiewicz and @prawilny)

@mwalkiewicz mwalkiewicz merged commit 8a87e4c into master Nov 29, 2021
dopiera added a commit that referenced this pull request May 11, 2022
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (#125)

* feat: concurrent writes in MirroringBufferedMutator (#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75)

* feat: implement MirroringAsyncTable#getName() (#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (#128)

* feat: synchronous writes (#88)

* fix: implement heapSize method for RowCell (#111)

* feat: FlowController accounts for memory usage (#137)

* refactor: remove Configuration as a base of MirroringConfiguration (#127)

* feat: MirroringAsyncBufferedMutator (#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (#110)

* feat: 2.x reads sampling (#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134)

* ConcurrentBufferedMutator integration tests (#135)

* feat: add synchronous MirroringConnection to 2.x (#109)

* fix: MirroringConnection in 2.x failed to compile (#139)

* fix: fix BufferedMutator ITs (#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (#116)

* fix: fix build (#142)

* refactor: minor fixes after review (#117)

* feat: MirroringAsyncTable#getScanner() (#58)

* test: 2.x integration tests (#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (#144)

* feat: log rows and values in DefaultMismatchDetector (#129)

* fix: ITs - add expected parameter to MismatchDetectors (#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (#133)

* feat: better mismatch detection of scan results (#130)

* feat: quickstart (#105)

* fix: 2.x scan ITs (#158)

* fix: DefaultMismatchDetector tests (#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161)

* fix: additional minor fixes after review (#163)

* fix: BufferedMutator review fixes (#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
mwalkiewicz added a commit that referenced this pull request May 18, 2022
… review comments (googleapis#3347)

Development

Successfully merging this pull request may close these issues.

Mismatch detector fixes

3 participants