DS, eventCollector: drop event if needed#1430
DS, eventCollector: drop event if needed#1430ti-chi-bot[bot] merged 27 commits intopingcap:masterfrom
Conversation
|
Skipping CI for Draft Pull Request. |
There was a problem hiding this comment.
Summary of Changes
Hello @asddongmen, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request enhances the dynamic stream (dynstream) and event collector components to handle memory pressure more gracefully. It introduces a mechanism to signal dropped events using a new DropEvent type and refactors the memory control logic for better modularity. When the event collector drops certain event types due to memory limits, it now replaces them with a DropEvent and stops processing further events for that specific path.
Highlights
- Introduce DropEvent: A new event type,
DropEvent, is introduced (pkg/common/event/drop_event.go) to explicitly signal when an event (like DML, DDL, or Handshake) is dropped due to memory pressure in the event collector. This allows downstream components to be aware of the dropped event. - Modify OnDrop Interface: The
OnDropmethod in thedynstream.Handlerinterface is updated to returninterface{}(utils/dynstream/interfaces.go). This enables handlers to return a replacement event (like the newDropEvent) when an event is dropped, rather than just performing an action. - Implement Event Dropping in EventCollector: The
OnDrophandler for events in theeventcollector(downstreamadapter/eventcollector/helper.go) is updated. If a DML, Handshake, or DDL event is dropped due to memory pressure, it now logs the drop and returns a newDropEventcontaining information about the dropped event (DispatcherID, Seq, CommitTs). ThisDropEventreplaces the original event in the stream. - Refactor Memory Control Algorithms: The memory control logic within
dynstreamis refactored into an interface (MemoryControlAlgorithm) and specific implementations (PullerMemoryControl,EventCollectorMemoryControl) (utils/dynstream/memory_control_algorithm.go). TheAreaSettingsnow uses an integer type to specify the algorithm, and theareaMemStatholds an instance of the interface (utils/dynstream/interfaces.go,utils/dynstream/memory_control.go). - Mark Path as Dead on Drop: When an event is dropped and replaced by a
DropEventin the event collector (using theEventCollectorMemoryControlalgorithm), the corresponding path in the dynamic stream is marked as 'dead' (utils/dynstream/stream.go,utils/dynstream/memory_control.go). Subsequent events for a dead path are ignored (utils/dynstream/parallel_dynamic_stream.go).
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configureGemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to handle event dropping due to memory pressure, primarily within the dynstream and eventcollector components. Key changes include:
- Refactoring the memory control logic in
dynstreamby introducing aMemoryControlAlgorithminterface and specific implementations (PullerMemoryControl,EventCollectorMemoryControl). This improves modularity. - Modifying the
Handler.OnDropmethod signature to return aninterface{}, allowing handlers to provide a replacement event (e.g., aDropEvent) when an event is dropped. - Introducing a new
pkg/common/event.DropEventtype to represent events that have been dropped. - Updating the
eventcollectorto utilize this newOnDropmechanism. When memory limits are exceeded for theEventCollectorMemoryControlalgorithm, DML, DDL, or Handshake events are replaced withDropEvents, and the corresponding path is marked asdead. - Adding
String()methods to DDL and DML event types for better debuggability.
Overall, the changes seem well-structured to address the problem of event dropping. However, there are a couple of significant concerns regarding a new log.Panic and the deserialization logic for DropEvent that need to be addressed. Several minor points are also noted.
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
@asddongmen: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: hongyunyan The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/test pull-cdc-mysql-integration-heavy |
What problem does this PR solve?
Issue Number: ref #736
What is changed and how it works?
This pull request enhances the dynamic stream (dynstream) and event collector components to handle memory pressure more gracefully. It introduces a mechanism to signal dropped events using a new
DropEventtype and refactors the memory control logic for better modularity. When the event collector drops certain event types due to memory limits, it now replaces them with aDropEventand stops processing further events for that specific path.Highlights
DropEvent, is introduced (pkg/common/event/drop_event.go) to explicitly signal when an event (DML, DDL, Handshake) is dropped due to memory pressure in the event collector. This allows downstream components to be aware of the dropped event.OnDropmethod in thedynstream.Handlerinterface is updated to returninterface{}(utils/dynstream/interfaces.go). This enables handlers to return a replacement event (like the newDropEvent) when an event is dropped, rather than just performing an action.OnDrophandler for events in theeventcollector(downstreamadapter/eventcollector/helper.go) is updated. If a DML, Handshake, or DDL event is dropped due to memory pressure, it now logs the drop and returns a newDropEventcontaining information about the dropped event (DispatcherID, Seq, CommitTs). ThisDropEventreplaces the original event in the stream.dynstreamis refactored into an interface (MemoryControlAlgorithm) and specific implementations (PullerMemoryControl,EventCollectorMemoryControl) (utils/dynstream/memory_control_algorithm.go). TheAreaSettingsnow uses an integer type to specify the algorithm, and theareaMemStatholds an instance of the interface (utils/dynstream/interfaces.go,utils/dynstream/memory_control.go).DropEventin the event collector (using theEventCollectorMemoryControlalgorithm), the corresponding path in the dynamic stream is marked as 'dead' (utils/dynstream/stream.go,utils/dynstream/memory_control.go). Subsequent events for a dead path are ignored (utils/dynstream/parallel_dynamic_stream.go).Check List
Tests
memoryQuota.[root@upstream-ticdc-0 log]# grep "reset dispatcher" ticdc.log | wc -l 4404Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note