AKCORE-254: RPC batching and event based poll. (#4) #1422
### What
- Build infra code to allow for RPC batching in PersisterStateManager.
- Change the poll mechanism to be triggered on various events (new RPC, completion of an RPC on a specific partition key, etc.).

### Why
- Batching improves performance by using the turnaround time of an RPC to a specific node n1 (the share coordinator leader) to batch subsequent RPCs for that node into a single request. For example:
  - Suppose we get a Write RPC r1 for n1.
  - We send this RPC in the standard way.
  - While r1 is in flight, let's say 3 more RPCs for n1 are received (r2, r3, r4).
  - Coalesce and batch r2, r3, and r4 into a single RPC R1.
  - When r1 completes, make a single RPC R1 to n1.
  - This saves 2 network calls.
  - The result of R1 is broken down to create response objects for the individual RPCs (r2, r3, r4).
- Event based poll is more performant and simpler to implement.

### Testing
- Added and updated tests at requisite places.
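For readers skimming the description, the r1/R1 example under Why can be sketched roughly as below. This is only an illustration under stated assumptions, not the PersisterStateManager code from this PR: the class `BatchingSketch`, its methods, and the use of plain strings for nodes and RPCs are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of per-node coalescing; names are not from the PR.
class BatchingSketch {
    // Nodes that currently have an RPC in flight.
    private final Set<String> inFlight = new HashSet<>();
    // RPCs that arrived while their node was busy, waiting to be coalesced.
    private final Map<String, List<String>> pending = new HashMap<>();
    // Every simulated network call, recorded as the list of RPCs it carried.
    final List<List<String>> sent = new ArrayList<>();

    // Called when a new RPC arrives for a node.
    synchronized void enqueue(String node, String rpc) {
        if (inFlight.contains(node)) {
            // Node is busy: park the RPC so it can be batched later.
            pending.computeIfAbsent(node, k -> new ArrayList<>()).add(rpc);
        } else {
            // Node is idle: send immediately, in the standard way.
            inFlight.add(node);
            sent.add(List.of(rpc));
        }
    }

    // Called when the in-flight RPC for a node completes.
    synchronized void onComplete(String node) {
        List<String> batch = pending.remove(node);
        if (batch == null || batch.isEmpty()) {
            inFlight.remove(node);
        } else {
            // One combined call carries everything that queued up;
            // the node stays marked as in flight until it completes.
            sent.add(List.copyOf(batch));
        }
    }
}
```

With r1 through r4 enqueued for n1 and a single completion of r1, the sketch records two calls instead of four, which matches the "saves 2 network calls" figure in the description. Splitting the combined response back into per-RPC responses is left out.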
Andrew Schofield (AndrewJSchofield)
left a comment
Thanks for the PR. A few comments to address.
nodeRPCMap.get(node).computeIfAbsent(handler.rpcType(), k -> new HashMap<>());
nodeRPCMap.get(node).get(handler.rpcType()).computeIfAbsent(handler.groupId, k -> new LinkedList<>());
nodeRPCMap.get(node).get(handler.rpcType()).get(handler.groupId).add(handler);
This method can be simplified I think. There's no need to do the repeated get() calls, and you could chain the computeIfAbsent() calls instead.
Did it this way for readability
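For comparison, the chained form suggested in the review might look roughly like the sketch below. The map shape (node, then RPC type, then group id, then a list of handlers) is inferred from the snippet; the `String` keys and values and the class name are stand-ins, not the PR's actual types.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; String stands in for the node, RPC type and handler types.
class ChainedComputeIfAbsentSketch {
    // node -> rpcType -> groupId -> handlers
    final Map<String, Map<String, Map<String, List<String>>>> nodeRPCMap = new HashMap<>();

    void add(String node, String rpcType, String groupId, String handler) {
        // Each computeIfAbsent returns the existing or newly created value,
        // so the next level can be reached without a separate get().
        nodeRPCMap
            .computeIfAbsent(node, k -> new HashMap<>())
            .computeIfAbsent(rpcType, k -> new HashMap<>())
            .computeIfAbsent(groupId, k -> new LinkedList<>())
            .add(handler);
    }
}
```

Whether this reads better than the step-by-step version is a judgment call; the behavior should be the same, with one lookup per level instead of several.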
nodeRPCMap.get(node).get(handler.rpcType()).computeIfAbsent(handler.groupId, k -> new LinkedList<>());
nodeRPCMap.get(node).get(handler.rpcType()).get(handler.groupId).add(handler);
sender.wakeup();
Does the wakeup really need to be inside the synchronized block?
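If the answer turns out to be no, one possible shape is to mutate the shared state under the lock and signal the sender afterwards. The sketch below is an assumption-laden illustration: `WakeupOutsideLockSketch`, its `lock` field, and the `Runnable` standing in for `sender.wakeup()` are hypothetical, and it does not claim that moving the call is safe in the actual PR code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the Runnable stands in for sender.wakeup().
class WakeupOutsideLockSketch {
    final Object lock = new Object();
    final List<String> queue = new ArrayList<>();
    private final Runnable wakeup;

    WakeupOutsideLockSketch(Runnable wakeup) {
        this.wakeup = wakeup;
    }

    void enqueue(String handler) {
        // Only the shared-state mutation is guarded by the lock.
        synchronized (lock) {
            queue.add(handler);
        }
        // The wakeup happens after the lock is released, so a sender that
        // wakes up and needs the same lock does not block on this thread.
        wakeup.run();
    }
}
```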
return new WriteShareGroupStateRequest.Builder(new WriteShareGroupStateRequestData()
    .setGroupId(groupId)
    .setTopics(partitionData.entrySet().stream().map(
nit: nicer to break the line between .stream() and map( I think.
return new ReadShareGroupStateRequest.Builder(new ReadShareGroupStateRequestData()
    .setGroupId(groupId)
    .setTopics(partitionData.entrySet().stream().map(
ditto
@@ -110,6 +139,7 @@ public abstract class PersisterStateManagerHandler implements RequestCompletionH
private int findCoordattempts = 0;
Typo: Should be findCoordBackoff and findCoordAttempts.
@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
    // There are two sources for requests here:
    // 1. A queue which will contain FIND_CORD RPCs and other non-batchable RPCs.
FIND_COORDINATOR