[ISSUE #710]🚀Support pull message result handle-3🎨#721
Conversation
(cherry picked from commit 7d2662a0e6a4fa8265f3fca762d96f63ff05a3a7)
WalkthroughThe changes enhance the Changes
Sequence DiagramsequenceDiagram
participant Client
participant Broker
participant PullRequestHoldService
participant ManyPullRequest
Client->>Broker: Send Pull Request
Broker->>PullRequestHoldService: Suspend Pull Request
PullRequestHoldService->>ManyPullRequest: Add Pull Request to List
ManyPullRequest->>PullRequestHoldService: Confirm Addition
Note over Client, Broker: Client waits for message
Broker->>PullRequestHoldService: Notify Message Arrival
PullRequestHoldService->>ManyPullRequest: Check and Retrieve Requests
ManyPullRequest->>PullRequestHoldService: Return Pull Requests
PullRequestHoldService->>Broker: Notify Message to Client
Broker->>Client: Deliver Message
Assessment against linked issues
Poem
Tip Early access features: enabledWe are currently testing the following features in early access:
Note:
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 1
Outside diff range and nitpick comments (1)
rocketmq-common/src/common/broker/broker_config.rs (1)
158-159: Add documentation for new fields inBrokerConfig.The newly added fields
short_polling_time_millsandlong_polling_enableshould have documentation comments to explain their purpose and usage./// The time in milliseconds for short polling. pub short_polling_time_mills: u64, /// Flag to enable long polling. pub long_polling_enable: bool,
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
Files selected for processing (12)
- rocketmq-broker/Cargo.toml (1 hunks)
- rocketmq-broker/src/lib.rs (1 hunks)
- rocketmq-broker/src/long_polling.rs (1 hunks)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/long_polling/many_pull_request.rs (1 hunks)
- rocketmq-broker/src/long_polling/pull_request.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (2 hunks)
- rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
- rocketmq-remoting/src/protocol.rs (3 hunks)
- rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
Files skipped from review due to trivial changes (3)
- rocketmq-broker/Cargo.toml
- rocketmq-broker/src/long_polling.rs
- rocketmq-remoting/src/protocol.rs
Additional comments not posted (31)
rocketmq-broker/src/lib.rs (1)
36-36: LGTM!The module renaming from
longpollingtolong_pollingimproves consistency.However, ensure the renamed module is correctly referenced throughout the codebase.
Verification successful
LGTM!
The module renaming from
longpollingtolong_pollinghas been consistently applied across the codebase.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the renamed module `long_polling`. # Test: Search for the usage of the module `long_polling`. Expect: Only occurances of the new module name. rg --type rust $'long_polling'Length of output: 853
Script:
#!/bin/bash # Description: Verify the absence of the old module name `longpolling`. # Test: Search for the usage of the old module name `longpolling`. Expect: No occurrences of the old module name. rg --type rust $'longpolling'Length of output: 28
rocketmq-broker/src/long_polling/many_pull_request.rs (6)
27-32: LGTM!The
newfunction correctly initializes theManyPullRequeststruct.
34-37: LGTM!The
add_pull_requestfunction correctly adds a singlePullRequestto the list.
39-42: LGTM!The
add_pull_requestsfunction correctly adds multiplePullRequestobjects to the list.
44-53: LGTM!The
clone_list_and_clearfunction correctly clones the list and clears it.
60-63: LGTM!The
is_emptyfunction correctly checks if the list is empty.
55-58: Commented-out code.The
get_pull_request_listfunction is commented out.rocketmq-broker/src/long_polling/pull_request.rs (9)
36-54: LGTM!The
newfunction correctly initializes thePullRequeststruct.
56-58: LGTM!The
request_commandfunction correctly returns a reference to therequest_commandfield.
60-62: LGTM!The
request_command_mutfunction correctly returns a mutable reference to therequest_commandfield.
64-66: LGTM!The
client_channelfunction correctly returns a reference to theclient_channelfield.
68-70: LGTM!The
pull_from_this_offsetfunction correctly returns thepull_from_this_offsetfield.
72-74: LGTM!The
subscription_datafunction correctly returns a reference to thesubscription_datafield.
76-78: LGTM!The
message_filterfunction correctly returns a clonedArcof themessage_filterfield.
80-82: LGTM!The
timeout_millisfunction correctly returns thetimeout_millisfield.
83-85: LGTM!The
suspend_timestampfunction correctly returns thesuspend_timestampfield.rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (7)
42-51: LGTM!The
newfunction correctly initializes thePullRequestHoldServicestruct.
59-65: LGTM!The
suspend_pull_requestfunction correctly suspends a pull request and adds it to the list.
67-77: LGTM!The
check_hold_requestfunction correctly checks the hold requests.
79-82: LGTM!The
notify_message_arrivingfunction correctly notifies the arrival of a message.
84-158: LGTM!The
notify_message_arriving_extfunction correctly notifies the arrival of a message with additional parameters.
160-178: LGTM!The
notify_master_onlinefunction correctly notifies that the master is online.
181-183: LGTM!The
build_keyfunction correctly constructs a key from the topic and queue ID.rocketmq-common/src/common/broker/broker_config.rs (1)
226-227: LGTM!rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (1)
40-40: Update the type ofsuspend_timeout_millistou64.The type change from
i64tou64is appropriate for representing timeout values, which are non-negative.rocketmq-remoting/src/protocol/remoting_command.rs (3)
19-19: ImportfmtforDisplayimplementation.The import of
fmtis necessary for theDisplaytrait implementation.
119-138: ImplementDisplayforRemotingCommand.The implementation provides a detailed string representation of the
RemotingCommandstruct, which is useful for debugging and logging.
344-347: Add methodset_suspended_reftoRemotingCommand.The new method provides a way to set the
suspendedfield, enhancing the flexibility of theRemotingCommandstruct.rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2)
87-87: Allow unused assignments.The attribute
#[allow(unused_assignments)]is added to suppress warnings about unused assignments, which may be necessary for certain logic flows.
179-197: HandleResponseCode::PullNotFoundinDefaultPullMessageResultHandler.The added logic handles the scenario where no messages are found for a pull request. It checks for suspend flags and adjusts polling times based on configuration.
rocketmq-broker/src/processor/pull_message_processor.rs (1)
756-756: Verify the usage ofis_broadcastfunction.Ensure that the
is_broadcastfunction is correctly implemented and used within this function.Verification successful
The
is_broadcastfunction is correctly implemented and used.The function checks if the
proxy_pull_broadcastflag is set or if theconsumer_group_infoindicates a broadcasting message model and passive consume type. The usage of this function in the code is appropriate and aligns with its intended purpose.
is_broadcastfunction implementation is located inrocketmq-broker/src/processor/pull_message_processor.rsat line 760.- The function is used at line 756 in the same file.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation and usage of `is_broadcast` function. # Test: Search for the function implementation. Expect: Only one occurrence. rg --type rust $'fn is_broadcast'Length of output: 118
Script:
#!/bin/bash # Description: Extract the definition and all usages of the `is_broadcast` function within the file `rocketmq-broker/src/processor/pull_message_processor.rs`. # Extract the definition of `is_broadcast`. rg --type rust --context 5 'fn is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rs # Extract all usages of `is_broadcast` within the file. rg 'is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rsLength of output: 582
Script:
#!/bin/bash # Description: Extract the complete implementation of the `is_broadcast` function from the file `rocketmq-broker/src/processor/pull_message_processor.rs`. # Extract the complete implementation of `is_broadcast`. rg --type rust --context 30 'fn is_broadcast' rocketmq-broker/src/processor/pull_message_processor.rsLength of output: 1539
| None => proxy_pull_broadcast, | ||
| } | ||
| } | ||
| pub async fn execute_request_when_wakeup(&self, channel: Channel, request: RemotingCommand) {} |
There was a problem hiding this comment.
Empty function detected.
The execute_request_when_wakeup function is currently empty. Consider implementing the function or adding a TODO comment.
Do you want me to help implement this function or open a GitHub issue to track this task?
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 0
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (4)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs (2 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (2 hunks)
- rocketmq-remoting/src/protocol/remoting_command.rs (3 hunks)
Files skipped from review as they are similar to previous changes (4)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs
- rocketmq-broker/src/processor/default_pull_message_result_handler.rs
- rocketmq-broker/src/processor/pull_message_processor.rs
- rocketmq-remoting/src/protocol/remoting_command.rs
(cherry picked from commit 7d2662a0e6a4fa8265f3fca762d96f63ff05a3a7)
Which Issue(s) This PR Fixes(Closes)
Fixes #710
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
PullRequestHoldServiceto manage suspended pull requests for message processing.ManyPullRequestto handle multiple pull requests with synchronization.Enhancements
BrokerConfigwith new fields for short polling time and long polling enablement.fmt::Displayfor various enums andRemotingCommandfor better logging and debugging.Bug Fixes
PullNotFoundinDefaultPullMessageResultHandler.Refactor
API Changes
PullMessageRequestHeaderto useu64forsuspend_timeout_millis.