-
Notifications
You must be signed in to change notification settings - Fork 0
feat: better mismatch detection of scan results #130
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 16 files at r1, all commit messages.
Reviewable status: 2 of 16 files reviewed, 5 unresolved discussions (waiting on @mwalkiewicz and @prawilny)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 237 at r1 (raw file):
final int permitsNeeded = orderEnforcer.getNextPermits(); FutureCallback<T> orderEnforcedScannerNext =
This trick is neat, but I think that it is not very obvious and can be made more straightforward and more readable with a plain queue and a mutex.
I think that it'd be enough to create a queue of entries to be verified (AsyncScannerVerificationPayload and AsyncScannerExceptionWithContext produced by AsyncResultScannerWrapper#performNext() - that method should then return nulls, i think) and in this callback we take a mutex, pop an element from the queue, and perform verification on that element. That would give us ordered pairs of primary and secondary results, and a synchronization, and would be explicit about the purpose.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 277 at r1 (raw file):
} public class LastUnmatched {
I do not yet know how the algorithm in this class works, but I see something now:
- I do not like
primaryPreviouslyUnmatchedname, I do not think that the same says what it does; - Comparison logic is now split between two files, can we make
LastUnmatchedjust a struct with two deques, and move all the logic into static methods in MismatchDetector?
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 383 at r1 (raw file):
} public <T> void waitForTurn(int permits) {
why <T>?
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/verification/DefaultMismatchDetector.java, line 168 at r1 (raw file):
Scan scan, HBaseOperation operation) { int primaryIdx = 0, secondaryIdx = 0;
Split declarations into separate lines. https://google.github.io/styleguide/javaguide.html#s4.8.2-variable-declarations
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1434 at r1 (raw file):
@Test public void testUnmatchedScannerResultQueuesAreFlushedWhenResultScannerIsClosed()
I'd also test if everything works as expected when:
MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS == 0;- primary and secondary return the same values at first, then primary returns a single value not in the secondary, then values match, and later primary returns exclusive result once again.
- number of buffered records exceeds the threshold
- number of buffered records exceeds the threshold, but later the somehow start to return the same values again (don't know if it describes any sane real-life scenario, but I'd like to know what will happen, as a sanity check)
- scanner first encounters a result missing from primary, and after several reads a result missing from secondary.
5d5f1b6 to
901a0a5
Compare
ce72708 to
8d16430
Compare
286fe15 to
3403c8f
Compare
8d16430 to
d23b5ff
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @mwalkiewicz from 3 discussions.
Reviewable status: 0 of 22 files reviewed, 2 unresolved discussions (waiting on @mwalkiewicz)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 237 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
This trick is neat, but I think that it is not very obvious and can be made more straightforward and more readable with a plain queue and a mutex.
I think that it'd be enough to create a queue of entries to be verified (
AsyncScannerVerificationPayloadandAsyncScannerExceptionWithContextproduced byAsyncResultScannerWrapper#performNext()- that method should then return nulls, i think) and in this callback we take a mutex, pop an element from the queue, and perform verification on that element. That would give us ordered pairs of primary and secondary results, and a synchronization, and would be explicit about the purpose.
Done.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 277 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I do not yet know how the algorithm in this class works, but I see something now:
- I do not like
primaryPreviouslyUnmatchedname, I do not think that the same says what it does;- Comparison logic is now split between two files, can we make
LastUnmatchedjust a struct with two deques, and move all the logic into static methods in MismatchDetector?
Done (but in a bit different way).
The code is much simpler now.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java, line 383 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
why
<T>?
Leftover from some earlier version which accepted some future as an argument.
I removed OrderEnforcer, so it no longer matters.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/verification/DefaultMismatchDetector.java, line 168 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Split declarations into separate lines. https://google.github.io/styleguide/javaguide.html#s4.8.2-variable-declarations
Removed this function.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java, line 1434 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I'd also test if everything works as expected when:
MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS == 0;- primary and secondary return the same values at first, then primary returns a single value not in the secondary, then values match, and later primary returns exclusive result once again.
- number of buffered records exceeds the threshold
- number of buffered records exceeds the threshold, but later the somehow start to return the same values again (don't know if it describes any sane real-life scenario, but I'd like to know what will happen, as a sanity check)
- scanner first encounters a result missing from primary, and after several reads a result missing from secondary.
Done.
| } | ||
|
|
||
| @Test | ||
| public void testUnmatchedScannerResultQueuesAreFlushedWhenResultScannerIsClosed() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move all tests related to ScannerResultVerifier to a separate file.
| .recordReadMismatches(any(HBaseOperation.class), eq(1)); | ||
| waitForMirroringScanner(mirroringScanner); | ||
|
|
||
| verify(verifier, times(1)).verify(any(Result[].class), any(Result[].class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to verify that mistmatch of expected1 and expected2 was reported?
| assertThat(mirroringScanner.next()).isEqualTo(expected7); | ||
| waitForMirroringScanner(mirroringScanner); | ||
|
|
||
| verify(verifier, times(7)).verify(any(Result[].class), any(Result[].class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, I'd like to know which mismatches exactly were reported.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This applies to all other tests as well.
|
|
||
| private void addNotNull(Deque<Result> mismatchBuffer, Result[] results) { | ||
| for (Result result : results) { | ||
| if (result == null) break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not use ifs without brackets. it is error prone.
| } | ||
|
|
||
| private Set<Result> matchingResults() { | ||
| Set<Result> results = new HashSet<>(this.primaryMismatchBuffer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this Set can be calculated on the fly instead of rebuilding it every time.
7def679 to
5806722
Compare
af32443 to
df464fd
Compare
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 5 of 22 files at r3, 20 of 23 files at r5, 3 of 3 files at r6, all commit messages.
Reviewable status: all files reviewed, 7 unresolved discussions (waiting on @mwalkiewicz and @prawilny)
… review comments (googleapis#3347) * chore: revert review comments * feat: add MirroringOperationException exception markers (#125) * feat: concurrent writes in MirroringBufferedMutator (#80) * refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75) * feat: implement MirroringAsyncTable#getName() (#132) * feat: use Logger rather than stdout in DefaultMismatchDetector (#128) * feat: synchronous writes (#88) * fix: implement heapSize method for RowCell (#111) * feat: FlowController accounts for memory usage (#137) * refactor: remove Configuration as a base of MirroringConfiguration (#127) * feat: MirroringAsyncBufferedMutator (#81) * refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138) * fix: BufferedMutator close() waits for all secondary flushes to finish (#110) * feat: 2.x reads sampling (#114) * refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134) * ConcurrentBufferedMutator integration tests (#135) * feat: add synchronous MirroringConnection to 2.x (#109) * fix: MirroringConnection in 2.x failed to compile (#139) * fix: fix BufferedMutator ITs (#140) * feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108) * feat: 2.x - rewrite Increment and Append as Put in batch (#116) * fix: fix build (#142) * refactor: minor fixes after review (#117) * feat: MirroringAsyncTable#getScanner() (#58) * test: 2.x integration tests (#112) * feat: implement MirroringAsyncBufferedMutatorBuilder (#144) * feat: log rows and values in DefaultMismatchDetector (#129) * fix: ITs - add expected parameter to MismatchDetectors (#153) * fix: force Append and Increment to return results and discard that result before returning it to user (#136) * fix: review fixes in utils * fix: review fixes in BufferedMutator * fix: review fixes in Faillog * fix: fixed reference counting * fix: review fixes in FlowController * fix: review fixes in metrics * fix: review fixes in verification * fix: Review fixes in MirroringTable * fix: review fixes in HBase 2.x client * fix: fixes in ITs * feat: MirrorinAsyncTable: scan(), scanAll() (#131) * fix: review fixes in tests * feat: MirroringConnection: timeout in close() and abort() (#133) * feat: better mismatch detection of scan results (#130) * feat: quickstart (#105) * fix: 2.x scan ITs (#158) * fix: DefaultMismatchDetector tests (#157) * fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161) * fix: additional minor fixes after review (#163) * fix: BufferedMutator review fixes (#164) - Simplify #flush(). - Add javadocs. - (sequential) Fix flush() exception handling. - (sequential) Move error handling to a separate inner class. * fix: PR fixes * fix: report zeroed error metrics after successful operations * fix: prepend MismatchDetectorCounter with Test to better reflect its purpose * feat: Client-side timestamping (#165) * fix: reduce timeout in TestBlocking to make the tests run faster * fix: asyncClose -> closePrimaryAndScheduleSecondaryClose * fix: remove unused Batcher#throwBatchDataExceptionIfPresent * fix: remove unused Comparators#compareRows * fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion * feat: remove unused MirroringTracer from FailedMutationLogger * fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer * fix: TestMirroringAsyncTableInputModification typo fix * fix: describe user flush() in Buffered Mutator in quickstart * fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer * refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception> * BufferedMutator - add close timeout * AsyncBufferedMutator - add close timeout * fix: remove stale addSecondaryMutation comment * fix: add a comment that addSecondaryMutation handles failed writes * fix: unify implementations of flushBufferedMutatorBeforeClosing * fix: BufferedMutator - throw exceptions on close * fix: BufferedMutator - add comment explaining that chain of flush operations is created * fix: BufferedMutator - clarify comments * fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable * fix: explain why flush is called in Sequential BufferedMutator test * fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit * refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush() * refactor: make FlushSerializer non-static * fix: BufferedMutator - use HierarchicalReferenceCounter * feat: Add MirroringConnection constructor taking MirroringConfiguration * refactor: move releaseReservations to finally * fix: use less convoluted example in lastFlushFutures description * fix: merge small Timeestamper files into a single file * fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator * fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered * fix: add comment explaining why batch is complicated * fix: add a TODO to implement point writes without batch Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com> Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
… review comments (googleapis#3347) * chore: revert review comments * feat: add MirroringOperationException exception markers (#125) * feat: concurrent writes in MirroringBufferedMutator (#80) * refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75) * feat: implement MirroringAsyncTable#getName() (#132) * feat: use Logger rather than stdout in DefaultMismatchDetector (#128) * feat: synchronous writes (#88) * fix: implement heapSize method for RowCell (#111) * feat: FlowController accounts for memory usage (#137) * refactor: remove Configuration as a base of MirroringConfiguration (#127) * feat: MirroringAsyncBufferedMutator (#81) * refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138) * fix: BufferedMutator close() waits for all secondary flushes to finish (#110) * feat: 2.x reads sampling (#114) * refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134) * ConcurrentBufferedMutator integration tests (#135) * feat: add synchronous MirroringConnection to 2.x (#109) * fix: MirroringConnection in 2.x failed to compile (#139) * fix: fix BufferedMutator ITs (#140) * feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108) * feat: 2.x - rewrite Increment and Append as Put in batch (#116) * fix: fix build (#142) * refactor: minor fixes after review (#117) * feat: MirroringAsyncTable#getScanner() (#58) * test: 2.x integration tests (#112) * feat: implement MirroringAsyncBufferedMutatorBuilder (#144) * feat: log rows and values in DefaultMismatchDetector (#129) * fix: ITs - add expected parameter to MismatchDetectors (#153) * fix: force Append and Increment to return results and discard that result before returning it to user (#136) * fix: review fixes in utils * fix: review fixes in BufferedMutator * fix: review fixes in Faillog * fix: fixed reference counting * fix: review fixes in FlowController * fix: review fixes in metrics * fix: review fixes in verification * fix: Review fixes in MirroringTable * fix: review fixes in HBase 2.x client * fix: fixes in ITs * feat: MirrorinAsyncTable: scan(), scanAll() (#131) * fix: review fixes in tests * feat: MirroringConnection: timeout in close() and abort() (#133) * feat: better mismatch detection of scan results (#130) * feat: quickstart (#105) * fix: 2.x scan ITs (#158) * fix: DefaultMismatchDetector tests (#157) * fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161) * fix: additional minor fixes after review (#163) * fix: BufferedMutator review fixes (#164) - Simplify #flush(). - Add javadocs. - (sequential) Fix flush() exception handling. - (sequential) Move error handling to a separate inner class. * fix: PR fixes * fix: report zeroed error metrics after successful operations * fix: prepend MismatchDetectorCounter with Test to better reflect its purpose * feat: Client-side timestamping (#165) * fix: reduce timeout in TestBlocking to make the tests run faster * fix: asyncClose -> closePrimaryAndScheduleSecondaryClose * fix: remove unused Batcher#throwBatchDataExceptionIfPresent * fix: remove unused Comparators#compareRows * fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion * feat: remove unused MirroringTracer from FailedMutationLogger * fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer * fix: TestMirroringAsyncTableInputModification typo fix * fix: describe user flush() in Buffered Mutator in quickstart * fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer * refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception> * BufferedMutator - add close timeout * AsyncBufferedMutator - add close timeout * fix: remove stale addSecondaryMutation comment * fix: add a comment that addSecondaryMutation handles failed writes * fix: unify implementations of flushBufferedMutatorBeforeClosing * fix: BufferedMutator - throw exceptions on close * fix: BufferedMutator - add comment explaining that chain of flush operations is created * fix: BufferedMutator - clarify comments * fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable * fix: explain why flush is called in Sequential BufferedMutator test * fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit * refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush() * refactor: make FlushSerializer non-static * fix: BufferedMutator - use HierarchicalReferenceCounter * feat: Add MirroringConnection constructor taking MirroringConfiguration * refactor: move releaseReservations to finally * fix: use less convoluted example in lastFlushFutures description * fix: merge small Timeestamper files into a single file * fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator * fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered * fix: add comment explaining why batch is complicated * fix: add a TODO to implement point writes without batch Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com> Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
I created a single PR as two last points in #121 would require a lot of rebasing should they need to be rewritten.
Warning: there's quite a lot of copypasted code with only ~5 lines of changes in tests in TestMirroringTable (the tests are there instead of in TestMirroringResultScanner because this way there needs to be significantly less boilerplate).
This change is