-
Notifications
You must be signed in to change notification settings - Fork 0
feat: MirroringAsyncTable#Batch() #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 4 files reviewed, 8 unresolved discussions (waiting on @dopiera and @prawilny)
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/BatchHelpers.java, line 34 at r1 (raw file):
final MismatchDetector mismatchDetector, final SecondaryWriteErrorConsumer secondaryWriteErrorConsumer) { return createBatchVerificationCallback(
I'd move changes in this files to a separate PR.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/BatchHelpers.java, line 39 at r1 (raw file):
mismatchDetector, secondaryWriteErrorConsumer, new Predicate<Object>() {
Would it be possible to pass this parameter explicitly in every place where it is used? maybe it's just too late for me, but I cannot conclude that this default value will always be correct, especially considering the fact that default predicate for SplitBatchResponse is different that this one.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 156 at r1 (raw file):
@Override public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
Logic in this method is complicated and hard to follow. I think that it would be worth to split it into smaller parts. My favorite strategy of doing this is to walk through the code, add a comment above each 'portion' of code describing what it does, and then replacing those portions and corresponding comments with descriptively named methods.
Also, it might be worth to try replacing lambdas with non-inline methods or with named inline functions.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 158 at r1 (raw file):
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) { final int numberOfActions = actions.size(); final ArrayList<CompletableFuture<T>> resultFutures =
The idiom in Java is to use List<T> x = new ArrayList<>().
Also, you can try to use Arrays.setAll(x, i -> new CompletableFuture<T>()); to fill the array instead of doing it manually.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 169 at r1 (raw file):
batchHelperWaitForAll( primaryFutures, (idx, throwable) -> {
Move this lambda to local variable and give it a some nice name (primaryErrorHandler?), it's hard to say what is its purpose now.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 203 at r1 (raw file):
new Object[primarySplitResponse.allSuccessfulOperations.size()]; reserveFlowControlResourcesThenScheduleSecondary(
I do not see body of reserveFlowControlResourcesThenScheduleSecondary in this PR, but maybe we can extract scheduleSecondary part out of it and not hack the former with passing completed futures?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 226 at r1 (raw file):
} private <T> CompletableFuture<Void> batchHelperWaitForAll(
This method accepts a flag that alters its behavior, what makes this code a bit convoluted. It can be simplified by moving the alternative behavior from this method into a separate wrapper method that will interpret results returned by this method. In code:
<T> CompletableFuture<Object[]> batchHelperWaitForAll(List<...> futures, Consumer<...> errorHandler) {
...
}
Object[] will be created inside this method and returned as a future.
We will accept the result in this form when handling results of primary operation.
We can use function similar to that below to handle the alternative behavior used when waiting for secondary batch results.
CompletableFuture<Void> getFutureFailingOnAnyBatchOperatoinFailure(CompletableFuture<Object[]> f) {
CompletableFuture<Void> result = new ...;
f.handle((result) -> {
if (noError(result)) {
result.complete(null);
} else {
result.fail(...)
}
});
return result;
}
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 228 at r1 (raw file):
private <T> CompletableFuture<Void> batchHelperWaitForAll( List<CompletableFuture<T>> futures, BiFunction<Integer, Throwable, Void> errorHandler,
Maybe you can use BiConsumer here?
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 4 files reviewed, 8 unresolved discussions (waiting on @dopiera and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 226 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
This method accepts a flag that alters its behavior, what makes this code a bit convoluted. It can be simplified by moving the alternative behavior from this method into a separate wrapper method that will interpret results returned by this method. In code:
<T> CompletableFuture<Object[]> batchHelperWaitForAll(List<...> futures, Consumer<...> errorHandler) { ... }
Object[]will be created inside this method and returned as a future.
We will accept the result in this form when handling results of primary operation.We can use function similar to that below to handle the alternative behavior used when waiting for secondary batch results.
CompletableFuture<Void> getFutureFailingOnAnyBatchOperatoinFailure(CompletableFuture<Object[]> f) { CompletableFuture<Void> result = new ...; f.handle((result) -> { if (noError(result)) { result.complete(null); } else { result.fail(...) } }); return result; }
Or maybe it would be better to wrap BatchHelpers.createBatchVerificationCallback?
bfa551b to
3ee3d91
Compare
62375da to
6a0e262
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dismissed @mwalkiewicz from a discussion.
Reviewable status: 0 of 6 files reviewed, 7 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 158 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
The idiom in Java is to use
List<T> x = new ArrayList<>().
Also, you can try to useArrays.setAll(x, i -> new CompletableFuture<T>());to fill the array instead of doing it manually.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 169 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Move this lambda to local variable and give it a some nice name (
primaryErrorHandler?), it's hard to say what is its purpose now.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 203 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I do not see body of
reserveFlowControlResourcesThenScheduleSecondaryin this PR, but maybe we can extractscheduleSecondarypart out of it and not hack the former with passing completed futures?
Well, we do have scheduleVerificationAfterSecondaryOperation. I still don't think it's that bad - if you find yourself having a reservation but not its future, you can always call CompletableFuture.completedFuture(reservation).
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 226 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Or maybe it would be better to wrap
BatchHelpers.createBatchVerificationCallback?
Done. Previous version was stupid. When the error is to be ignored, we just use whenComplete.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 228 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Maybe you can use
BiConsumerhere?
Done.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/BatchHelpers.java, line 34 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
I'd move changes in this files to a separate PR.
Done.
bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/BatchHelpers.java, line 39 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Would it be possible to pass this parameter explicitly in every place where it is used? maybe it's just too late for me, but I cannot conclude that this default value will always be correct, especially considering the fact that default predicate for SplitBatchResponse is different that this one.
Done, I copied 1:1 rather than sensibly. Now it's (o) -> o == null || o instanceof Throwable in MirroringTable and (o) - > o instanceof Throwable in MirroringAsyncTable (both classes have static Predicate member).
|
Possibly I messed up a little bit - I published the review and only then pushed local changes. |
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 7 unresolved discussions (waiting on @dopiera and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 156 at r1 (raw file):
Previously, mwalkiewicz (Mateusz Walkiewicz) wrote…
Logic in this method is complicated and hard to follow. I think that it would be worth to split it into smaller parts. My favorite strategy of doing this is to walk through the code, add a comment above each 'portion' of code describing what it does, and then replacing those portions and corresponding comments with descriptively named methods.
Also, it might be worth to try replacing lambdas with non-inline methods or with named inline functions.
Done.
I still feel like it's still quite complicated but have no idea how to simplify this any further.
dopiera
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 15 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 150 at r6 (raw file):
} @Override
Why did you change the location in file of these methods?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 175 at r6 (raw file):
final List<CompletableFuture<T>> primaryFutures = this.primaryTable.batch(actions); // Unfortunately, we cannot create T[]. final Object[] primaryResults = new Object[numberOfActions];
Why not return primaryResults from the future returned from waitForAllWithErrorHandler?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 205 at r6 (raw file):
verificationCreator
The function's name already says (or should say) what it does, so I think invoking it directly where you need it will increase readability (vs having the local variable).
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 226 at r6 (raw file):
} private Function<Void, FutureCallback<Void>> createBatchVerificationCallbackWrapper(
I think the function name would be better if it followed the variable you're assigning its result to, i.e. createBatchVerificationCreator().
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 237 at r6 (raw file):
} private Supplier<CompletableFuture<Void>> createSecondaryBatchFuturesCompletedFutureSupplier(
I think inlining this function would improve readability. IMO its name is harder to comprehend than what it actually it does.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 247 at r6 (raw file):
completeResultFutures
Maybe completeSuccessfulResultFutures to indicate that failed ones are handled elsewhere?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 257 at r6 (raw file):
// Waits for all of the futures from the list to complete and then returns an exceptional future // if and only if any of them completed exceptionally. private <T> CompletableFuture<Void> waitForAllWithErrorHandler(
I think this can be implemented via ListenableCounter, which could make it a bit more readable, potentially.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 307 at r6 (raw file):
return; } CompletableFuture<FlowController.ResourceReservation> reservationFuture =
In the other method in this file you've inlined this - I think the inlined version is more readable.
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implemented changes as suggested.
Reviewable status: 0 of 6 files reviewed, 15 unresolved discussions (waiting on @dopiera and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 150 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
Why did you change the location in file of these methods?
I want to leave unimplemented methods at the bottom and I thought that batch()-like methods should be near each other. Though they might fit better below single operation helpers.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 175 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
Why not return
primaryResultsfrom the future returned fromwaitForAllWithErrorHandler?
It's also used in createSecondaryBatchFuturesCompletedFutureSupplier and there we rely on the fact that this callback (createSecondary...) writes into an array that is also passed to verifier. This approach allows us to reuse most of 1.x batch()'s code.
Maybe there is a way to simplify this, but that's not it, I think (or at least not without writing some adapter for code we want to reuse).
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 205 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
verificationCreatorThe function's name already says (or should say) what it does, so I think invoking it directly where you need it will increase readability (vs having the local variable).
Done. Previously was extracted into a method in order to shrink the sequence of multiline declarations a bit.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 226 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
I think the function name would be better if it followed the variable you're assigning its result to, i.e.
createBatchVerificationCreator().
Removed the function as suggested above.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 237 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
I think inlining this function would improve readability. IMO its name is harder to comprehend than what it actually it does.
Done
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 247 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
completeResultFuturesMaybe
completeSuccessfulResultFuturesto indicate that failed ones are handled elsewhere?
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 257 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
I think this can be implemented via
ListenableCounter, which could make it a bit more readable, potentially.
Noticed a bug in the current code and a problem with the ReferenceCounter implementation: if the list of requests is empty (it's stupid but possible, I think), in the current implementation we never complete the future and in the ReferenceCounter implementation there's a problem with initializing the counter (and with using the class not really as its should be used).
Rewritten and simplified the code a bit using AtomicReference (I wanted to use non-atomic reference, but variables used by lambdas have to be final).
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 307 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
In the other method in this file you've inlined this - I think the inlined version is more readable.
Done.
107e110 to
3318906
Compare
|
Rebased onto new master - had to adjust secondaryWriteErrorConsumer (and fix a bug from master introduced by a rebase (in BatchHelpers)). |
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 4 files at r8, all commit messages.
Reviewable status: 2 of 6 files reviewed, 9 unresolved discussions (waiting on @dopiera and @mwalkiewicz)
3318906 to
c0aad29
Compare
c97bdb1 to
1bb4de2
Compare
c0aad29 to
c21c3db
Compare
dopiera
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 6 unresolved discussions (waiting on @dopiera, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 257 at r6 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
Noticed a bug in the current code and a problem with the ReferenceCounter implementation: if the list of requests is empty (it's stupid but possible, I think), in the current implementation we never complete the future and in the ReferenceCounter implementation there's a problem with initializing the counter (and with using the class not really as its should be used).
Rewritten and simplified the code a bit using AtomicReference (I wanted to use non-atomic reference, but variables used by lambdas have to be final).
Here's another idea:
for (int i = 0; i < futures.size(); ++i) {
future.handleAsync((ignoredResult, error) -> errorHandler(i, error));
}
return CompletableFuture.allOf(futures);
WDYT?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 180 at r9 (raw file):
numberOfActions
I think these are called operations, not actions. Also, I think it is unnecessarily verbose - numOperations would be just as good, but shorter.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 181 at r9 (raw file):
resultFuturesArray
Why do we need this intermediate construction variable?
new ArrayList(new CompletableFuture[numOperations]);
Collections.setAll()
That should do the job, no?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 213 at r9 (raw file):
this.flowController.asyncRequestResource( writeOperationInfo.requestResourcesDescription)); final Object[] secondaryResults =
This may have a considerable size. Should we allocate it in the resourceReservationRequest's continuation?
1bb4de2 to
6922c33
Compare
c21c3db to
76e011b
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 6 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 257 at r6 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
Here's another idea:
for (int i = 0; i < futures.size(); ++i) { future.handleAsync((ignoredResult, error) -> errorHandler(i, error)); } return CompletableFuture.allOf(futures);WDYT?
Unfortunately, Java 8 doesn't have (and guava's are annotated with "@beta") utilities for zipping streams nor a function which could be named mapWithIndex (and mapping a function to a list can be also done via a stream).
Additionally, this version doesn't adhere to the function's contract - it doesn't fill results appropriately.
The other problem is with the fact that it returns too early if some function errs or doesn't know about the exception (though that may be helped by having a boolean flag and returning some generic exception).
Final version (with poorly named variables) adressing these problems is not better than the code IMO:
private <T> CompletableFuture<Void> waitForAllWithErrorHandler(
List<CompletableFuture<T>> futures,
BiConsumer<Integer, Throwable> errorHandler,
Object[] results) {
int numFutures = futures.size();
AtomicReference maybeThrowable = new AtomicReference(null);
List<CompletableFuture<Void>> futs = new ArrayList<>();
for (int i = 0; i < numFutures; i++) {
final int futureIdx = i;
futs.add(
futures
.get(futureIdx)
.handle(
(result, error) -> {
if (error != null) {
results[futureIdx] = error;
maybeThrowable.compareAndSet(null, error);
errorHandler.accept(futureIdx, error);
} else {
results[futureIdx] = result;
}
return null;
}));
}
return CompletableFuture.allOf(futs.toArray(new CompletableFuture[0]))
.handle(
(ignoredResult, ignoredError) -> {
if (maybeThrowable.get() != null) {
throw new UncheckedExecutionException((Throwable) maybeThrowable.get());
} else {
return null;
}
});
}
WDYT?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 180 at r9 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
numberOfActionsI think these are called operations, not actions. Also, I think it is unnecessarily verbose -
numOperationswould be just as good, but shorter.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 181 at r9 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
resultFuturesArrayWhy do we need this intermediate construction variable?
new ArrayList(new CompletableFuture[numOperations]); Collections.setAll()That should do the job, no?
There seem to be no method to set all elements of a list nor a constructor from array. I used Stream API - seems to be a bit cleaner.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 213 at r9 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
This may have a considerable size. Should we allocate it in the
resourceReservationRequest's continuation?
Only in this place do we know the size of the array we need to create.
Also there are some assertions about size of the array (in BatchHelpers) and there seems to be no way to shorten an Array.
Ah, there are also a lot of allocations while splitting the results.
d930fa1 to
4955664
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 6 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 257 at r6 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
Unfortunately, Java 8 doesn't have (and guava's are annotated with "@beta") utilities for zipping streams nor a function which could be named
mapWithIndex(and mapping a function to a list can be also done via a stream).Additionally, this version doesn't adhere to the function's contract - it doesn't fill results appropriately.
The other problem is with the fact that it returns too early if some function errs or doesn't know about the exception (though that may be helped by having a boolean flag and returning some generic exception).Final version (with poorly named variables) adressing these problems is not better than the code IMO:
private <T> CompletableFuture<Void> waitForAllWithErrorHandler( List<CompletableFuture<T>> futures, BiConsumer<Integer, Throwable> errorHandler, Object[] results) { int numFutures = futures.size(); AtomicReference maybeThrowable = new AtomicReference(null); List<CompletableFuture<Void>> futs = new ArrayList<>(); for (int i = 0; i < numFutures; i++) { final int futureIdx = i; futs.add( futures .get(futureIdx) .handle( (result, error) -> { if (error != null) { results[futureIdx] = error; maybeThrowable.compareAndSet(null, error); errorHandler.accept(futureIdx, error); } else { results[futureIdx] = result; } return null; })); } return CompletableFuture.allOf(futs.toArray(new CompletableFuture[0])) .handle( (ignoredResult, ignoredError) -> { if (maybeThrowable.get() != null) { throw new UncheckedExecutionException((Throwable) maybeThrowable.get()); } else { return null; } }); }WDYT?
Done.
Note wrapping the rethrown exception into a CompletionException - I cannot force handle() to return a checked exception. On the other hand, it's not really a problem - allOf wraps inner exception into CompletionException if it isn't already one, so the result stays the same (only a few lines in a test had to be changed).
dopiera
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 7 unresolved discussions (waiting on @beta, @dopiera, @mwalkiewicz, and @prawilny)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 180 at r10 (raw file):
Stream.generate(() -> new CompletableFuture<T>()) .limit(numActions) .collect(Collectors.toList());
I think you want Collectors.toCollection(ArrayList::new) - otherwise you have no guarantee that .get(idx), which you're using below is constant time.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 256 at r10 (raw file):
handledFutures
Why create a separate list rather than reuse the existing one?
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 258 at r10 (raw file):
futureIdx
What's wrong with i? ;) Even if you prefer futureIdx, why not do for (int futureIdx = 0; futureIdx < numFutures; ++futureIdx)?
2266aa2 to
b542429
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 7 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 180 at r10 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
I think you want
Collectors.toCollection(ArrayList::new)- otherwise you have no guarantee that.get(idx), which you're using below is constant time.
Done.
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 256 at r10 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
handledFuturesWhy create a separate list rather than reuse the existing one?
We can allow allOf() to return only after we've already filled results[] so we need new futures. Otherwise allOf() races with then()/handle().
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 258 at r10 (raw file):
Previously, dopiera (Marek Dopiera) wrote…
futureIdxWhat's wrong with
i? ;) Even if you preferfutureIdx, why not dofor (int futureIdx = 0; futureIdx < numFutures; ++futureIdx)?
We need a final variable to use in lambda.
ee241f0 to
b1f3845
Compare
prawilny
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 7 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java, line 256 at r10 (raw file):
Previously, prawilny (Adam Czajkowski) wrote…
We can allow allOf() to return only after we've already filled results[] so we need new futures. Otherwise allOf() races with then()/handle().
Done
dopiera
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 3 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
This reverts commit 0327f5f.
dopiera
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: 0 of 6 files reviewed, 2 unresolved discussions (waiting on @beta, @dopiera, and @mwalkiewicz)
mwalkiewicz
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 3 files at r10, 4 of 5 files at r12, 1 of 1 files at r13, all commit messages.
Reviewable status: all files reviewed, 1 unresolved discussion (waiting on @beta)
* Initial implementation of hbase1.x Mirroring Client (#1) * Initial implementation of hbase1.x Mirroring Client * MirroringTable and AsyncTableWrapper (#4) * MirroringTable and AsyncTableWrapper - Initial implementation of MirroringTable. - Wrapper around HBase 1.x Table enabling scheduling asynchronous operations. - Asynchronous verification of get/exists results. * Add missing licence header to TestResultComparator (#6) * Setup running tests in GitHub Actions (#7) * Asynchronous ResultScanner (#5) * Asynchronous ResultScanner * Implement faillog.Appender (#2) `faillog.Appender` is the lowermost component of the infrastructure for dumping failed mutations in the `MirroringClient`. `faillog/README.md` explains the design decisions in a bit more detail. `faillog.Appender` essentially logs arbitrary data asynchronously, through a separate thread reading from a bounded buffer. * ListenableReferenceCounter (#8) * Count references in asynchronous tasks before closing Table/Scanner. * Flow controller (#10) * Flow Controller * Flow Control strategy based on single requests queue * Mirroring table: writes (#12) * Add missing condition in result comparator (#15) * Add more mirroring config opitons (#16) * MismatchDetector implementation. * FlowControllerStrategy implementation. * Maximal number of outstanding requests used by FlowControllerStrategy. * Primary/Secondary connection implementation option now accepts "default" value which can be used when default HBase Connection implementation should be used by MirroringConnection. * Make AsyncTableWrapper ListenableCloseable (#20) Implements our standard interface for objects that can run callbacks after asynchronous close to simplify reference counting of MirroringTable. * MirroringConnection: use new config options (#17) * MirroringResultScanner improvements (#18) * Count references to MirroringResultScanner Current implementation of MirroringResultScanner doesn't count verificaiton requests that it has scheduled and allows to close instances while verificaiton requests are in-flight. This causes lost verifications. This PR fixes this issue by counting references to MirroringResultScanner instances when scheduling verification requests. Moreover, ListenableClosable interface is implemented for consistency with other classes that use this scheme, because now the MirroringResultScanner instances will be closed asynchronously, when all scheduled requests have finished. * MirroringTable: count references to scanners and secondary wrapper (#21) Current MirroringTable implementaion does not count its references held by MirroringResultScanners and SecondaryAsyncWrapper, thus MirroringConnection consideres it closed before we are sure that all asynchronous operations have completed. This PR adds correct reference counting of MirroringTable based on work done in previously merged PRs. * Result Scanner - ensure verification ordering (#22) Current implementation assumes that next() operations on primary and secondary scanners are called in the same order and uses this assumption to match results for verification. However, next()s on secondary database are called asynchronusly and their order is not defined, which causes invalid mismatch reports. This PR fixes this problem by placing data to be verified - results of next()s called on primary scanner - and details of next() call - number of requested elements - call on a queue. Each asynchronous call to next() is synchonized and pops a single element from that queue. Appropriate next is called based on number of requested elements. Then results of that request and results from the queue are verified. This ensures that results of next()s passed to verification are correctly matched and ordered. * Integration tests (#23) HBase 1.x integration tests * Add trace logging. (#24) * Estimate memory overhead in RequestResourceDescription (#25) * Tests: extract executor service to TestRule (#26) Extract executor service utilities into TestRule to facilitate code reuse in other test classes. * Integration tests: read configuration from xml files * MirroringBufferedMutator * MirroringBufferedMutator: integration tests (#9) * Fix error introduced in rebase (#11) * Obtain resource reservation before scheduling secondary calls (#4) Fixes a bug when secondary database request Future was created before obtaining resources from FlowController. * Integration tests - MirroringTable operations (#10) * MirroringAsyncConfiguration (#5) Add configuration class to be used by MirroringAsyncConnection. * SecondaryWriteErrorConsumer in MirroringTable (#15) Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes. * Use Put to implement Increment and Append (#16) * refactor: extract functions using reflection into package utils.reflection * refactor: extract BatchHelpers into utils Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client. * feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable. * refactor: extract FlowController's request cancellation into a method * fix: Increment ITs fail with Bigtable as primary (#21) We were setting timerange on Increment objects used in integration tests without any reason and Bigtable doesn't support this operation. Setting timerange in ITs was removed. * fix: RequestScheduling should handle rejected resource reservations (#24) Custom FlowControlerStrategy implementations might, contrary to the default implementation, resolve reservation requests with exception, what we should handle by not performing the action that had to acquire the resources. * feat: Add OpenCensus tracing and metrics. (#14) * fix: make BatchHelpers skip verification of empty read results BatchHelpers provides error handling of batch() when there may be some partial results. Before the commit, matching successful reads were redundantly verified if there were none of them. This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays. * refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API. * refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation This change is split off from commit introducing MirroringAsyncTable#batch() * feat: implement batch() in MirroringAsyncTable Implementation of MirroringAsyncTable's batch() and MirroringAsyncTable's methods such as get(List<Get) and put(List<Put>) using it. * feat: implement failed mutations log (#19) Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually. Each failure is logged as a separate line, which makes it compatible with solutions like logstash. * refactor: split SplitBatchResponse (#40) SplitBatchResponse was refactored into two parts: splitting into reads/writes and failed/successful. This makes the code simpler and easier to maintain. * refactor: extract helper methods in tests (#48) * refactor: remove redundant writeWithControlFlow argument * feat: copy HBase operations' input lists (#57) * refactor: remove redundant field from MirroringConnection (#55) * feat: verification with sampling (#28) * fix: mirror Increment/Append in batch() using Put. (#47) * refactor: Move HBaseOperation into WriteOperationInfo (#68) * refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69) * fix: integration tests - fix build (#70) * fix: count references to batch operations (#63) * fix: close underlying connections when MirroringConnection is closed (#49) * refactor: fix IDE warnings in MirroringAsyncTable test (#64) * fix: integration tests - check if write errors were reported (#71) * feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65) * fix: do not call callbacks with lock held (#53) * refactor: use AccumulatedExceptions where appropriate (#54) * fix: fix key used in verification sampling ITs (#77) * feat: use faillog for handling write errors (#66) * refactor: add utilities for Futures (FutureUtils) * feat: defer closing connections until async operations complete (#37) Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations. * feat: implement AsyncTableBuilder (#42) * feat: implement MirroringAsyncTable#checkAndMutate (#43) * fix: Implement single Append and Increment as Put (#38) * refactor: simplify SecondaryWriteErrorConsumer API (#78) * feat: concurrent writes in MirroringTable (#79) * test: fix failing concurrent write test (#120) * refactor: renames and moves in RequestScheduling (#87) * wip: handover session comments Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
* Initial implementation of hbase1.x Mirroring Client (#1) * Initial implementation of hbase1.x Mirroring Client * MirroringTable and AsyncTableWrapper (#4) * MirroringTable and AsyncTableWrapper - Initial implementation of MirroringTable. - Wrapper around HBase 1.x Table enabling scheduling asynchronous operations. - Asynchronous verification of get/exists results. * Add missing licence header to TestResultComparator (#6) * Setup running tests in GitHub Actions (#7) * Asynchronous ResultScanner (#5) * Asynchronous ResultScanner * Implement faillog.Appender (#2) `faillog.Appender` is the lowermost component of the infrastructure for dumping failed mutations in the `MirroringClient`. `faillog/README.md` explains the design decisions in a bit more detail. `faillog.Appender` essentially logs arbitrary data asynchronously, through a separate thread reading from a bounded buffer. * ListenableReferenceCounter (#8) * Count references in asynchronous tasks before closing Table/Scanner. * Flow controller (#10) * Flow Controller * Flow Control strategy based on single requests queue * Mirroring table: writes (#12) * Add missing condition in result comparator (#15) * Add more mirroring config opitons (#16) * MismatchDetector implementation. * FlowControllerStrategy implementation. * Maximal number of outstanding requests used by FlowControllerStrategy. * Primary/Secondary connection implementation option now accepts "default" value which can be used when default HBase Connection implementation should be used by MirroringConnection. * Make AsyncTableWrapper ListenableCloseable (#20) Implements our standard interface for objects that can run callbacks after asynchronous close to simplify reference counting of MirroringTable. * MirroringConnection: use new config options (#17) * MirroringResultScanner improvements (#18) * Count references to MirroringResultScanner Current implementation of MirroringResultScanner doesn't count verificaiton requests that it has scheduled and allows to close instances while verificaiton requests are in-flight. This causes lost verifications. This PR fixes this issue by counting references to MirroringResultScanner instances when scheduling verification requests. Moreover, ListenableClosable interface is implemented for consistency with other classes that use this scheme, because now the MirroringResultScanner instances will be closed asynchronously, when all scheduled requests have finished. * MirroringTable: count references to scanners and secondary wrapper (#21) Current MirroringTable implementaion does not count its references held by MirroringResultScanners and SecondaryAsyncWrapper, thus MirroringConnection consideres it closed before we are sure that all asynchronous operations have completed. This PR adds correct reference counting of MirroringTable based on work done in previously merged PRs. * Result Scanner - ensure verification ordering (#22) Current implementation assumes that next() operations on primary and secondary scanners are called in the same order and uses this assumption to match results for verification. However, next()s on secondary database are called asynchronusly and their order is not defined, which causes invalid mismatch reports. This PR fixes this problem by placing data to be verified - results of next()s called on primary scanner - and details of next() call - number of requested elements - call on a queue. Each asynchronous call to next() is synchonized and pops a single element from that queue. Appropriate next is called based on number of requested elements. Then results of that request and results from the queue are verified. This ensures that results of next()s passed to verification are correctly matched and ordered. * Integration tests (#23) HBase 1.x integration tests * Add trace logging. (#24) * Estimate memory overhead in RequestResourceDescription (#25) * Tests: extract executor service to TestRule (#26) Extract executor service utilities into TestRule to facilitate code reuse in other test classes. * Integration tests: read configuration from xml files * MirroringBufferedMutator * MirroringBufferedMutator: integration tests (#9) * Fix error introduced in rebase (#11) * Obtain resource reservation before scheduling secondary calls (#4) Fixes a bug when secondary database request Future was created before obtaining resources from FlowController. * Integration tests - MirroringTable operations (#10) * MirroringAsyncConfiguration (#5) Add configuration class to be used by MirroringAsyncConnection. * SecondaryWriteErrorConsumer in MirroringTable (#15) Use SecondaryWriteErrorConsumer to handle write errors in secondary database in MirroringTable's writes. * Use Put to implement Increment and Append (#16) * refactor: extract functions using reflection into package utils.reflection * refactor: extract BatchHelpers into utils Extract common part of batch() helpers into a class and add Predicate argument to nested classes' constructors making it possible to reuse the code in 2.x client. * feat: Initial implementation of a subset of asynchronous HBase 2.x Mirroring Client Contains basic implementation of MirroringAsyncConnection and MirroringAsyncTable. * refactor: extract FlowController's request cancellation into a method * fix: Increment ITs fail with Bigtable as primary (#21) We were setting timerange on Increment objects used in integration tests without any reason and Bigtable doesn't support this operation. Setting timerange in ITs was removed. * fix: RequestScheduling should handle rejected resource reservations (#24) Custom FlowControlerStrategy implementations might, contrary to the default implementation, resolve reservation requests with exception, what we should handle by not performing the action that had to acquire the resources. * feat: Add OpenCensus tracing and metrics. (#14) * fix: make BatchHelpers skip verification of empty read results BatchHelpers provides error handling of batch() when there may be some partial results. Before the commit, matching successful reads were redundantly verified if there were none of them. This commit brings back the behaviour from up to 5a29253: when there are no successful matching reads, a MismatchDetector isn't called on empty arrays. * refactor: make MirroringAsync{Connection,Table} use SecondaryWriteErrorConsumerWithMetrics BatchHelpers require using SecondaryWriteErrorConsumerWithMetrics API. * refactor: make AsyncRequestScheduling accept CompletableFuture<ResourceReservation> instead of ResourceReservation This change is split off from commit introducing MirroringAsyncTable#batch() * feat: implement batch() in MirroringAsyncTable Implementation of MirroringAsyncTable's batch() and MirroringAsyncTable's methods such as get(List<Get) and put(List<Put>) using it. * feat: implement failed mutations log (#19) Failed secondary mutations are written to disk in JSON format, which the user can parse programmatically or inspect visually. Each failure is logged as a separate line, which makes it compatible with solutions like logstash. * refactor: split SplitBatchResponse (#40) SplitBatchResponse was refactored into two parts: splitting into reads/writes and failed/successful. This makes the code simpler and easier to maintain. * refactor: extract helper methods in tests (#48) * refactor: remove redundant writeWithControlFlow argument * feat: copy HBase operations' input lists (#57) * refactor: remove redundant field from MirroringConnection (#55) * feat: verification with sampling (#28) * fix: mirror Increment/Append in batch() using Put. (#47) * refactor: Move HBaseOperation into WriteOperationInfo (#68) * refactor: remove redundant parameter from scheduleWriteWithControlFlow (#69) * fix: integration tests - fix build (#70) * fix: count references to batch operations (#63) * fix: close underlying connections when MirroringConnection is closed (#49) * refactor: fix IDE warnings in MirroringAsyncTable test (#64) * fix: integration tests - check if write errors were reported (#71) * feat: make SecondaryWriteErrorConsumer accept error cause and operation (#65) * fix: do not call callbacks with lock held (#53) * refactor: use AccumulatedExceptions where appropriate (#54) * fix: fix key used in verification sampling ITs (#77) * feat: use faillog for handling write errors (#66) * refactor: add utilities for Futures (FutureUtils) * feat: defer closing connections until async operations complete (#37) Mirroring client schedules asynchronous operations - to mirror the mutations and to verify reads. Before this PR, closing the MirroringAsyncConnection would result in closing the underlying connections immediately. This made the pending asynchronous operations fail. This PR defers closing the underlying connections until all pending operations complete. It is achieved by reference counting the operations. * feat: implement AsyncTableBuilder (#42) * feat: implement MirroringAsyncTable#checkAndMutate (#43) * fix: Implement single Append and Increment as Put (#38) * refactor: simplify SecondaryWriteErrorConsumer API (#78) * feat: concurrent writes in MirroringTable (#79) * test: fix failing concurrent write test (#120) * refactor: renames and moves in RequestScheduling (#87) * wip: handover session comments Co-authored-by: Mateusz Walkiewicz <mwalkiewicz@unoperate.com> Co-authored-by: Adam Czajkowski <prawilny@unoperate.com>
This change is
I messed up dependencies between branches a bit - some of these changes (BatchHelpers, MirroringAsyncConnection) could do better in another PR. I hope I'll get around to doing that tomorrow.
Edit: everything's good with branches right now.