-
Notifications
You must be signed in to change notification settings - Fork 0
feat: MirroringAsyncBufferedMutator #81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm publishing the review as comment so that you can merge this if I'm out of office.
Also please discuss the exact semantics of the BufferedMutator with Mateusz.
Reviewed all commit messages.
Reviewable status: 0 of 2 files reviewed, 14 unresolved discussions (waiting on @kboroszko and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 60 at r1 (raw file):
@Override public CompletableFuture<Void> mutate(Mutation mutation) { referenceCounter.incrementReferenceCount();
You can avoid decrementing reference counter repetitively: for example by using a function mutationWrapper which would do what mutate() does right now and external function that decrements it after this wrapper completes with either result.
If you rewrite it this way, you'll probably have problems with types of exceptions thrown. I'd suggest then wrapping Throwable in CompletionException and unwrapping it before returning to the user.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 65 at r1 (raw file):
primaryCompleted .thenRunAsync(
We should never use ...Async without ExecutorService.
We also generally don't use ...Async with ExecutorService - just use thenRun().
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 68 at r1 (raw file):
() -> { // when primary completes, request resources. CompletableFuture<FlowController.ResourceReservation> resourceAcquired =
It's only a future so "Acquired" suffix is inappropriate.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
resourceAcquired .thenRunAsync(
If I were you, I'd think about using some fancier methods of CompletableFuture: I'd maybe go with something like
resultFuture = primaryFuture.exceptionally(SOMETHING).thenCompose(ignored -> flowController.async()).exceptionally(SOMETHING2);
return resultFuture.whenComplete(() -> scheduleSecondary());
Of course, there would come up problems with error handling.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 83 at r1 (raw file):
.exceptionally( ex -> { // TODO log secondary error
SecondaryWriteErrorConsumer
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 87 at r1 (raw file):
"secondary failed " + mutation + ", got error:" + ex); referenceCounter.decrementReferenceCount(); return null;
tip: if you throw an exception in some kind of then/when/exceptionally it's automatically converted into a CompletableFuture that is completed exceptionally with this exception.
Note that it has to be an unchecked exception (or else the compiler complains) - usually Throwable is wrapped in CompletionException
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 102 at r1 (raw file):
.exceptionally( ex -> { // primary failed, propagate error
Comment is redundant. Just name the variable e.g. primaryError
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 114 at r1 (raw file):
@Override public List<CompletableFuture<Void>> mutate(List<? extends Mutation> list) {
We want to be faster. We can do this by minimizing serialization/deserialization by sending big requests to primary and secondary.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 129 at r1 (raw file):
@Override public void close() {
"The implementation is required to be thread safe." -- https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/AsyncBufferedMutator.html
It is not thread-safe.
Right now the best close() implementation is probably the one in MirroringConnection - copy it.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 143 at r1 (raw file):
@Override public long getWriteBufferSize() { return primary.getWriteBufferSize();
It should be min(primary.getBufferSize(), secondary.getBufferSize()), I think.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 147 at r1 (raw file):
@Override public long getPeriodicalFlushTimeout(TimeUnit unit) {
I'd rather implement this only after periodic flash itself is implemented.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 98 at r1 (raw file):
}); CompletableFuture<FlowController.ResourceReservation> resourcesAllocated =
There's a method somewhere that sets FlowController mocks for you with a single line of code. It's used in Test{,Async}MirroringTable
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 115 at r1 (raw file):
// got resources so we got the result resourcesAllocated.complete(() -> {});
null is a simpler catch-all
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 131 at r1 (raw file):
CompletableFuture<Void> primaryFailure = new CompletableFuture<>(); CompletableFuture<Void> secondaryCalled = new CompletableFuture<>();
You could move all of reused setup to setUp() [which is called before every test case].
If there's a problem with "unused X", just see how lenient()is used.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 171 at r1 (raw file):
assertThat(secondaryCalled.isDone()).isFalse(); resourcesAllocated.completeExceptionally(new RuntimeException());
IOException is the most common exception in this API, I think (and thus it's most commonly used in my and Mateusz's tests)
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r1, all commit messages.
Reviewable status: all files reviewed, 19 unresolved discussions (waiting on @kboroszko and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 33 at r1 (raw file):
import java.util.concurrent.TimeUnit; public class MirroringAsyncBufferedMutator implements AsyncBufferedMutator {
We should mark internal classes with @InternalApi annotation.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 40 at r1 (raw file):
private final ListenableReferenceCounter referenceCounter; public MirroringAsyncBufferedMutator(
Generally looks OK.
I just want to describe a alternative implementation that might have some performance improvements.
Instead of requesting resources from the FlowController for each Mutation separately, we might wait for all mutations scheduled in a single mutate(List<Mutation>) call and acquire them all at once.
The flow would look like this:
Future<Void> all = waitForAll(primary(list));
all.thenRun(() -> {
select successful mutations;
request flow control resources
}) .then(() -> {
schedule secondary;
})
- exception handling.
I think that it would be less readable than the current implementation, but we'd have slightly less overhead.
I do not think that is it worth to make this change, but I'm describing it for completeness.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 60 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
You can avoid decrementing reference counter repetitively: for example by using a function mutationWrapper which would do what mutate() does right now and external function that decrements it after this wrapper completes with either result.
If you rewrite it this way, you'll probably have problems with types of exceptions thrown. I'd suggest then wrapping Throwable in CompletionException and unwrapping it before returning to the user.
I don't think this is true at the moment. Currently the future completes after the flow controller resources are acquired, decrementing the counter then wouldn't be appropriate.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 65 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
We should never use ...Async without ExecutorService.
We also generally don't use ...Async with ExecutorService - just use thenRun().
The difference is that ...Async without executor service starts the task on default executor service (java provides some default threadpool we wouldn't like to use), ...Async with executor starts the task on supplied executor, and methods without Async suffix on the same thread that calls .complete on the future (+ some egde cases).
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 68 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
It's only a future so "Acquired" suffix is inappropriate.
resourceReservationRequest?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 82 at r1 (raw file):
.thenRun(referenceCounter::decrementReferenceCount) .exceptionally( ex -> {
ex -> secondaryError?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 83 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
SecondaryWriteErrorConsumer
Yes, you should take a SecondaryWriteErrorConsumer instance as an ctor parameter and use its consume method here.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 91 at r1 (raw file):
}) .exceptionally( ex -> {
resourceAcquisitionError?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 104 at r1 (raw file):
// primary failed, propagate error referenceCounter.decrementReferenceCount(); // TODO log primary error
Primary errors should be forwarded to the user, we are not logging them anywhere.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 114 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
We want to be faster. We can do this by minimizing serialization/deserialization by sending big requests to primary and secondary.
I think that buffering will take care of sending bigger requests :)
The intuition is correct, we'd rather make a single big request than several small ones, but I still have to think if we will have any significant performance improvement if we changed current implementation to a one that handles several writes at once. It will reduce flowController overhead, that is for sure, but it'd be the only thing and it might reduce readability.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 143 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
It should be
min(primary.getBufferSize(), secondary.getBufferSize()), I think.
That is not true. If secondary buffer size < primary buffer size, then the user would expect a flush after inserting secondary buffer size (return value of this method) bytes of mutations into the BufferedMutator, but the flush wouldn't happen because the primary buffer wouldn't be full yet. Current implementation is correct imo.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 147 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
I'd rather implement this only after periodic flash itself is implemented.
I think that this implementation is correct, primary periodic flush timeout is more-or-less our periodic flush timeout, we do not have to implement it manually.
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 2 files reviewed, 19 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 33 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
We should mark internal classes with
@InternalApiannotation.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 65 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
The difference is that ...Async without executor service starts the task on default executor service (java provides some default threadpool we wouldn't like to use), ...Async with executor starts the task on supplied executor, and methods without Async suffix on the same thread that calls
.completeon the future (+ some egde cases).
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 68 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
resourceReservationRequest?
I feel like the type of a variable can affect how you should interpret it just as much as it's name. So if you have a future that is called resourceAcquired, it means that when it completes, you have acquired it.
This way you can later write resourceAcquired.thenRun().
But I don't mind changing it. Besides, I am outnumbered...
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
If I were you, I'd think about using some fancier methods of CompletableFuture: I'd maybe go with something like
resultFuture = primaryFuture.exceptionally(SOMETHING).thenCompose(ignored -> flowController.async()).exceptionally(SOMETHING2); return resultFuture.whenComplete(() -> scheduleSecondary());Of course, there would come up problems with error handling.
I see your point, however I think that those 'problems with error handling' that you mentioned yourself, make it so complicated that it defeats the purpose.
We need to decrement the reference counter on primaryFuture error, but also we want the error to be propagated to the user. The way you have suggested, if the primary future failed, the user wouldn't catch that because it would be handled by first exceptionally(SOMETHING).
We can go in that direction and write:
resultFuture = primaryFuture.thenCompose(ignored -> flowController.async());
primaryFuture.exceptionally(SOMETHING);
resultFuture.exceptionally(SOMETHING2);
...
return resultFuture;But I fail to see how that's better than what I wrote... ¯_(ツ)_/¯
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 82 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
ex->secondaryError?
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 83 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Yes, you should take a
SecondaryWriteErrorConsumerinstance as an ctor parameter and use itsconsumemethod here.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 91 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
resourceAcquisitionError?
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 102 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
Comment is redundant. Just name the variable e.g.
primaryError
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 104 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Primary errors should be forwarded to the user, we are not logging them anywhere.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 143 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
That is not true. If
secondary buffer size<primary buffer size, then the user would expect a flush after insertingsecondary buffer size(return value of this method) bytes of mutations into the BufferedMutator, but the flush wouldn't happen because the primary buffer wouldn't be full yet. Current implementation is correct imo.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 147 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I think that this implementation is correct, primary periodic flush timeout is more-or-less our periodic flush timeout, we do not have to implement it manually.
Done.
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 2 files reviewed, 19 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 60 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I don't think this is true at the moment. Currently the future completes after the flow controller resources are acquired, decrementing the counter then wouldn't be appropriate.
Agreed. Wrapping it wouldn't work, and I feel that restructuring it so there is a part that can be wrapped this way is much more obscure than calling the counter a few times.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 129 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
"The implementation is required to be thread safe." -- https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/AsyncBufferedMutator.html
It is not thread-safe.Right now the best close() implementation is probably the one in MirroringConnection - copy it.
Done.
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r2.
Reviewable status: 1 of 3 files reviewed, 15 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, kboroszko (Kajetan Boroszko) wrote…
I see your point, however I think that those 'problems with error handling' that you mentioned yourself, make it so complicated that it defeats the purpose.
We need to decrement the reference counter on primaryFuture error, but also we want the error to be propagated to the user. The way you have suggested, if the primary future failed, the user wouldn't catch that because it would be handled by firstexceptionally(SOMETHING).We can go in that direction and write:
resultFuture = primaryFuture.thenCompose(ignored -> flowController.async()); primaryFuture.exceptionally(SOMETHING); resultFuture.exceptionally(SOMETHING2); ... return resultFuture;But I fail to see how that's better than what I wrote... ¯_(ツ)_/¯
I do not argue for changing this code, but following the x.then().exceptionally() can lead to code that is less indented and that has error handling closer to code that can throw an error (now primary operation is handled at the top of the method, and its exceptions are handled at the very bottom). Those things make the code more readable.
I didn't try that, but isn't such a pattern possible?
CompletableFuture<Void> result = new CompletableFuture<>();
incrementRefCounter();
primaryFuture.exceptionaly((e) -> { result.exceptionally(e); decrementRef(); })
CompletableFuture<?> resourceAcquired = primaryFuture.thenCompose((ignored) -> flowController.async());
resourcesAcquired.exceptionally((e) -> { result.complete(null); decrementRef(); });
CompletableFuture<?> secondaryResult = resourcesAcquired.thenCompose((ignored) -> runSecondaryOp());
secondaryResult.exceptionally((e) -> { ... });
secondaryResult.thenRun(() -> decrementRef());
(But I'm a n00b in using java futures and I might be wrong and I do not have a code to test on :D )
What do you think?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 32 at r2 (raw file):
import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.apache.hadoop.hbase.client.*;
* import :(
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 15 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 98 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
There's a method somewhere that sets FlowController mocks for you with a single line of code. It's used in Test{,Async}MirroringTable
Sure but it doesn't return the future and I'd like to control when it gets completed.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 115 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
nullis a simpler catch-all
Thanks!
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 32 at r2 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
*import :(
Done.
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 15 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I do not argue for changing this code, but following the x.then().exceptionally() can lead to code that is less indented and that has error handling closer to code that can throw an error (now primary operation is handled at the top of the method, and its exceptions are handled at the very bottom). Those things make the code more readable.
I didn't try that, but isn't such a pattern possible?
CompletableFuture<Void> result = new CompletableFuture<>(); incrementRefCounter(); primaryFuture.exceptionaly((e) -> { result.exceptionally(e); decrementRef(); }) CompletableFuture<?> resourceAcquired = primaryFuture.thenCompose((ignored) -> flowController.async()); resourcesAcquired.exceptionally((e) -> { result.complete(null); decrementRef(); }); CompletableFuture<?> secondaryResult = resourcesAcquired.thenCompose((ignored) -> runSecondaryOp()); secondaryResult.exceptionally((e) -> { ... }); secondaryResult.thenRun(() -> decrementRef());(But I'm a n00b in using java futures and I might be wrong and I do not have a code to test on :D )
What do you think?
Cut the modesty, you're not a n00b and you know it! 🙉 I love it! There's one problem with this implementation though. You hook two exceptionally to the same primaryFuture. If the primary future fails, it triggers the first one, but it is also propagated further through thenCompose and to the second one. Worst part is, I don't really see a way around that other than nesting the futures like it is now...
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 16 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/mirroringmetrics/MirroringSpanConstants.java, line 96 at r3 (raw file):
MIRRORING_CONNECTION_ABORT("MirroringConnection.abort"), BUFFERED_MUTATOR_CLOSE("BufferedMutator.close"), MIRRORING_BUFFERED_MUTATOR_CLOSE("MirroringBufferedMutator.close");
Is it used anywhere?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, kboroszko (Kajetan Boroszko) wrote…
Cut the modesty, you're not a n00b and you know it! 🙉 I love it! There's one problem with this implementation though. You hook two
exceptionallyto the same primaryFuture. If the primary future fails, it triggers the first one, but it is also propagated further through thenCompose and to the second one. Worst part is, I don't really see a way around that other than nesting the futures like it is now...
And this is exactly why I said I'm a n00b, I knew that the code looks to pretty to be allowed by java :D I cannot see any better solution.
I'd approve now, but the code seems too simple for me and I want to check it again tomorrow if there isn't something that is missing :) Also, I'd be great to do some integration tests for this code, but the ITs project for 2.x is not ready yet. Adam was working on it and maybe he could share it with you tomorrow.
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 16 unresolved discussions (waiting on @kboroszko, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
And this is exactly why I said I'm a n00b, I knew that the code looks to pretty to be allowed by java :D I cannot see any better solution.
I'd approve now, but the code seems too simple for me and I want to check it again tomorrow if there isn't something that is missing :) Also, I'd be great to do some integration tests for this code, but the ITs project for 2.x is not ready yet. Adam was working on it and maybe he could share it with you tomorrow.
It'd*
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 3 files reviewed, 17 unresolved discussions (waiting on @mwalkiewicz and @prawilny)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/mirroringmetrics/MirroringSpanConstants.java, line 96 at r3 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Is it used anywhere?
yes, when closing the mirroring buffered muttator
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 40 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Generally looks OK.
I just want to describe a alternative implementation that might have some performance improvements.
Instead of requesting resources from the FlowController for each Mutation separately, we might wait for all mutations scheduled in a singlemutate(List<Mutation>)call and acquire them all at once.
The flow would look like this:Future<Void> all = waitForAll(primary(list)); all.thenRun(() -> { select successful mutations; request flow control resources }) .then(() -> { schedule secondary; })
- exception handling.
I think that it would be less readable than the current implementation, but we'd have slightly less overhead.
I do not think that is it worth to make this change, but I'm describing it for completeness.
I agree. 😄 The only benefit would be with requesting the resources which does not seem to be worth the mess.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 68 at r1 (raw file):
Previously, kboroszko (Kajetan Boroszko) wrote…
I feel like the type of a variable can affect how you should interpret it just as much as it's name. So if you have a future that is called
resourceAcquired, it means that when it completes, you have acquired it.This way you can later write resourceAcquired.thenRun().
But I don't mind changing it. Besides, I am outnumbered...
After giving it some thought, I stand by what I said.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 74 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
It'd*
GR8
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 87 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
tip: if you throw an exception in some kind of then/when/exceptionally it's automatically converted into a CompletableFuture that is completed exceptionally with this exception.
Note that it has to be an unchecked exception (or else the compiler complains) - usually Throwable is wrapped in CompletionException
ok.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 114 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I think that buffering will take care of sending bigger requests :)
The intuition is correct, we'd rather make a single big request than several small ones, but I still have to think if we will have any significant performance improvement if we changed current implementation to a one that handles several writes at once. It will reduce flowController overhead, that is for sure, but it'd be the only thing and it might reduce readability.
ok.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 146 at r4 (raw file):
MIRRORING_BUFFERED_MUTATOR_CLOSE
@mwalkiewicz here
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java, line 131 at r1 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
You could move all of reused setup to setUp() [which is called before every test case].
If there's a problem with "unused X", just see howlenient()is used.
Done.
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r3, 2 of 2 files at r5, all commit messages.
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @kboroszko and @prawilny)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/mirroringmetrics/MirroringSpanConstants.java, line 96 at r3 (raw file):
Previously, kboroszko (Kajetan Boroszko) wrote…
yes, when closing the mirroring buffered muttator
ok
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 146 at r4 (raw file):
Previously, kboroszko (Kajetan Boroszko) wrote…
MIRRORING_BUFFERED_MUTATOR_CLOSE@mwalkiewicz here
Oh, ok, somehow I missed it. I would remove the scopes now, and we will add it later in a separate PR for every operation.
kboroszko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 1 of 3 files reviewed, 15 unresolved discussions (waiting on @mwalkiewicz and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java, line 146 at r4 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Oh, ok, somehow I missed it. I would remove the scopes now, and we will add it later in a separate PR for every operation.
removed
a418795 to
0faa193
Compare
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 2 files at r6, 1 of 1 files at r7, all commit messages.
Reviewable status: all files reviewed, 15 unresolved discussions (waiting on @mwalkiewicz and @prawilny)
… review comments (googleapis#3347) * chore: revert review comments * feat: add MirroringOperationException exception markers (#125) * feat: concurrent writes in MirroringBufferedMutator (#80) * refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75) * feat: implement MirroringAsyncTable#getName() (#132) * feat: use Logger rather than stdout in DefaultMismatchDetector (#128) * feat: synchronous writes (#88) * fix: implement heapSize method for RowCell (#111) * feat: FlowController accounts for memory usage (#137) * refactor: remove Configuration as a base of MirroringConfiguration (#127) * feat: MirroringAsyncBufferedMutator (#81) * refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138) * fix: BufferedMutator close() waits for all secondary flushes to finish (#110) * feat: 2.x reads sampling (#114) * refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134) * ConcurrentBufferedMutator integration tests (#135) * feat: add synchronous MirroringConnection to 2.x (#109) * fix: MirroringConnection in 2.x failed to compile (#139) * fix: fix BufferedMutator ITs (#140) * feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108) * feat: 2.x - rewrite Increment and Append as Put in batch (#116) * fix: fix build (#142) * refactor: minor fixes after review (#117) * feat: MirroringAsyncTable#getScanner() (#58) * test: 2.x integration tests (#112) * feat: implement MirroringAsyncBufferedMutatorBuilder (#144) * feat: log rows and values in DefaultMismatchDetector (#129) * fix: ITs - add expected parameter to MismatchDetectors (#153) * fix: force Append and Increment to return results and discard that result before returning it to user (#136) * fix: review fixes in utils * fix: review fixes in BufferedMutator * fix: review fixes in Faillog * fix: fixed reference counting * fix: review fixes in FlowController * fix: review fixes in metrics * fix: review fixes in verification * fix: Review fixes in MirroringTable * fix: review fixes in HBase 2.x client * fix: fixes in ITs * feat: MirrorinAsyncTable: scan(), scanAll() (#131) * fix: review fixes in tests * feat: MirroringConnection: timeout in close() and abort() (#133) * feat: better mismatch detection of scan results (#130) * feat: quickstart (#105) * fix: 2.x scan ITs (#158) * fix: DefaultMismatchDetector tests (#157) * fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161) * fix: additional minor fixes after review (#163) * fix: BufferedMutator review fixes (#164) - Simplify #flush(). - Add javadocs. - (sequential) Fix flush() exception handling. - (sequential) Move error handling to a separate inner class. * fix: PR fixes * fix: report zeroed error metrics after successful operations * fix: prepend MismatchDetectorCounter with Test to better reflect its purpose * feat: Client-side timestamping (#165) * fix: reduce timeout in TestBlocking to make the tests run faster * fix: asyncClose -> closePrimaryAndScheduleSecondaryClose * fix: remove unused Batcher#throwBatchDataExceptionIfPresent * fix: remove unused Comparators#compareRows * fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion * feat: remove unused MirroringTracer from FailedMutationLogger * fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer * fix: TestMirroringAsyncTableInputModification typo fix * fix: describe user flush() in Buffered Mutator in quickstart * fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer * refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception> * BufferedMutator - add close timeout * AsyncBufferedMutator - add close timeout * fix: remove stale addSecondaryMutation comment * fix: add a comment that addSecondaryMutation handles failed writes * fix: unify implementations of flushBufferedMutatorBeforeClosing * fix: BufferedMutator - throw exceptions on close * fix: BufferedMutator - add comment explaining that chain of flush operations is created * fix: BufferedMutator - clarify comments * fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable * fix: explain why flush is called in Sequential BufferedMutator test * fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit * refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush() * refactor: make FlushSerializer non-static * fix: BufferedMutator - use HierarchicalReferenceCounter * feat: Add MirroringConnection constructor taking MirroringConfiguration * refactor: move releaseReservations to finally * fix: use less convoluted example in lastFlushFutures description * fix: merge small Timeestamper files into a single file * fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator * fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered * fix: add comment explaining why batch is complicated * fix: add a TODO to implement point writes without batch Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com> Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
… review comments (googleapis#3347) * chore: revert review comments * feat: add MirroringOperationException exception markers (#125) * feat: concurrent writes in MirroringBufferedMutator (#80) * refactor: implement multiple argument operations on MirroringAsyncTable with specific operations rather than batch() (#75) * feat: implement MirroringAsyncTable#getName() (#132) * feat: use Logger rather than stdout in DefaultMismatchDetector (#128) * feat: synchronous writes (#88) * fix: implement heapSize method for RowCell (#111) * feat: FlowController accounts for memory usage (#137) * refactor: remove Configuration as a base of MirroringConfiguration (#127) * feat: MirroringAsyncBufferedMutator (#81) * refactor: rename WRITE_MISMATCH to SECONDARY_WRITE_ERROR (#138) * fix: BufferedMutator close() waits for all secondary flushes to finish (#110) * feat: 2.x reads sampling (#114) * refactor: make MirroringResultScanner synchronize on itself rather than MirroringTable (#134) * ConcurrentBufferedMutator integration tests (#135) * feat: add synchronous MirroringConnection to 2.x (#109) * fix: MirroringConnection in 2.x failed to compile (#139) * fix: fix BufferedMutator ITs (#140) * feat: run 1.x integration tests on MirroringConnection etc. from 2.x (#108) * feat: 2.x - rewrite Increment and Append as Put in batch (#116) * fix: fix build (#142) * refactor: minor fixes after review (#117) * feat: MirroringAsyncTable#getScanner() (#58) * test: 2.x integration tests (#112) * feat: implement MirroringAsyncBufferedMutatorBuilder (#144) * feat: log rows and values in DefaultMismatchDetector (#129) * fix: ITs - add expected parameter to MismatchDetectors (#153) * fix: force Append and Increment to return results and discard that result before returning it to user (#136) * fix: review fixes in utils * fix: review fixes in BufferedMutator * fix: review fixes in Faillog * fix: fixed reference counting * fix: review fixes in FlowController * fix: review fixes in metrics * fix: review fixes in verification * fix: Review fixes in MirroringTable * fix: review fixes in HBase 2.x client * fix: fixes in ITs * feat: MirrorinAsyncTable: scan(), scanAll() (#131) * fix: review fixes in tests * feat: MirroringConnection: timeout in close() and abort() (#133) * feat: better mismatch detection of scan results (#130) * feat: quickstart (#105) * fix: 2.x scan ITs (#158) * fix: DefaultMismatchDetector tests (#157) * fix: ConcurrentBufferedMutator waits for both flushes to finish before closing (#161) * fix: additional minor fixes after review (#163) * fix: BufferedMutator review fixes (#164) - Simplify #flush(). - Add javadocs. - (sequential) Fix flush() exception handling. - (sequential) Move error handling to a separate inner class. * fix: PR fixes * fix: report zeroed error metrics after successful operations * fix: prepend MismatchDetectorCounter with Test to better reflect its purpose * feat: Client-side timestamping (#165) * fix: reduce timeout in TestBlocking to make the tests run faster * fix: asyncClose -> closePrimaryAndScheduleSecondaryClose * fix: remove unused Batcher#throwBatchDataExceptionIfPresent * fix: remove unused Comparators#compareRows * fix: extract failedReads from MatchingSuccessfulReadsResults to reduce confusion * feat: remove unused MirroringTracer from FailedMutationLogger * fix: MirroringAsyncBufferedMutator - test if failed mutation is passed to secondary write error consumer * fix: TestMirroringAsyncTableInputModification typo fix * fix: describe user flush() in Buffered Mutator in quickstart * fix: MirroringBufferedMutator - move flush threshold from BufferedMutations to FlushSerializer * refactor: MirroringBufferedMutator#close() - use AccumulatedExceptions insteand of List<Exception> * BufferedMutator - add close timeout * AsyncBufferedMutator - add close timeout * fix: remove stale addSecondaryMutation comment * fix: add a comment that addSecondaryMutation handles failed writes * fix: unify implementations of flushBufferedMutatorBeforeClosing * fix: BufferedMutator - throw exceptions on close * fix: BufferedMutator - add comment explaining that chain of flush operations is created * fix: BufferedMutator - clarify comments * fix: Concurrent BufferedMutator - fix throwFlushExceptionIfAvailable * fix: explain why flush is called in Sequential BufferedMutator test * fix: TestConcurrentMirroringBufferedMutator - make waiting for calls explicit * refactor: BufferedMutator rename scheduleFlushAll() to scheduleFlush() * refactor: make FlushSerializer non-static * fix: BufferedMutator - use HierarchicalReferenceCounter * feat: Add MirroringConnection constructor taking MirroringConfiguration * refactor: move releaseReservations to finally * fix: use less convoluted example in lastFlushFutures description * fix: merge small Timeestamper files into a single file * fix: add a comment explaining which exceptions are forwarded to the user and why in SequentialMirroringBufferedMutator * fix: use UnsupportedOperationException instead of RuntimeException when forbidden mutation type is encountered * fix: add comment explaining why batch is complicated * fix: add a TODO to implement point writes without batch Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com> Co-authored-by: Kajetan Boroszko <kajetan@unoperate.com>
Introducing MirroringAsyncBufferedMutator.
Mutate returns a future that gets completed only when the write to primary has completed and resources have been allocated for the secondary write.
This change is