[LI-HOTFIX] Event based fetcher part 2/3: adding the FetcherEventBus and FetcherEventManager #123
Conversation
ambroff
left a comment
There was a problem hiding this comment.
Almost finished reading this. Looks pretty good.I just have a minor question about getNextEvent().
Feel free to ignore my comment about the tests if you're happy with it as is.
|
|
||
|
|
||
| sealed trait FetcherEvent extends Comparable[FetcherEvent] { | ||
| def priority: Int // an event with a higher priority value is more important |
There was a problem hiding this comment.
do we capture ordering across fetcher event?
Is there a case (or future use case) that a higher priority event would have to depend on a lower priority event?
If so, do we want to introduce a dependency graph for these events?
Or this priority only makes sense for different partitions, if so, could you document it somewhere.
There was a problem hiding this comment.
I haven't found the need for dependencies between the FetcherEvents. That is, the current event types are all conceptually independent of each other: AddPartitions, RemovePartitions, GetPartitionCount, and TruncateAndFetch.
And I don't expect to introduce dependencies in the future.
Not sure I entirely get your concern. If you think there is still confusion, please give an example.
There was a problem hiding this comment.
discussed offline, if there is a ordering requirement, the caller whoever enqueue the event will ensure the ordering e.g., remove parition, wait for completion, and then add partition, etc.
…and FetcherEventManager (linkedin#123) TICKET = KAFKA-10734 LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
…and FetcherEventManager (linkedin#123) TICKET = KAFKA-10734 LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
…and FetcherEventManager (linkedin#123) TICKET = KAFKA-10734 LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
The async/event-based replica fetcher series (TransferLeaderManager, AbstractAsyncFetcher, AsyncReplicaFetcher, FetcherEventBus, FetcherEventManager — PRs #121/#123/#124/#143/#144/#403/#406) was removed in the PR #538 squash. The li.async.fetcher.enable config key was left behind in KafkaConfig as dead surface area: it parsed and validated, but had no consumers. Going with audit P1 #6 option (a): retire the feature, remove the dead config key. Operators with li.async.fetcher.enable=<anything> in their server.properties may see an 'Unknown configuration' warning at broker startup; this is harmless — the broker still starts and the value would have been a no-op anyway. Removes: - Defaults.LiAsyncFetcherEnabled - KafkaConfig.LiAsyncFetcherEnableProp - The brokerConfigDef.define(...) registration - KafkaConfig.liAsyncFetcherEnable accessor Audit option (b) — porting the entire async fetcher series forward into 3.6-li — was deferred. If the optimization is needed in the future, it would need to be reimplemented from scratch on top of upstream's current ReplicaFetcherThread / ReplicaFetcherManager.
This PR adds the FetcherEventBus and FetcherEventManager to support the event-based processing model proposed in https://docs.google.com/document/d/1PZAOwpw09tVDeuSP0OBVhB6dbgz7C7dQU6bRkePQ8qg/edit#
The FetcherEventBus supports queued events and scheduled events with a delay.
The FetcherEventManager spawns a FetcherEventThread, which takes events from the FetcherEventBus and executes them into the FetcherEventProcessor (implemented via the AbstractAsyncFetcher).
This PR uses a single event TruncateAndFetch to model one iteration of the previous thread loop. This does not give the best latency for Add/Remove partition events. If the thread has just begun processing a TruncateAndFetch event when the Add/Remove partition event is enqueued, then the latter will need to wait potentially several network round-trips for the
OffsetsForLeaderEpochRequestand one network round-trip for theFetchRequest. Besides, we haven't added a separate network thread yet. Nonetheless, this PR lays the async processing framework.As future work, we may need to
Committer Checklist (excluded from commit message)