Skip to content

[ISSUE #669]🚀Support pull message consume-3#670

Merged
mxsm merged 1 commit intomainfrom
feature-669
Jun 20, 2024
Merged

[ISSUE #669]🚀Support pull message consume-3#670
mxsm merged 1 commit intomainfrom
feature-669

Conversation

@mxsm
Copy link
Copy Markdown
Owner

@mxsm mxsm commented Jun 20, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #669

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced support for change listeners to handle consumer group events.
    • Added methods for managing consumer and producer channels using Channel as keys.
  • Improvements

    • Enhanced ConsumerManager and ProducerManager for better consumer group and client registration management.
    • Updated various modules to improve the handling of consumer and producer data.
  • Bug Fixes

    • Improved logic for message filtering and message retrieval processes.
  • Refactor

    • Replaced String keys with Channel keys in several internal data structures.
    • Converted several async functions to non-async for better performance.
  • Documentation

    • Updated public-facing method signatures and visibility for improved clarity and usability.

@mxsm
Copy link
Copy Markdown
Owner Author

mxsm commented Jun 20, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jun 20, 2024

Walkthrough

The changes include significant updates to the RocketMQ broker's consumer and producer management. Key enhancements involve using a Channel instead of a String for client identification, introducing new methods in various structs for managing consumer and producer information, and integrating a DefaultConsumerIdsChangeListener for tracking consumer group events and client registrations.

Changes

File Path Change Summary
rocketmq-broker/.../broker_runtime.rs Added DefaultConsumerIdsChangeListener and several fields and methods to BrokerRuntime and ConsumerManager.
rocketmq-broker/.../client.rs Introduced modules consumer_group_event and consumer_ids_change_listener.
rocketmq-broker/.../client_channel_info.rs Replaced socket_addr with channel in ClientChannelInfo struct.
rocketmq-broker/.../consumer_group_event.rs Created enum ConsumerGroupEvent.
rocketmq-broker/.../consumer_group_info.rs Updated to use Channel as key and changed several functions from async to non-async.
rocketmq-broker/.../consumer_ids_change_listener.rs Introduced trait ConsumerIdsChangeListener with handle and shutdown methods.
rocketmq-broker/.../default_consumer_ids_change_listener.rs Implemented ConsumerIdsChangeListener trait for DefaultConsumerIdsChangeListener.
rocketmq-broker/.../manager/consumer_manager.rs Added fields, methods, and updated constructor for ConsumerManager.
rocketmq-broker/.../manager/producer_manager.rs Changed data structures to use Channel instead of String and updated methods accordingly.
rocketmq-broker/.../processor/client_manage_processor.rs Replaced ctx.remoting_address().to_string() with ctx.as_ref().connection().channel().clone().
rocketmq-broker/.../processor/pull_message_processor.rs Introduced new imports and methods, updated fields and method signatures in PullMessageProcessor.
rocketmq-broker/.../processor/pull_message_result_handler.rs Updated PullMessageResultHandler to use Box<dyn MessageFilter> and changed begin_time_mills to u64.
rocketmq-broker/.../subscription/manager/subscription_group_manager.rs Added get_forbidden and get_forbidden_internal methods.
rocketmq-store/.../default_message_store.rs Added new methods for fetching min/max offsets and getting messages in DefaultMessageStore.
rocketmq-store/.../local_file_consume_queue_store.rs Modified get_max_offset to return Option<i64> and added methods get_min_offset_in_queue and get_max_offset_in_queue.

Sequence Diagram(s)

sequenceDiagram
    participant BrokerRuntime
    participant ConsumerManager
    participant DefaultConsumerIdsChangeListener 
    participant BrokerStatsManager
    
    BrokerRuntime ->> ConsumerManager: Initialize with DefaultConsumerIdsChangeListener
    ConsumerManager ->> BrokerStatsManager: Set state getter
    BrokerRuntime ->> ConsumerManager: Set BrokerStatsManager
    BrokerStatsManager -->> ConsumerManager: Weak reference

    note over BrokerRuntime, ConsumerManager: Handles consumer group events and client registrations
Loading

Assessment against linked issues

Objective Addressed Explanation
Support pull message consume-3 (#669)

Poem

In the silent hum of RocketMQ,
Channels now pulse where strings once grew.
Consumers marked with nimble grace,
Listeners ensure no change misplaced.
👾 A broker's dream, of futures new.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@codecov
Copy link
Copy Markdown

codecov bot commented Jun 20, 2024

Codecov Report

Attention: Patch coverage is 37.28223% with 360 lines in your changes missing coverage. Please review.

Project coverage is 26.92%. Comparing base (52fd565) to head (62a0a14).

Files Patch % Lines
...tmq-broker/src/processor/pull_message_processor.rs 0.00% 130 Missing ⚠️
...etmq-broker/src/client/manager/consumer_manager.rs 0.00% 56 Missing ⚠️
rocketmq-store/src/base/get_message_result.rs 76.82% 35 Missing ⚠️
rocketmq-broker/src/client/consumer_group_info.rs 71.02% 31 Missing ⚠️
...subscription/manager/subscription_group_manager.rs 0.00% 15 Missing ⚠️
...q-store/src/message_store/default_message_store.rs 0.00% 15 Missing ⚠️
rocketmq-broker/src/util/hook_utils.rs 0.00% 9 Missing ⚠️
...-store/src/queue/local_file_consume_queue_store.rs 0.00% 9 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs 0.00% 8 Missing ⚠️
...etmq-broker/src/client/manager/producer_manager.rs 0.00% 7 Missing ⚠️
... and 11 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #670      +/-   ##
==========================================
+ Coverage   26.21%   26.92%   +0.71%     
==========================================
  Files         242      246       +4     
  Lines       17954    18534     +580     
==========================================
+ Hits         4706     4990     +284     
- Misses      13248    13544     +296     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 20

Outside diff range and nitpick comments (4)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

149-167: Clarify the use of bit manipulation in get_forbidden_internal.

Consider adding inline comments to explain the bit manipulation logic in get_forbidden_internal, as this could enhance readability and maintainability for developers unfamiliar with bitwise operations.

rocketmq-store/src/queue.rs (3)

231-231: Consider providing more context or documentation for the method get_max_offset.

While the method's purpose is clear, adding a brief docstring explaining when and why to use this method would improve code maintainability.


259-259: Method get_min_offset_in_queue lacks documentation.

It's helpful to document public methods to clarify their purpose and usage, which aids future maintainability and usability by other developers.


261-261: Method get_max_offset_in_queue lacks documentation.

Adding a brief docstring here would improve the understandability of this method's functionality and its role within the system.

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 52fd565 and 62a0a14.

Files selected for processing (34)
  • rocketmq-broker/src/broker_runtime.rs (6 hunks)
  • rocketmq-broker/src/client.rs (1 hunks)
  • rocketmq-broker/src/client/client_channel_info.rs (3 hunks)
  • rocketmq-broker/src/client/consumer_group_event.rs (1 hunks)
  • rocketmq-broker/src/client/consumer_group_info.rs (8 hunks)
  • rocketmq-broker/src/client/consumer_ids_change_listener.rs (1 hunks)
  • rocketmq-broker/src/client/default_consumer_ids_change_listener.rs (1 hunks)
  • rocketmq-broker/src/client/manager/consumer_manager.rs (1 hunks)
  • rocketmq-broker/src/client/manager/producer_manager.rs (3 hunks)
  • rocketmq-broker/src/filter.rs (1 hunks)
  • rocketmq-broker/src/filter/expression_for_retry_message_filter.rs (1 hunks)
  • rocketmq-broker/src/filter/expression_message_filter.rs (1 hunks)
  • rocketmq-broker/src/filter/manager/consumer_filter_manager.rs (2 hunks)
  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (2 hunks)
  • rocketmq-broker/src/processor/client_manage_processor.rs (2 hunks)
  • rocketmq-broker/src/processor/pull_message_processor.rs (5 hunks)
  • rocketmq-broker/src/processor/pull_message_result_handler.rs (2 hunks)
  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1 hunks)
  • rocketmq-broker/src/util/hook_utils.rs (2 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-remoting/src/connection.rs (1 hunks)
  • rocketmq-remoting/src/net/channel.rs (1 hunks)
  • rocketmq-remoting/src/runtime/server.rs (1 hunks)
  • rocketmq-store/src/base.rs (1 hunks)
  • rocketmq-store/src/base/get_message_result.rs (1 hunks)
  • rocketmq-store/src/base/message_result.rs (2 hunks)
  • rocketmq-store/src/base/message_status_enum.rs (1 hunks)
  • rocketmq-store/src/consume_queue.rs (1 hunks)
  • rocketmq-store/src/filter.rs (2 hunks)
  • rocketmq-store/src/lib.rs (1 hunks)
  • rocketmq-store/src/log_file.rs (2 hunks)
  • rocketmq-store/src/message_store/default_message_store.rs (3 hunks)
  • rocketmq-store/src/queue.rs (2 hunks)
  • rocketmq-store/src/queue/local_file_consume_queue_store.rs (2 hunks)
Files not reviewed due to errors (1)
  • rocketmq-store/src/message_store/default_message_store.rs (no review received)
Files skipped from review due to trivial changes (3)
  • rocketmq-store/src/base.rs
  • rocketmq-store/src/base/message_result.rs
  • rocketmq-store/src/consume_queue.rs
Additional context used
Learnings (1)
rocketmq-broker/src/client/manager/consumer_manager.rs (1)
User: TeslaRustor
PR: mxsm/rocketmq-rust#652
File: rocketmq-broker/src/client/manager/consumer_manager.rs:23-25
Timestamp: 2024-06-17T07:05:52.260Z
Learning: TeslaRustor plans to implement the `find_subscription_data` method in the `ConsumerManager` class in a future version and has acknowledged the current placeholder implementation.
GitHub Check: codecov/patch
rocketmq-broker/src/client/client_channel_info.rs

[warning] 54-54: rocketmq-broker/src/client/client_channel_info.rs#L54
Added line #L54 was not covered by tests


[warning] 70-70: rocketmq-broker/src/client/client_channel_info.rs#L70
Added line #L70 was not covered by tests


[warning] 77-79: rocketmq-broker/src/client/client_channel_info.rs#L77-L79
Added lines #L77 - L79 were not covered by tests

rocketmq-broker/src/client/consumer_group_event.rs

[warning] 17-17: rocketmq-broker/src/client/consumer_group_event.rs#L17
Added line #L17 was not covered by tests


[warning] 44-44: rocketmq-broker/src/client/consumer_group_event.rs#L44
Added line #L44 was not covered by tests


[warning] 49-49: rocketmq-broker/src/client/consumer_group_event.rs#L49
Added line #L49 was not covered by tests


[warning] 54-54: rocketmq-broker/src/client/consumer_group_event.rs#L54
Added line #L54 was not covered by tests


[warning] 59-59: rocketmq-broker/src/client/consumer_group_event.rs#L59
Added line #L59 was not covered by tests


[warning] 64-64: rocketmq-broker/src/client/consumer_group_event.rs#L64
Added line #L64 was not covered by tests

rocketmq-broker/src/client/consumer_group_info.rs

[warning] 35-35: rocketmq-broker/src/client/consumer_group_info.rs#L35
Added line #L35 was not covered by tests


[warning] 39-39: rocketmq-broker/src/client/consumer_group_info.rs#L39
Added line #L39 was not covered by tests


[warning] 76-77: rocketmq-broker/src/client/consumer_group_info.rs#L76-L77
Added lines #L76 - L77 were not covered by tests


[warning] 86-86: rocketmq-broker/src/client/consumer_group_info.rs#L86
Added line #L86 was not covered by tests


[warning] 90-91: rocketmq-broker/src/client/consumer_group_info.rs#L90-L91
Added lines #L90 - L91 were not covered by tests


[warning] 95-95: rocketmq-broker/src/client/consumer_group_info.rs#L95
Added line #L95 was not covered by tests


[warning] 99-100: rocketmq-broker/src/client/consumer_group_info.rs#L99-L100
Added lines #L99 - L100 were not covered by tests


[warning] 104-105: rocketmq-broker/src/client/consumer_group_info.rs#L104-L105
Added lines #L104 - L105 were not covered by tests


[warning] 112-113: rocketmq-broker/src/client/consumer_group_info.rs#L112-L113
Added lines #L112 - L113 were not covered by tests


[warning] 115-115: rocketmq-broker/src/client/consumer_group_info.rs#L115
Added line #L115 was not covered by tests


[warning] 129-130: rocketmq-broker/src/client/consumer_group_info.rs#L129-L130
Added lines #L129 - L130 were not covered by tests


[warning] 180-180: rocketmq-broker/src/client/consumer_group_info.rs#L180
Added line #L180 was not covered by tests


[warning] 204-204: rocketmq-broker/src/client/consumer_group_info.rs#L204
Added line #L204 was not covered by tests


[warning] 241-242: rocketmq-broker/src/client/consumer_group_info.rs#L241-L242
Added lines #L241 - L242 were not covered by tests


[warning] 246-247: rocketmq-broker/src/client/consumer_group_info.rs#L246-L247
Added lines #L246 - L247 were not covered by tests


[warning] 255-256: rocketmq-broker/src/client/consumer_group_info.rs#L255-L256
Added lines #L255 - L256 were not covered by tests

rocketmq-broker/src/broker_runtime.rs

[warning] 122-122: rocketmq-broker/src/broker_runtime.rs#L122
Added line #L122 was not covered by tests


[warning] 160-163: rocketmq-broker/src/broker_runtime.rs#L160-L163
Added lines #L160 - L163 were not covered by tests


[warning] 170-170: rocketmq-broker/src/broker_runtime.rs#L170
Added line #L170 was not covered by tests


[warning] 173-173: rocketmq-broker/src/broker_runtime.rs#L173
Added line #L173 was not covered by tests


[warning] 191-191: rocketmq-broker/src/broker_runtime.rs#L191
Added line #L191 was not covered by tests

Additional comments not posted (36)
rocketmq-broker/src/filter.rs (3)

18-18: Visibility of consumer_filter_data module set to pub(crate) is appropriate for encapsulation within the crate.


19-19: Adding expression_for_retry_message_filter as a pub(crate) module helps maintain encapsulation and focus on retry-specific message filtering logic.


20-20: The expression_message_filter module, marked as pub(crate), is well-scoped for internal use, supporting clean architecture and encapsulation.

rocketmq-broker/src/client.rs (2)

19-19: Introduction of consumer_group_event as a pub(crate) module aligns well with encapsulation principles and focuses on handling specific group events.


21-21: The consumer_ids_change_listener module, set as pub(crate), ensures that consumer ID change handling is encapsulated within the crate.

rocketmq-broker/src/client/consumer_ids_change_listener.rs (1)

22-26: The ConsumerIdsChangeListener trait, with methods handle and shutdown, provides a robust interface for managing consumer ID events and cleanup processes.

rocketmq-store/src/lib.rs (1)

22-22: The addition of consume_queue and filter modules in rocketmq-store enhances the store's capabilities to support new broker features.

rocketmq-store/src/filter.rs (1)

Line range hint 23-39: The MessageFilter trait definition is well-implemented and clear.

This provides a robust interface for message filtering, crucial for message processing based on different criteria.

rocketmq-broker/src/processor/pull_message_result_handler.rs (1)

17-23: New imports have been added to support the updated functionalities in PullMessageResultHandler. Ensure these libraries/modules are used efficiently.

rocketmq-store/src/base/message_status_enum.rs (1)

55-55: The GetMessageStatus enum provides a comprehensive set of statuses for message retrieval operations, enhancing the clarity and manageability of error handling.

rocketmq-broker/src/client/client_channel_info.rs (1)

19-38: The ClientChannelInfo struct has been updated to use a Channel type instead of a string address, reflecting a more robust handling of network channels. Ensure this change is well-integrated across all usages of ClientChannelInfo in the project.

Verification successful

The verification reveals that the channel field in ClientChannelInfo has been correctly integrated across various files. The instances of ClientChannelInfo in the files client_manage_processor.rs, consumer_group_info.rs, and producer_manager.rs all show appropriate usage of the channel field.

  • rocketmq-broker/src/processor/client_manage_processor.rs
  • rocketmq-broker/src/client/consumer_group_info.rs
  • rocketmq-broker/src/client/manager/producer_manager.rs

This confirms that the change has been well-integrated across the project.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Ensure all usages of `ClientChannelInfo` have been updated to use the new `channel` field.
rg --type rust "ClientChannelInfo"

Length of output: 2345



Script:

#!/bin/bash
# Check if the new `channel` field is used correctly in all instances of `ClientChannelInfo`.

# Files identified by `rg` that use `ClientChannelInfo`
files=(
    "rocketmq-broker/src/processor/client_manage_processor.rs"
    "rocketmq-broker/src/client/consumer_group_info.rs"
    "rocketmq-broker/src/client/manager/producer_manager.rs"
)

# Check each file for correct usage of the `channel` field in `ClientChannelInfo`
for file in "${files[@]}"; do
    echo "Checking $file..."
    rg --context 5 "ClientChannelInfo" "$file" | grep -E "channel|ClientChannelInfo"
done

Length of output: 4069

rocketmq-broker/src/client/consumer_group_event.rs (1)

17-29: The ConsumerGroupEvent enum is well-defined, providing clear categorization of consumer group events. This should facilitate better event handling in consumer group management.

Tools
GitHub Check: codecov/patch

[warning] 17-17: rocketmq-broker/src/client/consumer_group_event.rs#L17
Added line #L17 was not covered by tests

rocketmq-remoting/src/connection.rs (1)

62-70: The implementation of accessor methods framed and channel is clean and follows Rust's idiomatic practices.

rocketmq-store/src/log_file.rs (1)

85-99: The newly added methods for managing message offsets and retrieving messages are well-implemented. These methods enhance the RocketMQMessageStore trait's capabilities in handling message offsets and fetching messages based on filters.

rocketmq-remoting/src/net/channel.rs (1)

22-22: The Channel struct is well-defined, with properties and methods that correctly manage channel information. Using Uuid for channel_id is a robust choice for ensuring uniqueness.

rocketmq-broker/src/client/manager/producer_manager.rs (2)

57-57: The transition to using Channel instead of String for client identification in unregister_producer aligns well with the overall improvements in channel management. Good job on maintaining the integrity of the group_channel_table by checking and potentially removing empty entries.


75-95: The updated register_producer function correctly handles both new and existing producers using the Channel type. Setting the last_update_timestamp on existing entries before returning is a good practice for ensuring consistency in state management.

rocketmq-broker/src/filter/manager/consumer_filter_manager.rs (2)

59-59: The addition of expression type handling in the build function is a crucial update for managing consumer filters based on the type of expression. This ensures that only relevant data is processed further.


98-104: Currently, the get_consumer_filter_data function always returns None. If this is a placeholder, please ensure that the implementation is completed in future iterations or clarify if this behavior is intentional.

rocketmq-broker/src/processor/client_manage_processor.rs (2)

80-80: The update to initialize ClientChannelInfo with Channel directly from the connection context in unregister_client is consistent with the overall improvements in channel management across the system.


106-106: The update in heart_beat to use Channel for creating ClientChannelInfo and the addition of new fields in the response command enhance the robustness and clarity of heartbeat handling.

rocketmq-broker/src/client/manager/consumer_manager.rs (5)

39-55: The new method in ConsumerManager effectively initializes all necessary components with appropriate defaults and thread safety measures using Arc and RwLock. This setup is essential for the reliable operation of consumer management.


57-73: The new_with_broker_stats method extends the basic initialization of ConsumerManager by incorporating broker statistics, which is critical for a comprehensive management system. The use of configuration values for timeouts is a good practice.


77-79: The set_broker_stats_manager method provides necessary flexibility in managing broker statistics, which is essential for adapting to changes in system configuration or operational requirements.


83-85: The find_subscription_data method's delegation to find_subscription_data_internal for actual data retrieval is a good practice in maintaining separation of concerns and enhancing code maintainability.


128-139: The compensate_subscribe_data method securely updates the compensation table with subscription data, ensuring thread safety with write locks. This method is crucial for handling data discrepancies or temporary compensations.

rocketmq-broker/src/client/consumer_group_info.rs (1)

21-21: Ensure appropriate use of parking_lot for thread safety.

The use of parking_lot::Mutex and RwLock is appropriate here for providing finer-grained lock control which is more performant than the standard Rust mutexes in this context.

Also applies to: 22-22, 25-25

rocketmq-store/src/queue/local_file_consume_queue_store.rs (1)

321-326: Ensure that the method get_max_offset handles potential errors or edge cases.

The method should robustly handle cases where the topic or queue ID might not exist, returning None or an appropriate error.

rocketmq-broker/src/processor/pull_message_processor.rs (4)

28-40: Ensure that the newly added imports are used within the file to avoid unnecessary dependencies.


66-67: The addition of consumer_offset_manager and message_store fields should be properly initialized and integrated within the class methods.


485-494: The new query_broadcast_pull_init_offset method is critical for handling broadcast messages. Ensure it handles edge cases and errors gracefully.


Line range hint 96-474: The modifications in the process_request_inner method enhance message pulling capabilities. Ensure thorough testing, especially around the new error handling and branching logic.

rocketmq-broker/src/broker_runtime.rs (4)

51-51: Ensure that the new import DefaultConsumerIdsChangeListener is utilized properly within the file.


92-92: The addition of consumer_manager as an Arc<ConsumerManager> ensures thread-safe access across different parts of the application.


191-191: The reassignment of consumer_manager in the BrokerRuntime constructor ensures it's properly initialized for the runtime's lifecycle. This line is critical and should be covered by integration tests to verify its impact on the system's behavior.
[ISSNUE]

+ // TODO: Ensure integration tests cover the initialization and lifecycle of consumer_manager in BrokerRuntime
Tools
GitHub Check: codecov/patch

[warning] 191-191: rocketmq-broker/src/broker_runtime.rs#L191
Added line #L191 was not covered by tests


122-122: The cloning of consumer_manager in the Clone implementation is critical for maintaining state consistency across clones of BrokerRuntime.

Verification successful

The Clone implementation for BrokerRuntime has been located, and it includes the cloning of consumer_manager. This confirms that the cloning behavior is explicitly defined and should maintain state consistency across clones.

  • rocketmq-broker/src/broker_runtime.rs, lines 103-133: Clone implementation for BrokerRuntime.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the cloning behavior of `consumer_manager` is consistent and does not lead to state inconsistencies.

# Test: Search for usages of `BrokerRuntime::clone` to ensure it's being used safely.
rg --type rust $'BrokerRuntime::clone'

Length of output: 37



Script:

#!/bin/bash
# Locate the `BrokerRuntime` struct definition.
ast-grep --lang rust --pattern 'struct BrokerRuntime { $$$ }'

Length of output: 63



Script:

#!/bin/bash
# Search for the definition of `BrokerRuntime` in all Rust files.
rg --type rust 'struct BrokerRuntime'

Length of output: 188



Script:

#!/bin/bash
# Locate the `Clone` implementation for `BrokerRuntime` in `rocketmq-broker/src/broker_runtime.rs`.
ast-grep --lang rust --pattern 'impl Clone for BrokerRuntime { $$$ }' rocketmq-broker/src/broker_runtime.rs

Length of output: 3092

Tools
GitHub Check: codecov/patch

[warning] 122-122: rocketmq-broker/src/broker_runtime.rs#L122
Added line #L122 was not covered by tests

Comment on lines +489 to +504
fn get_min_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 {
todo!()
}

fn get_max_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 {
todo!()
}

fn get_max_offset_in_queue_committed(
&self,
topic: &str,
queue_id: i32,
committed: bool,
) -> i64 {
todo!()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider implementing the get_min_offset_in_queue, get_max_offset_in_queue, and get_max_offset_in_queue_committed methods.

Currently, these methods are placeholders with todo!(), which will panic at runtime if called. Please provide implementations for these methods to handle queue offset management properly.

Comment on lines +506 to +517
async fn get_message(
&self,
group: &str,
topic: &str,
queue_id: i32,
offset: i64,
max_msg_nums: i32,
max_total_msg_size: i32,
message_filter: &dyn MessageFilter,
) -> Option<GetMessageResult> {
todo!()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper implementation of the get_message asynchronous method.

Like the offset management methods, get_message is also a placeholder and will panic if executed. This method is crucial for message retrieval based on various parameters and should be implemented to handle different filtering conditions.

Comment on lines +144 to +148
pub fn get_forbidden(&self, group: &str, topic: &str, forbidden_index: i32) -> bool {
let topic_forbidden = self.get_forbidden_internal(group, topic);
let bit_forbidden = 1 << forbidden_index;
(topic_forbidden & bit_forbidden) == bit_forbidden
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add method documentation for get_forbidden.

It's a best practice to provide documentation for public methods, especially when they involve bit manipulation which may not be immediately clear to other developers.

Comment on lines +147 to +151
pub channel_expired_timeout: u64,
pub subscription_expired_timeout: u64,
pub enable_property_filter: bool,
pub filter_support_retry: bool,
pub use_server_side_reset_offset: bool,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure consistency in configuration defaults.

The default values for channel_expired_timeout, subscription_expired_timeout, enable_property_filter, filter_support_retry, and use_server_side_reset_welcome are set in the Default implementation, but not in the constructor methods like new_with_params. This could lead to inconsistent behavior if different constructors are used. Consider refactoring to ensure consistent defaults across all constructors.

Comment on lines +55 to +72
impl GetMessageResult {
pub fn new() -> Self {
GetMessageResult {
message_mapped_list: Vec::with_capacity(100),
message_buffer_list: Vec::with_capacity(100),
message_queue_offset: Vec::with_capacity(100),
..Default::default()
}
}

pub fn new_result_size(result_size: usize) -> Self {
GetMessageResult {
message_mapped_list: Vec::with_capacity(result_size),
message_buffer_list: Vec::with_capacity(result_size),
message_queue_offset: Vec::with_capacity(result_size),
..Default::default()
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize memory allocation strategy.

You've implemented a custom constructor new_result_size to pre-allocate memory for message lists based on the expected size. This is a good practice for performance optimization. However, consider also applying this pattern to other vectors in your data structures where appropriate to avoid multiple memory allocations.

Comment on lines +35 to +39
#[derive(Debug, Clone)]
pub struct ConsumerGroupInfo {
group_name: String,
subscription_table: Arc<RwLock<HashMap<String, SubscriptionData>>>,
channel_info_table: Arc<RwLock<HashMap<String, ClientChannelInfo>>>,
channel_info_table: Arc<RwLock<HashMap<Channel, ClientChannelInfo>>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add unit tests for ConsumerGroupInfo struct to ensure functionality.

+ #[cfg(test)]
+ mod tests {
+     use super::*;
+     #[test]
+     fn test_initialization() {
+         let group_info = ConsumerGroupInfo::new("test_group".to_string(), ConsumeType::ConsumeActively, MessageModel::Broadcasting, ConsumeFromWhere::ConsumeFromFirstOffset);
+         assert_eq!(group_info.group_name, "test_group");
+     }
+ }

Ensure that the new fields and methods are covered by unit tests to maintain code quality and functionality.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[derive(Debug, Clone)]
pub struct ConsumerGroupInfo {
group_name: String,
subscription_table: Arc<RwLock<HashMap<String, SubscriptionData>>>,
channel_info_table: Arc<RwLock<HashMap<String, ClientChannelInfo>>>,
channel_info_table: Arc<RwLock<HashMap<Channel, ClientChannelInfo>>>,
#[derive(Debug, Clone)]
pub struct ConsumerGroupInfo {
group_name: String,
subscription_table: Arc<RwLock<HashMap<String, SubscriptionData>>>,
channel_info_table: Arc<RwLock<HashMap<Channel, ClientChannelInfo>>>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_initialization() {
let group_info = ConsumerGroupInfo::new("test_group".to_string(), ConsumeType::ConsumeActively, MessageModel::Broadcasting, ConsumeFromWhere::ConsumeFromFirstOffset);
assert_eq!(group_info.group_name, "test_group");
}
}
Tools
GitHub Check: codecov/patch

[warning] 35-35: rocketmq-broker/src/client/consumer_group_info.rs#L35
Added line #L35 was not covered by tests


[warning] 39-39: rocketmq-broker/src/client/consumer_group_info.rs#L39
Added line #L39 was not covered by tests

Comment on lines +26 to +40
fn is_matched_by_consume_queue(
&self,
tags_code: Option<i64>,
cq_ext_unit: Option<&CqExtUnit>,
) -> bool {
todo!()
}

fn is_matched_by_commit_log(
&self,
msg_buffer: Option<&[u8]>,
properties: Option<&HashMap<String, String>>,
) -> bool {
todo!()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace todo!() with the actual implementation or provide further clarification.

The methods is_matched_by_consume_queue and is_matched_by_commit_log are currently placeholders. This could lead to runtime panics if these methods are called. Please implement the functionality or handle these cases appropriately.

Comment on lines +26 to +40
fn is_matched_by_consume_queue(
&self,
tags_code: Option<i64>,
cq_ext_unit: Option<&CqExtUnit>,
) -> bool {
todo!()
}

fn is_matched_by_commit_log(
&self,
msg_buffer: Option<&[u8]>,
properties: Option<&HashMap<String, String>>,
) -> bool {
todo!()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace todo!() with the actual implementation or provide further clarification.

The methods is_matched_by_consume_queue and is_matched_by_commit_log are currently placeholders. This could lead to runtime panics if these methods are called. Please implement the functionality or handle these cases appropriately.

Comment on lines +399 to +403
fn get_min_offset_in_queue(&self, topic: &str, queue_id: i32) -> i64 {
self.find_or_create_consume_queue(topic, queue_id)
.lock()
.get_min_offset_in_queue()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The get_min_offset_in_queue method currently lacks error handling. It directly calls find_or_create_consume_queue, lock, and get_min_offset_in_queue without checking for potential errors or handling cases where the queue might not exist.

  • File: rocketmq-store/src/queue/local_file_consume_queue_store.rs
  • Lines: 399-403

Category:

Analysis chain

Method get_min_offset_in_queue should include error handling.

This method should handle potential errors, such as when the queue does not exist, by returning an appropriate error or fallback value.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Ensure that error handling is robust in `get_min_offset_in_queue`.
grep -rn "get_min_offset_in_queue" ./src/

Length of output: 81



Script:

#!/bin/bash
# Locate the file `local_file_consume_queue_store.rs`.
fd local_file_consume_queue_store.rs

Length of output: 97



Script:

#!/bin/bash
# Search for the `get_min_offset_in_queue` method in `rocketmq-store/src/queue/local_file_consume_queue_store.rs`.
ast-grep --lang rust --pattern 'fn get_min_offset_in_queue(&self, $_, $_) { $$$ }' rocketmq-store/src/queue/local_file_consume_queue_store.rs

Length of output: 143



Script:

#!/bin/bash
# Search for the `get_min_offset_in_queue` method in `rocketmq-store/src/queue/local_file_consume_queue_store.rs` using `rg`.
rg 'fn get_min_offset_in_queue' rocketmq-store/src/queue/local_file_consume_queue_store.rs

Length of output: 167



Script:

#!/bin/bash
# Extract the full implementation of the `get_min_offset_in_queue` method.
rg -A 10 'fn get_min_offset_in_queue' rocketmq-store/src/queue/local_file_consume_queue_store.rs

Length of output: 398


let client_channel_info = ClientChannelInfo::new(
ctx.remoting_address().to_string(),
ctx.as_ref().connection().channel().clone(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The current implementation of the unregister_client function in rocketmq-broker/src/processor/client_manage_processor.rs includes handling for producer groups but lacks handling for consumer groups. This confirms that the handling for consumer groups is indeed unimplemented as noted in the review comment.

  • rocketmq-broker/src/processor/client_manage_processor.rs: fn unregister_client (lines 80-100)
Analysis chain

The handling for consumer groups in unregister_client is marked as unimplemented. Please ensure this functionality is addressed in future updates or clarify if this is intended to be implemented later.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for any TODO or FIXME comments related to consumer group handling in `unregister_client`.

# Test: Search for TODO or FIXME comments in the current file. Expect: Any related comments.
rg --type rust --files-with-matches 'TODO|FIXME' 'client_manage_processor.rs'

Length of output: 144



Script:

#!/bin/bash
# Locate the correct path to the `client_manage_processor.rs` file.
fd 'client_manage_processor.rs'

Length of output: 88



Script:

#!/bin/bash
# Description: Check for any TODO or FIXME comments related to consumer group handling in `unregister_client`.

# Test: Search for TODO or FIXME comments in the current file. Expect: Any related comments.
rg --type rust --files-with-matches 'TODO|FIXME' 'rocketmq-broker/src/processor/client_manage_processor.rs'

Length of output: 107



Script:

#!/bin/bash
# Locate and inspect the `unregister_client` function within the `rocketmq-broker/src/processor/client_manage_processor.rs` file.

# Search for the `unregister_client` function definition.
rg 'fn unregister_client' rocketmq-broker/src/processor/client_manage_processor.rs

# Extract the function implementation for inspection.
rg -A 20 'fn unregister_client' rocketmq-broker/src/processor/client_manage_processor.rs

Length of output: 905

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved PR has approved auto merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature]Support pull message consume-3

2 participants