Skip to content

Conversation

@prawilny
Copy link
Collaborator

@prawilny prawilny commented Nov 3, 2021

This change is Reviewable

TestBlocking#testConnectionCloseBlocksUntilAllRequestsHaveBeenVerified doesn't pass - also on master.

Copy link

@mwalkiewicz mwalkiewicz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 3 of 4 files at r1, all commit messages.
Reviewable status: 3 of 4 files reviewed, 6 unresolved discussions (waiting on @mwalkiewicz and @prawilny)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 283 at r1 (raw file):

  private void terminateConnectionsWithTimeout(
      final Callable<Void> terminatingAction, boolean throwExceptionIfErred) throws IOException {

There is something called CallableThrowingIOException in our project, we can use it here to make exceptions thrown by terminatingAction more specific.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 295 at r1 (raw file):

      } catch (Exception e) {
        if (throwExceptionIfErred) {
          throw new IOException(e);

Can't we throw this exception unconditionally and just catch and ignore it in abort? We are already ignoring something there.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 301 at r1 (raw file):

      assert false;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);

Why won't we throw a IOException(e)?


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 302 at r1 (raw file):

    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } catch (TimeoutException e) {

I think that we should an error to a logger (to stdout) here.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 302 at r1 (raw file):

    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } catch (TimeoutException e) {

So, if the timeout happens, the connections are not closed and the user is not notified. I think that we can throw a IOException(e) after scheduling the terminatingAction and let the user know that there is something went wrong (because the primary was not closed).


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java, line 136 at r1 (raw file):

   * pending operations before terminating connections with primary and secondary databases.
   *
   * <p>Defaults to 1000. If the wait times out, the termination is scheduled asynchronously.

I'd set the default to 60 seconds, we really want to wait for all the writes to finish.

@prawilny prawilny force-pushed the ac/MirroringConnection#close branch from cf1d870 to ec89765 Compare November 9, 2021 19:11
Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 1 of 4 files reviewed, 6 unresolved discussions (waiting on @mwalkiewicz)


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 283 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

There is something called CallableThrowingIOException in our project, we can use it here to make exceptions thrown by terminatingAction more specific.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 295 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Can't we throw this exception unconditionally and just catch and ignore it in abort? We are already ignoring something there.

Yes, we can. Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 301 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

Why won't we throw a IOException(e)?

Yep, it's better.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 302 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I think that we should an error to a logger (to stdout) here.

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java, line 302 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

So, if the timeout happens, the connections are not closed and the user is not notified. I think that we can throw a IOException(e) after scheduling the terminatingAction and let the user know that there is something went wrong (because the primary was not closed).

Done.


bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java, line 136 at r1 (raw file):

Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…

I'd set the default to 60 seconds, we really want to wait for all the writes to finish.

Done.

@prawilny prawilny force-pushed the ac/MirroringConnection#close branch from ec89765 to a429753 Compare November 9, 2021 19:17
@prawilny prawilny requested a review from mwalkiewicz November 9, 2021 19:18
Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about googleapis#3322 (comment) lead me to conclusion that abort() should have a simpler implementation and not wait for background requests.

Reviewable status: 0 of 4 files reviewed, 6 unresolved discussions (waiting on @mwalkiewicz)

terminatingAction.call();
} catch (ExecutionException e) {
throw new RuntimeException(e);
assert false;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rethrow it, we should not use asserts.

} catch (InterruptedException e) {
throw new IOException(e);
} catch (TimeoutException e) {
Log.trace("close() timeout");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.error?

@mwalkiewicz mwalkiewicz force-pushed the ac/MirroringConnection#close branch 3 times, most recently from 8a802ae to c1ddc47 Compare November 26, 2021 13:49
@mwalkiewicz mwalkiewicz changed the base branch from master to mw/review-fixes-tests/1 November 26, 2021 14:16
Copy link
Collaborator Author

@prawilny prawilny left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: (with f2f nits)

Reviewed 2 of 4 files at r3, all commit messages.
Reviewable status: 2 of 4 files reviewed, 8 unresolved discussions (waiting on @mwalkiewicz and @prawilny)

@mwalkiewicz mwalkiewicz force-pushed the ac/MirroringConnection#close branch from c1ddc47 to df120ae Compare November 26, 2021 14:38
mwalkiewicz
mwalkiewicz previously approved these changes Nov 26, 2021
@prawilny prawilny changed the base branch from mw/review-fixes-tests/1 to master November 26, 2021 14:55
@prawilny prawilny dismissed mwalkiewicz’s stale review November 26, 2021 14:55

The base branch was changed.

@prawilny prawilny force-pushed the ac/MirroringConnection#close branch from df120ae to a90bcab Compare November 26, 2021 14:56
dopiera added a commit that referenced this pull request May 11, 2022
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (#125)

* feat: concurrent writes in MirroringBufferedMutator (#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75)

* feat: implement MirroringAsyncTable#getName() (#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (#128)

* feat: synchronous writes (#88)

* fix: implement heapSize method for RowCell (#111)

* feat: FlowController accounts for memory usage (#137)

* refactor: remove Configuration as a base of MirroringConfiguration (#127)

* feat: MirroringAsyncBufferedMutator (#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (#110)

* feat: 2.x reads sampling (#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134)

* ConcurrentBufferedMutator integration tests (#135)

* feat: add synchronous MirroringConnection to 2.x (#109)

* fix: MirroringConnection in 2.x failed to compile (#139)

* fix: fix BufferedMutator ITs (#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (#116)

* fix: fix build (#142)

* refactor: minor fixes after review (#117)

* feat: MirroringAsyncTable#getScanner() (#58)

* test: 2.x integration tests (#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (#144)

* feat: log rows and values in DefaultMismatchDetector (#129)

* fix: ITs - add expected parameter to MismatchDetectors (#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (#133)

* feat: better mismatch detection of scan results (#130)

* feat: quickstart (#105)

* fix: 2.x scan ITs (#158)

* fix: DefaultMismatchDetector tests (#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161)

* fix: additional minor fixes after review (#163)

* fix: BufferedMutator review fixes (#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
mwalkiewicz added a commit that referenced this pull request May 18, 2022
… review comments (googleapis#3347)

* chore: revert review comments

* feat: add MirroringOperationException exception markers (#125)

* feat: concurrent writes in MirroringBufferedMutator (#80)

* refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75)

* feat: implement MirroringAsyncTable#getName() (#132)

* feat: use Logger rather than stdout in DefaultMismatchDetector (#128)

* feat: synchronous writes (#88)

* fix: implement heapSize method for RowCell (#111)

* feat: FlowController accounts for memory usage (#137)

* refactor: remove Configuration as a base of MirroringConfiguration (#127)

* feat: MirroringAsyncBufferedMutator (#81)

* refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138)

* fix: BufferedMutator close() waits for all secondary flushes to finish (#110)

* feat: 2.x reads sampling (#114)

* refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134)

* ConcurrentBufferedMutator integration tests (#135)

* feat: add synchronous MirroringConnection to 2.x (#109)

* fix: MirroringConnection in 2.x failed to compile (#139)

* fix: fix BufferedMutator ITs (#140)

* feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108)

* feat: 2.x - rewrite Increment and Append as Put in batch (#116)

* fix: fix build (#142)

* refactor: minor fixes after review (#117)

* feat: MirroringAsyncTable#getScanner() (#58)

* test: 2.x integration tests (#112)

* feat: implement MirroringAsyncBufferedMutatorBuilder (#144)

* feat: log rows and values in DefaultMismatchDetector (#129)

* fix: ITs - add expected parameter to MismatchDetectors (#153)

* fix: force Append and Increment to return results and discard that result before returning it to user (#136)

* fix: review fixes in utils

* fix: review fixes in BufferedMutator

* fix: review fixes in Faillog

* fix: fixed reference counting

* fix: review fixes in FlowController

* fix: review fixes in metrics

* fix: review fixes in verification

* fix: Review fixes in MirroringTable

* fix: review fixes in HBase 2.x client

* fix: fixes in ITs

* feat: MirrorinAsyncTable: scan(), scanAll() (#131)

* fix: review fixes in tests

* feat: MirroringConnection: timeout in close() and abort() (#133)

* feat: better mismatch detection of scan results (#130)

* feat: quickstart (#105)

* fix: 2.x scan ITs (#158)

* fix: DefaultMismatchDetector tests (#157)

* fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161)

* fix: additional minor fixes after review (#163)

* fix: BufferedMutator review fixes (#164)

- Simplify #flush().
- Add javadocs.
- (sequential) Fix flush() exception handling.
- (sequential) Move error handling to a separate inner class.

* fix: PR fixes

* fix: report zeroed error metrics after successful operations

* fix: prepend MismatchDetectorCounter with Test to better reflect its purpose

* feat: Client-side timestamping (#165)

* fix: reduce timeout in TestBlocking to make the tests run faster

* fix: asyncClose -> closePrimaryAndScheduleSecondaryClose

* fix: remove unused Batcher#throwBatchDataExceptionIfPresent

* fix: remove unused Comparators#compareRows

* fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion

* feat: remove unused MirroringTracer from FailedMutationLogger

* fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer

* fix: TestMirroringAsyncTableInputModification typo fix

* fix: describe user flush() in Buffered Mutator in quickstart

* fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer

* refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception>

* BufferedMutator - add close timeout

* AsyncBufferedMutator - add close timeout

* fix: remove stale addSecondaryMutation comment

* fix: add a comment that addSecondaryMutation handles failed writes

* fix: unify implementations of flushBufferedMutatorBeforeClosing

* fix: BufferedMutator - throw exceptions on close

* fix: BufferedMutator - add comment explaining that chain of flush operations is created

* fix: BufferedMutator - clarify  comments

* fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable

* fix: explain why flush is called in Sequential BufferedMutator test

* fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit

* refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush()

* refactor: make FlushSerializer non-static

* fix: BufferedMutator - use HierarchicalReferenceCounter

* feat: Add MirroringConnection constructor taking MirroringConfiguration

* refactor: move releaseReservations to finally

* fix: use less convoluted example in lastFlushFutures description

* fix: merge small Timeestamper files into a single file

* fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator

* fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered

* fix: add comment explaining why batch is complicated

* fix: add a TODO to implement point writes without batch

Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com>
Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1.x: we should add a timeout to prevent deadlock in case of a bug on our side.

3 participants