
[LI-HOTFIX] Event based fetcher part 2/3: adding the FetcherEventBus and FetcherEventManager #123

Merged
gitlw merged 8 commits into linkedin:2.4-li from gitlw:event_based_fetcher_2
Mar 22, 2021


@gitlw gitlw commented Mar 16, 2021


This PR adds the FetcherEventBus and FetcherEventManager to support the event-based processing model proposed in https://docs.google.com/document/d/1PZAOwpw09tVDeuSP0OBVhB6dbgz7C7dQU6bRkePQ8qg/edit#

The FetcherEventBus supports queued events and scheduled events with a delay.
The FetcherEventManager spawns a FetcherEventThread, which takes events from the FetcherEventBus and dispatches them to the FetcherEventProcessor (implemented by the AbstractAsyncFetcher).
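
A minimal sketch of a bus that supports both queued events and events scheduled with a delay. This is not the code from this PR: `SimpleEventBus`, `Event`, `DelayedEvent`, `put`, and `schedule` are hypothetical names, and the rule that a due scheduled event is served before queued events is an assumption made for illustration.

```scala
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical event type; a larger priority value is more important.
final case class Event(name: String, priority: Int)

// Wraps an event that becomes eligible once System.nanoTime() reaches dueNs.
final class DelayedEvent(val event: Event, dueNs: Long) extends Delayed {
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(dueNs - System.nanoTime(), TimeUnit.NANOSECONDS)
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
}

final class SimpleEventBus {
  private val lock = new ReentrantLock()
  private val changed = lock.newCondition()
  // Scala's PriorityQueue dequeues the largest element under the given ordering.
  private val queued = mutable.PriorityQueue.empty[Event](Ordering.by[Event, Int](_.priority))
  private val scheduled = new DelayQueue[DelayedEvent]()

  private def locked[T](body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Enqueue an event for processing as soon as possible.
  def put(event: Event): Unit = locked {
    queued.enqueue(event)
    changed.signalAll()
  }

  // Enqueue an event that becomes eligible after delayMs milliseconds.
  def schedule(event: Event, delayMs: Long): Unit = locked {
    val dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs)
    scheduled.add(new DelayedEvent(event, dueNs))
    changed.signalAll()
  }

  // Blocks until an event is available. Assumption: due scheduled events win over queued ones.
  def getNextEvent(): Event = locked {
    var next: Option[Event] = None
    while (next.isEmpty) {
      val due = scheduled.poll() // non-null only when the head's delay has expired
      if (due != null) {
        next = Some(due.event)
      } else if (queued.nonEmpty) {
        next = Some(queued.dequeue())
      } else {
        val head = scheduled.peek()
        if (head == null) changed.await()
        else changed.awaitNanos(head.getDelay(TimeUnit.NANOSECONDS))
      }
    }
    next.get
  }
}
```

A consumer thread would call `getNextEvent()` in a loop and hand each event to its processor; producers call `put` or `schedule` from any thread.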

This PR uses a single event, TruncateAndFetch, to model one iteration of the previous thread loop. This does not give the best latency for Add/Remove partition events: if the thread has just begun processing a TruncateAndFetch event when an Add/Remove partition event is enqueued, the latter may need to wait for several network round-trips for the OffsetsForLeaderEpochRequest plus one network round-trip for the FetchRequest. In addition, we haven't added a separate network thread yet. Nonetheless, this PR lays the groundwork for the async processing framework.
As future work, we may need to

  1. break the TruncateAndFetch into several sub-events
  2. add a separate network thread
  3. reduce the number of fetcher threads

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gitlw gitlw requested review from ambroff, smccauliff and xiowu0 March 16, 2021 21:51

@ambroff ambroff left a comment


Almost finished reading this. Looks pretty good. I just have a minor question about getNextEvent().

Feel free to ignore my comment about the tests if you're happy with it as is.

Comment thread core/src/test/scala/unit/kafka/server/FetcherEventBusTest.scala Outdated
Comment thread core/src/main/scala/kafka/server/FetcherEventBus.scala Outdated


sealed trait FetcherEvent extends Comparable[FetcherEvent] {
  def priority: Int // an event with a higher priority value is more important


Do we capture ordering across fetcher events?
Is there a case (or a future use case) where a higher-priority event would have to depend on a lower-priority event?
If so, do we want to introduce a dependency graph for these events?
Or does this priority only make sense across different partitions? If so, could you document it somewhere?

Author


I haven't found the need for dependencies between the FetcherEvents. That is, the current event types are all conceptually independent of each other: AddPartitions, RemovePartitions, GetPartitionCount, and TruncateAndFetch.
And I don't expect to introduce dependencies in the future.

Not sure I entirely get your concern. If you think there is still confusion, please give an example.
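
For illustration, a rough sketch of how independent events could be ordered purely by priority, with no dependency tracking. The event names come from the list above; the priority values, the constructor parameters, the `compareTo` body, and the `drainInPriorityOrder` helper are assumptions and not taken from this PR.

```scala
import java.util.Collections
import java.util.concurrent.PriorityBlockingQueue

sealed trait FetcherEvent extends Comparable[FetcherEvent] {
  def priority: Int // an event with a higher priority value is more important
  // Assumed implementation: compare on priority only.
  override def compareTo(other: FetcherEvent): Int = Integer.compare(priority, other.priority)
}

// Priority values are hypothetical; partitions are plain strings to keep the sketch small.
final case class AddPartitions(partitions: Set[String]) extends FetcherEvent {
  override def priority: Int = 2
}
final case class RemovePartitions(partitions: Set[String]) extends FetcherEvent {
  override def priority: Int = 2
}
case object GetPartitionCount extends FetcherEvent {
  override def priority: Int = 2
}
case object TruncateAndFetch extends FetcherEvent {
  override def priority: Int = 1
}

// Hypothetical helper: enqueue all events, then take them back in priority order.
// PriorityBlockingQueue serves the smallest element first, so the natural order is reversed.
def drainInPriorityOrder(events: Seq[FetcherEvent]): List[FetcherEvent] = {
  val queue = new PriorityBlockingQueue[FetcherEvent](11, Collections.reverseOrder[FetcherEvent]())
  events.foreach(e => queue.put(e))
  List.fill(events.size)(queue.take())
}
```

Note that events with equal priority have no guaranteed relative order in this sketch, which is acceptable only because the events are treated as independent.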


Discussed offline: if there is an ordering requirement, the caller that enqueues the events is responsible for enforcing it, e.g., remove the partition, wait for completion, and then add the partition.
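
A small sketch of that caller-side pattern, assuming each event carries a completion promise that the processing thread fulfils. `TinyProcessor`, `Op`, `Remove`, `Add`, and `moveOrdered` are hypothetical names for this example and do not appear in the PR.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical events, each with a promise completed once the event has been processed.
sealed trait Op { def done: Promise[Unit] }
final case class Remove(partition: String, done: Promise[Unit] = Promise[Unit]()) extends Op
final case class Add(partition: String, done: Promise[Unit] = Promise[Unit]()) extends Op

// Single worker thread that processes events one at a time.
final class TinyProcessor {
  private val queue = new LinkedBlockingQueue[Op]()
  val log = new CopyOnWriteArrayList[String]() // records the order in which events were processed

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val op = queue.take()
        op match {
          case Remove(p, _) => log.add(s"remove $p")
          case Add(p, _)    => log.add(s"add $p")
        }
        op.done.success(()) // signal completion to whoever enqueued the event
      }
    }
  })
  worker.setDaemon(true)
  worker.start()

  def enqueue(op: Op): Unit = queue.put(op)
}

// The caller enforces ordering: the add is enqueued only after the remove has completed.
def moveOrdered(processor: TinyProcessor, partition: String): Unit = {
  val remove = Remove(partition)
  processor.enqueue(remove)
  Await.result(remove.done.future, 5.seconds)
  val add = Add(partition)
  processor.enqueue(add)
  Await.result(add.done.future, 5.seconds)
}
```

With this approach the bus itself needs no dependency graph; any ordering constraint lives entirely in the code that enqueues the events.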

Comment thread core/src/main/scala/kafka/server/FetcherEventManager.scala

@xiowu0 xiowu0 left a comment


LGTM.

Comment thread core/src/main/scala/kafka/server/FetcherEventBus.scala Outdated
Comment thread core/src/main/scala/kafka/server/FetcherEventBus.scala Outdated
Comment thread core/src/main/scala/kafka/server/AbstractAsyncFetcher.scala Outdated
Comment thread core/src/main/scala/kafka/server/FetcherEventManager.scala

@smccauliff smccauliff left a comment


Thanks for the changes.

@gitlw gitlw merged commit e3a5936 into linkedin:2.4-li Mar 22, 2021
@gitlw gitlw deleted the event_based_fetcher_2 branch April 21, 2021 19:47
gitlw added a commit to gitlw/kafka that referenced this pull request Feb 24, 2022
…and FetcherEventManager (linkedin#123)

TICKET = KAFKA-10734
LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model
EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Mar 25, 2022
…and FetcherEventManager (linkedin#123)

TICKET = KAFKA-10734
LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model
EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…and FetcherEventManager (linkedin#123)

TICKET = KAFKA-10734
LI_DESCRIPTION = Part 2 of 3 PRs to change the fetcher into an event-based model
EXIT_CRITERIA = When KAFKA-10734 is closed and the changes are pulled in as a part of a release
earlcoder added a commit that referenced this pull request Apr 28, 2026
The async/event-based replica fetcher series (TransferLeaderManager,
AbstractAsyncFetcher, AsyncReplicaFetcher, FetcherEventBus,
FetcherEventManager — PRs #121/#123/#124/#143/#144/#403/#406) was
removed in the PR #538 squash. The li.async.fetcher.enable config key
was left behind in KafkaConfig as dead surface area: it parsed and
validated, but had no consumers.

Going with audit P1 #6 option (a): retire the feature, remove the dead
config key. Operators with li.async.fetcher.enable=<anything> in their
server.properties may see an 'Unknown configuration' warning at broker
startup; this is harmless — the broker still starts and the value would
have been a no-op anyway.

Removes:
- Defaults.LiAsyncFetcherEnabled
- KafkaConfig.LiAsyncFetcherEnableProp
- The brokerConfigDef.define(...) registration
- KafkaConfig.liAsyncFetcherEnable accessor

Audit option (b) — porting the entire async fetcher series forward
into 3.6-li — was deferred. If the optimization is needed in the
future, it would need to be reimplemented from scratch on top of
upstream's current ReplicaFetcherThread / ReplicaFetcherManager.
4 participants