AKCORE-254: RPC batching and event based poll. (#4) #1422

Merged: Sushant Mahajan (smjn) merged 2 commits into kip-932 from AKCORE-254 on Aug 6, 2024

@smjn Sushant Mahajan (smjn) commented Aug 5, 2024


What

  • Build infra code to allow for RPC batching in PersisterStateManager.
  • Change poll mechanism to be triggered on various events (new RPC, completion of an RPC on a specific partition key, etc.)

Why

Batching

  • Batching improves performance by using the turnaround time of an RPC to a specific node n1 (the share coordinator leader) to batch subsequent requests for that node into a single RPC per type. For example:
    • Suppose we get a Write RPC w1 for n1.
    • We send this RPC in the standard way.
    • While w1 is inflight, let's say 5 more RPCs for n1 are received (write RPCs w2, w3, w4 and read RPCs r1, r2).
    • Coalesce write RPCs w2, w3, and w4 into a single RPC W1, and read RPCs r1 and r2 into a single RPC R1.
    • When w1 completes, remove w1 from the inflight queue, send the 2 RPCs W1 and R1 to n1, and add them to inflight.
    • This saves 3 network calls (2 RPCs sent instead of 5).
    • The results of W1 and R1 are broken down to create response objects for the individual RPCs (w2, w3, w4, r1, r2).
  • The batching data structure is of type:
Node: {
  RPCType: {
    GroupId: [rpc1, rpc2]
  }
}
  • We need RPCType as the key for the inner map because KafkaApis defines request handlers on the basis of the APIKey, which is the request type. Hence we cannot coalesce a read and a write RPC into a single request.
  • We need the GroupId as the innermost map key since the RPCs defined in KIP-932 can have only one groupId as a parameter. The schemas allow for multiple topicIds and multiple partitions for each topicId.
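
To make the shape of the batching structure concrete, here is a minimal sketch of the Node → RPCType → GroupId grouping applied to the w2..w4 / r1, r2 example above. It is not the PR's implementation: `BatchingShapeSketch`, `Rpc`, `RpcType`, `group` and `batchCount` are hypothetical names, plain strings stand in for nodes and group ids, and stream grouping is used only to illustrate the resulting map.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names come from the PR.
final class BatchingShapeSketch {
    enum RpcType { READ, WRITE }

    // A pending RPC, reduced to the three keys the batching structure uses.
    static final class Rpc {
        final String node;
        final RpcType type;
        final String groupId;
        final String name;

        Rpc(String node, RpcType type, String groupId, String name) {
            this.node = node;
            this.type = type;
            this.groupId = groupId;
            this.name = name;
        }
    }

    // Builds node -> RPC type -> group id -> list of pending RPCs.
    static Map<String, Map<RpcType, Map<String, List<Rpc>>>> group(List<Rpc> pending) {
        return pending.stream().collect(
            Collectors.groupingBy((Rpc r) -> r.node,
                Collectors.groupingBy((Rpc r) -> r.type,
                    Collectors.groupingBy((Rpc r) -> r.groupId))));
    }

    // Each (type, groupId) pair for a node becomes one coalesced request.
    static int batchCount(Map<String, Map<RpcType, Map<String, List<Rpc>>>> grouped, String node) {
        return grouped.getOrDefault(node, Map.of()).values().stream()
            .mapToInt(Map::size)
            .sum();
    }
}
```

With w2, w3, w4 (write) and r1, r2 (read) queued for n1 in a single group, `batchCount` gives 2, which matches the two coalesced RPCs W1 and R1 in the example.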

Polling

  • Event based poll is more performant and simpler to implement.

Testing

  • Added and updated tests at requisite places.

### What
- Build infra code to allow for RPC batching in PersisterStateManager.
- Change poll mechanism to be triggered on various events (new RPC, completion of an RPC on a specific partition key, etc.)

### Why
- Batching improves performance by leveraging the turnaround time of an RPC on a specific node n1 (share coord leader) to batch multiple RPCs into a single one for subsequent requests. For example: 
  - Suppose we get a Write RPC r1 for n1.
  - We send this RPC in the standard way.
  - While r1 is inflight, let's say 3 more RPCs for n1 are received (r2, r3, r4).
  - Coalesce and batch r2, r3, and r4 into a single RPC R1.
  - When r1 completes, make a single RPC R1 to n1.
  - This saves 2 network calls.
  - The result of R1 is broken down to create response objects for the individual RPCs (r2, r3, r4).
- Event based poll is more performant and simpler to implement.

### Testing
-  Added and updated tests at requisite places.
@smjn Sushant Mahajan (smjn) requested review from a team as code owners August 5, 2024 10:33
@smjn Sushant Mahajan (smjn) requested review from Andrew Schofield (AndrewJSchofield) and removed request for a team August 5, 2024 10:33

Member

Thanks for the PR. A few comments to address.

nodeRPCMap.get(node).computeIfAbsent(handler.rpcType(), k -> new HashMap<>());
nodeRPCMap.get(node).get(handler.rpcType()).computeIfAbsent(handler.groupId, k -> new LinkedList<>());

nodeRPCMap.get(node).get(handler.rpcType()).get(handler.groupId).add(handler);

Member

This method can be simplified I think. There's no need to do the repeated get() calls, and you could chain the computeIfAbsent() calls instead.

Author

Did it this way for readability
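
For reference, the chained form suggested in this thread could look roughly like the sketch below. It is an illustration under assumptions, not the code that was merged: `ChainedComputeIfAbsentSketch` and `addChained` are hypothetical names, and plain strings stand in for the node, RPC type, group id and handler types used in the PR.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; strings replace the PR's node/rpcType/handler types.
final class ChainedComputeIfAbsentSketch {
    // node -> rpcType -> groupId -> handlers
    static void addChained(Map<String, Map<String, Map<String, List<String>>>> nodeRPCMap,
                           String node, String rpcType, String groupId, String handler) {
        // computeIfAbsent returns the existing or newly created value,
        // so each level can be chained without a separate get() call.
        nodeRPCMap.computeIfAbsent(node, k -> new HashMap<>())
            .computeIfAbsent(rpcType, k -> new HashMap<>())
            .computeIfAbsent(groupId, k -> new LinkedList<>())
            .add(handler);
    }
}
```

Both forms build the same nested map; the chained one performs a single lookup per level instead of repeating the `get()` calls.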

nodeRPCMap.get(node).get(handler.rpcType()).computeIfAbsent(handler.groupId, k -> new LinkedList<>());

nodeRPCMap.get(node).get(handler.rpcType()).get(handler.groupId).add(handler);
sender.wakeup();

Member

Does the wakeup really need to be inside the synchronized block?
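
One way to read this question: the lock only needs to cover the mutation of the shared map, and the wakeup can follow once the lock is released. The sketch below shows that pattern under the assumption that the wakeup call is itself safe to invoke from any thread; `WakeupOutsideLockSketch`, `enqueue`, `drain` and the `Runnable` standing in for `sender.wakeup()` are hypothetical and not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; the Runnable stands in for sender.wakeup().
final class WakeupOutsideLockSketch {
    final Object lock = new Object();                 // guards 'pending'
    private final List<String> pending = new ArrayList<>();
    private final Runnable wakeup;                    // assumed to be thread-safe

    WakeupOutsideLockSketch(Runnable wakeup) {
        this.wakeup = wakeup;
    }

    void enqueue(String rpc) {
        synchronized (lock) {
            pending.add(rpc);                         // only the mutation holds the lock
        }
        wakeup.run();                                 // signal the sender after releasing it
    }

    // Called by the sender thread once it has been woken up.
    List<String> drain() {
        synchronized (lock) {
            List<String> copy = new ArrayList<>(pending);
            pending.clear();
            return copy;
        }
    }
}
```

Because the item is added before the wakeup fires, a sender that wakes and calls `drain()` will see it; keeping the wakeup outside the block just shortens the time the lock is held.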


return new WriteShareGroupStateRequest.Builder(new WriteShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(partitionData.entrySet().stream().map(

Member

nit: nicer to break the line between .stream() and map( I think.


return new ReadShareGroupStateRequest.Builder(new ReadShareGroupStateRequestData()
.setGroupId(groupId)
.setTopics(partitionData.entrySet().stream().map(

Member

ditto

@@ -110,6 +139,7 @@ public abstract class PersisterStateManagerHandler implements RequestCompletionH
private int findCoordattempts = 0;

Member

Typo: Should be findCoordBackoff and findCoordAttempts.

@Override
public Collection<RequestAndCompletionHandler> generateRequests() {
// There are two sources for requests here:
// 1. A queue which will contain FIND_CORD RPCs and other non-batchable RPCs.

Member

FIND_COORDINATOR

@smjn Sushant Mahajan (smjn) merged commit 118afb5 into kip-932 Aug 6, 2024
@smjn Sushant Mahajan (smjn) deleted the AKCORE-254 branch August 6, 2024 14:39