Skip to content

[ISSUE #662]🚀Support pull message consume-2#664

Merged
mxsm merged 2 commits intomainfrom
feature-662
Jun 19, 2024
Merged

[ISSUE #662]🚀Support pull message consume-2#664
mxsm merged 2 commits intomainfrom
feature-662

Conversation

@mxsm
Copy link
Copy Markdown
Owner

@mxsm mxsm commented Jun 19, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #662

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced a new RequestSource enum to represent different request sources, with methods for value retrieval, validation, and parsing.
  • Bug Fixes

    • Corrected cloning behavior for the subscription_group_manager field in BrokerRuntime, ensuring proper duplication.
  • Configuration

    • Added a new auto_create_subscription_group field to BrokerConfig to control the auto-creation of subscription groups.
  • Improvements

    • Enhanced PullMessageProcessor to support generic message stores.
    • Introduced new constants and validation methods for SubscriptionData.
  • Dependencies

    • Added the regex dependency to support regular expressions.

@mxsm
Copy link
Copy Markdown
Owner Author

mxsm commented Jun 19, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jun 19, 2024

Walkthrough

This set of changes enhances the RocketMQ broker's functionality by integrating better subscription management, improved consumer filtering, and extended capabilities for processing pull messages. Noteworthy additions include the introduction of the PullSysFlag struct for detailed control of message flags, enhanced support for SQL-based and tag-based filtering, and the incorporation of a generic MessageStore type within key processor structures. These changes collectively aim to bolster message consumption features.

Changes

File/Path Change Summary
rocketmq-broker/src/broker_runtime.rs Added management for SubscriptionGroupManager<DefaultMessageStore> with conditional compilation.
rocketmq-broker/src/client/manager/consumer_manager.rs Introduced a compensate_subscribe_data method to ConsumerManager.
rocketmq-broker/src/filter/manager/consumer_filter_manager.rs Added struct ConsumerFilterData and functionality to build it.
rocketmq-broker/src/processor.rs Updated pull_message_processor to a generic type PullMessageProcessor<MS>.
rocketmq-broker/src/processor/pull_message_processor.rs Enhanced PullMessageProcessor to support MessageStore type and added methods for request processing.
rocketmq-common/Cargo.toml Added regex dependency at version 1.10.5.
rocketmq-common/src/common/broker/broker_config.rs Added auto_create_subscription_group to BrokerConfig.
rocketmq-common/src/common/filter/expression_type.rs Introduced constants for SQL92 and TAG, and is_tag_type method.
rocketmq-common/src/common/sys_flag.rs Added a new module pull_sys_flag.
rocketmq-common/src/common/sys_flag/pull_sys_flag.rs Introduced PullSysFlag struct with constants and methods for flag manipulation.
rocketmq-common/src/common/topic.rs Enhanced topic validation logic with Mutex and lazy_static.
rocketmq-remoting/src/protocol.rs Added modules filter, forbidden_type, and request_source.
rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs Removed decode and encode functions from TopicConfigSerializeWrapper.
rocketmq-remoting/src/protocol/filter.rs Added filter_api module for remoting filter functionality.
rocketmq-remoting/src/protocol/filter/filter_api.rs Introduced FilterAPI struct with methods to build subscription data.
rocketmq-remoting/src/protocol/forbidden_type.rs Added ForbiddenType struct with constants for different forbidden types.
rocketmq-remoting/src/protocol/header/pull_message_request_header.rs Changed queue_id to Option<i32> and updated related methods.
rocketmq-remoting/src/protocol/header/pull_message_response_header.rs Added traits RemotingSerializable, RequestHeaderCodec, and made fields public.
rocketmq-remoting/src/protocol/heartbeat/subscription_data.rs Added a new constant SUB_ALL to SubscriptionData.
rocketmq-remoting/src/protocol/request_source.rs Introduced RequestSource enum with methods for representing request origins.
rocketmq-remoting/src/protocol/subscription/subscription_group_config.rs Added multiple setter methods to SubscriptionGroupConfig.

Sequence Diagram(s)

sequenceDiagram
    Participant Client
    Participant Broker
    Participant SubscriptionManager
    Participant ConsumerManager
    Participant FilterAPI
    
    Client->>Broker: Send pull message request
    Broker->>SubscriptionManager: Check subscription
    SubscriptionManager-->>Broker: Return subscription details
    Broker->>ConsumerManager: Fetch consumer details
    ConsumerManager-->>Broker: Return consumer details
    Broker->>FilterAPI: Filter messages based on subscription
    FilterAPI-->>Broker: Return filtered messages
    Broker->>Client: Send messages
Loading

Assessment against linked issues

Objective Addressed Explanation
Support for pull message consume feature (Issue #662).
Enhanced filtering based on SQL and tags (Issue #662).
Extended request processing handling in pull messages (Issue #662).
Managing subscription groups dynamically via the broker (Issue #662).

Poem

In the realm of Rocket's flight,
New features shine with bright light.
Filters to filter, subscriptions in place,
Pull messages handle with elegant grace.
Code enhanced, the brokers gleam,
Progress made, fulfilling the dream.
🚀✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot requested review from SpaceXCN and TeslaRustor June 19, 2024 09:36
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

Outside diff range and nitpick comments (3)
rocketmq-remoting/src/protocol/subscription/subscription_group_config.rs (1)

113-163: The addition of setter methods in SubscriptionGroupConfig enhances the mutability and configurability of the struct. This is crucial for dynamic configuration changes at runtime. However, ensure proper synchronization if these settings are accessed concurrently.

Consider adding thread-safety mechanisms like locks if concurrent access is expected.

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

38-41: Consider documenting the struct SubscriptionGroupManager and its fields.

Adding documentation comments here can improve the maintainability and readability of the code, especially for public and protected fields.

rocketmq-common/src/common/broker/broker_config.rs (1)

Line range hint 146-201: Document the new field auto_create_subscription_group in the BrokerConfig struct.

Adding a comment here would help other developers understand the purpose of this field quickly.

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 0073f02 and 78e50e3.

Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
Files selected for processing (22)
  • rocketmq-broker/src/broker_runtime.rs (3 hunks)
  • rocketmq-broker/src/client/manager/consumer_manager.rs (1 hunks)
  • rocketmq-broker/src/filter/manager/consumer_filter_manager.rs (2 hunks)
  • rocketmq-broker/src/processor.rs (1 hunks)
  • rocketmq-broker/src/processor/pull_message_processor.rs (3 hunks)
  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (3 hunks)
  • rocketmq-common/Cargo.toml (1 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-common/src/common/filter/expression_type.rs (1 hunks)
  • rocketmq-common/src/common/sys_flag.rs (1 hunks)
  • rocketmq-common/src/common/sys_flag/pull_sys_flag.rs (1 hunks)
  • rocketmq-common/src/common/topic.rs (2 hunks)
  • rocketmq-remoting/src/protocol.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs (1 hunks)
  • rocketmq-remoting/src/protocol/filter.rs (1 hunks)
  • rocketmq-remoting/src/protocol/filter/filter_api.rs (1 hunks)
  • rocketmq-remoting/src/protocol/forbidden_type.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (7 hunks)
  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (2 hunks)
  • rocketmq-remoting/src/protocol/heartbeat/subscription_data.rs (2 hunks)
  • rocketmq-remoting/src/protocol/request_source.rs (1 hunks)
  • rocketmq-remoting/src/protocol/subscription/subscription_group_config.rs (1 hunks)
Files skipped from review due to trivial changes (5)
  • rocketmq-common/Cargo.toml
  • rocketmq-common/src/common/sys_flag.rs
  • rocketmq-remoting/src/protocol/body/topic_info_wrapper.rs
  • rocketmq-remoting/src/protocol/filter.rs
  • rocketmq-remoting/src/protocol/forbidden_type.rs
Additional context used
Learnings (1)
rocketmq-broker/src/client/manager/consumer_manager.rs (1)
User: TeslaRustor
PR: mxsm/rocketmq-rust#652
File: rocketmq-broker/src/client/manager/consumer_manager.rs:23-25
Timestamp: 2024-06-17T07:05:52.260Z
Learning: TeslaRustor plans to implement the `find_subscription_data` method in the `ConsumerManager` class in a future version and has acknowledged the current placeholder implementation.
Additional comments not posted (21)
rocketmq-common/src/common/filter/expression_type.rs (1)

20-29: The addition of SQL92 and TAG constants, along with the is_tag_type method, enhances the clarity and utility of the ExpressionType struct. Well implemented.

rocketmq-remoting/src/protocol/heartbeat/subscription_data.rs (1)

Line range hint 25-42: The updates to the SubscriptionData struct, including the addition of serialization traits and the SUB_ALL constant, are well-executed and improve the struct's utility and clarity.

rocketmq-remoting/src/protocol/request_source.rs (2)

17-23: The enum RequestSource is well-defined and covers different sources like SDK, ProxyForOrder, ProxyForBroadcast, and ProxyForStream. This categorization should facilitate handling different request origins effectively.


65-115: The unit tests cover different scenarios for the methods in RequestSource. These tests ensure that the methods behave as expected across valid and invalid inputs. No issues found here.

rocketmq-remoting/src/protocol/header/pull_message_response_header.rs (2)

28-38: The PullMessageResponseHeader struct is now enhanced with optional fields and serialization traits. This makes the data structure more flexible and robust for optional data handling. The use of Option for fields like suggest_which_broker_id and next_begin_offset is appropriate given the nature of network communication where not all data may be available.


50-50: The commented implementation block for PullMessageResponseHeader has been removed, which is a good cleanup if it's unused. Ensure that this removal does not impact any legacy systems that might rely on these constants.

Verification successful

The search results indicate that the constants removed from the commented block in pull_message_response_header.rs are indeed referenced in various files. To ensure that the removal does not impact any functionality, we need to verify if these references are part of the commented block or if they are defined elsewhere.

Let's proceed by checking the definitions and usages of these constants in the identified files to understand their context.


To ensure that the constants are not solely defined in the commented block and to check for their definitions elsewhere in the codebase, we will perform a comprehensive search for the definitions of these constants.


The search results indicate that the constants forbiddenType, groupSysFlag, maxOffset, minOffset, nextBeginOffset, offsetDelta, suggestWhichBrokerId, and topicSysFlag are defined in pull_message_response_header.rs and other files. Specifically:

  • maxOffset is also defined in broker_heartbeat_request_header.rs and namesrv/broker_request.rs.
  • groupSysFlag and topicSysFlag are mentioned in message.rs.

The presence of these constants in other files suggests that their removal from the commented block in pull_message_response_header.rs should not negatively impact the functionality, as they are defined elsewhere.

  • rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs
  • rocketmq-remoting/src/protocol/header/namesrv/broker_request.rs
  • rocketmq-common/src/common/message.rs

Thus, the cleanup seems safe and does not affect any legacy systems relying on these constants.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for usage of constants from the commented block in other parts of the codebase.

# Test: Search for constants usage. Expect: No occurrences.
rg --type rust 'forbiddenType|groupSysFlag|maxOffset|minOffset|nextBeginOffset|offsetDelta|suggestWhichBrokerId|topicSysFlag'

Length of output: 1960



Script:

#!/bin/bash
# Description: Check where the constants are defined and how they are used in the identified files.

# Get definitions and usages of the constants in the identified files.
rg --type rust 'forbiddenType|groupSysFlag|maxOffset|minOffset|nextBeginOffset|offsetDelta|suggestWhichBrokerId|topicSysFlag' rocketmq-store/src/queue.rs rocketmq-common/src/common/message.rs rocketmq-remoting/src/protocol/route/route_data_view.rs rocketmq-remoting/src/protocol/header/namesrv/broker_request.rs rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs

Length of output: 1203



Script:

#!/bin/bash
# Description: Search for definitions of the constants across the entire codebase.

# Search for definitions of the constants.
rg --type rust 'const.*(forbiddenType|groupSysFlag|maxOffset|minOffset|nextBeginOffset|offsetDelta|suggestWhichBrokerId|topicSysFlag)'

Length of output: 1403

rocketmq-common/src/common/sys_flag/pull_sys_flag.rs (2)

19-99: The PullSysFlag struct and its methods provide a robust mechanism for flag manipulation, which is essential for feature toggling and runtime behavior adjustments. The use of bitwise operations ensures efficient flag handling. The methods are well-implemented and cater to various flag-related operations.


102-164: The unit tests for PullSysFlag effectively cover all methods, ensuring that flag manipulations are correct. This is crucial for avoiding bugs related to feature toggling. No issues found here.

rocketmq-remoting/src/protocol/filter/filter_api.rs (1)

70-94: Ensure that the method build handles all edge cases correctly.

Verification successful

To ensure the build method handles all edge cases correctly, we need to analyze the context around its usage in the codebase. The rg command output shows several instances of FilterAPI::build and related methods. Let's inspect these instances for proper handling of edge cases.


The extracted lines show that the build method is tested thoroughly in various scenarios, including:

  • Creating correct subscription data for tag expression type.
  • Returning an error for an empty subscription string and non-tag expression type.

These tests ensure that the build method handles edge cases appropriately. Therefore, the implementation appears to be robust and well-tested.

  • rocketmq-remoting/src/protocol/filter/filter_api.rs: Tests for build method.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check for all usages of the `build` method to ensure it's being used correctly with valid parameters.

rg --type rust "FilterAPI::build"

Length of output: 1104



Script:

#!/bin/bash
# Extract lines around the usages of `FilterAPI::build` in `rocketmq-remoting/src/protocol/filter/filter_api.rs`
rg --type rust --context 5 "FilterAPI::build" rocketmq-remoting/src/protocol/filter/filter_api.rs

Length of output: 3008

rocketmq-common/src/common/topic.rs (1)

26-42: Validate the character map initialization in VALID_CHAR_BIT_MAP.

Verification successful

The VALID_CHAR_BIT_MAP initialization in rocketmq-common/src/common/topic.rs appears to be correctly set to cover the intended range of characters. The map includes '%', '-', '_', '|', digits '0'-'9', uppercase letters 'A'-'Z', and lowercase letters 'a'-'z'. Additionally, the usage of VALID_CHAR_BIT_MAP in the same file ensures that it is being utilized correctly.

  • Initialization: The character map is correctly initialized to include the intended characters.
  • Usage: The usage of VALID_CHAR_BIT_MAP in the file ensures that it is properly referenced and used for validation.

No issues were found with the current implementation.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that all characters in VALID_CHAR_BIT_MAP are correctly set and cover the intended range.

echo "${VALID_CHAR_BIT_MAP[@]}"

Length of output: 7



Script:

#!/bin/bash
# Description: Verify that all characters in VALID_CHAR_BIT_MAP are correctly set and cover the intended range.

# Use ripgrep to find the initialization of VALID_CHAR_BIT_MAP
rg "static ref VALID_CHAR_BIT_MAP" -A 15 rocketmq-common/src/common/topic.rs

# Use ripgrep to find any usage of VALID_CHAR_BIT_MAP in the codebase to ensure it's being used correctly
rg "VALID_CHAR_BIT_MAP" rocketmq-common/src/common/

Length of output: 873

rocketmq-broker/src/processor/pull_message_processor.rs (3)

21-56: The imports and struct declarations have been updated to accommodate new features. Ensure all the newly added dependencies are used appropriately and check for any potential version conflicts.


48-56: The PullMessageProcessor struct now supports a generic type MS. This change aligns with the broader system's move towards generic message store implementations. Ensure that all instantiations of this struct across the codebase are updated to reflect this change.


Line range hint 66-269: The process_request and process_request_inner methods contain complex logic for handling different types of requests and scenarios. Ensure thorough unit testing to cover all branches and edge cases, particularly with the new error handling and response modifications.

rocketmq-remoting/src/protocol/header/pull_message_request_header.rs (1)

26-26: The changes to PullMessageRequestHeader include making queue_id optional and adding several new fields to support advanced messaging features. This is a significant change and requires thorough testing to ensure that all functionalities that depend on queue_id handle the None case correctly.

Also applies to: 35-439

rocketmq-broker/src/processor.rs (1)

85-85: The change to make pull_message_processor generic over MS is consistent with the changes in pull_message_processor.rs. This ensures that the processor can handle different types of message stores. Good job maintaining consistency across related components.

rocketmq-remoting/src/protocol.rs (3)

38-38: The addition of the filter module aligns with the enhancements mentioned in the PR summary. Ensure the module is properly integrated and tested.


39-39: The addition of the forbidden_type module is noted. Verify its integration and usage across the system to ensure it functions as intended.

Verification successful

The addition of the forbidden_type module appears to be well-integrated into the system based on the references found. Here are the relevant locations where forbidden_type is used:

  • rocketmq-remoting/src/protocol/header/pull_message_response_header.rs
  • rocketmq-broker/src/processor/pull_message_processor.rs

These references suggest that the forbidden_type module is utilized in handling message responses and processing pull messages, indicating proper integration.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify usage of the `forbidden_type` module across the system.

# Test: Search for references to the `forbidden_type` module. Expect: Integration points in other modules.
rg --type rust 'forbidden_type'

Length of output: 1136


44-44: The addition of the request_source module complements the protocol enhancements. Verify its usage within the system to ensure it is correctly utilized.

Verification successful

The request_source module is well-integrated into the system. It is utilized in pull_message_request_header.rs for serialization and deserialization of request sources and in pull_message_processor.rs for processing different types of request sources. The integration appears correct and complete.

  • rocketmq-remoting/src/protocol/header/pull_message_request_header.rs
    • Serialization and deserialization of request_source.
  • rocketmq-broker/src/processor/pull_message_processor.rs
    • Handling different types of request sources using RequestSource::parse_integer.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify usage of the `request_source` module within the system.

# Test: Search for references to the `request_source` module. Expect: Proper utilization in relevant parts of the system.
rg --type rust 'request_source'

Length of output: 926



Script:

#!/bin/bash
# Extract the content of the `request_source` module.
fd -p rocketmq-remoting/src/protocol/request_source.rs

Length of output: 105



Script:

#!/bin/bash
# Display the content of the `request_source.rs` file.
cat rocketmq-remoting/src/protocol/request_source.rs

Length of output: 3801



Script:

#!/bin/bash
# Extract relevant lines from `pull_message_request_header.rs` to understand usage of `request_source`.
rg --type rust --context 5 'request_source' rocketmq-remoting/src/protocol/header/pull_message_request_header.rs

Length of output: 2353



Script:

#!/bin/bash
# Extract relevant lines from `pull_message_processor.rs` to understand usage of `request_source`.
rg --type rust --context 5 'request_source' rocketmq-broker/src/processor/pull_message_processor.rs

Length of output: 1208

rocketmq-broker/src/broker_runtime.rs (3)

75-76: The type of subscription_group_manager has been updated to include a generic type parameter DefaultMessageStore. This change aligns with the broader system enhancements to support generics for better type safety and flexibility. Ensure that all usages of SubscriptionGroupManager across the project are updated to reflect this change.


110-110: The cloning method for subscription_group_manager now properly clones the existing instance instead of creating a new default one. This change ensures that the cloned BrokerRuntime retains the same state as the original, which is crucial for consistency in runtime behavior. Good improvement for maintaining state consistency across clones.


175-175: The instantiation of subscription_group_manager in the new method of BrokerRuntime has been updated to directly create a new instance of SubscriptionGroupManager instead of using Default::default(). This explicit instantiation is clearer and ensures that any necessary initialization logic within SubscriptionGroupManager::new() is executed. This is a good practice, especially when the initialization might involve more complex logic in the future.

}

impl Default for PullMessageProcessor {
impl<MS> Default for PullMessageProcessor<MS> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation of the Default trait for PullMessageProcessor uses todo!(), which will panic at runtime if called. This should either be implemented properly or removed if the default instantiation is not required.

- todo!()
+ // Implement default values or remove this if not applicable

Also applies to: 65-65

Comment on lines +270 to +276
fn rewrite_request_for_static_topic(
&mut self,
_request_header: &PullMessageRequestHeader,
_mapping_context: &TopicQueueMappingContext,
) -> Option<RemotingCommand> {
unimplemented!()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method rewrite_request_for_static_topic uses unimplemented!(), which is a placeholder. This needs to be implemented before merging to avoid runtime panics.

- unimplemented!()
+ // Proper implementation goes here

Committable suggestion was skipped due to low confidence.

Comment on lines +27 to +33
pub fn compensate_subscribe_data(
&self,
_group: &str,
_topic: &str,
_subscription_data: &SubscriptionData,
) {
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compensate_subscribe_data method is currently a placeholder. Consider adding documentation or comments to clarify its intended future functionality and linking it to the relevant issue or roadmap item.

Would you like me to help document this method or open a GitHub issue to track its implementation?

Comment on lines +59 to +96
impl ConsumerFilterManager {
pub fn build(
topic: &str,
consumer_group: &str,
expression: &str,
type_: &str,
client_version: u64,
) -> Option<ConsumerFilterData> {
if ExpressionType::is_tag_type(Some(type_)) {
return None;
}

let mut consumer_filter_data = ConsumerFilterData::default();
consumer_filter_data.set_topic(topic.to_string());
consumer_filter_data.set_consumer_group(consumer_group.to_string());
consumer_filter_data.set_born_time(get_current_millis());
consumer_filter_data.set_dead_time(0);
consumer_filter_data.set_expression(expression.to_string());
consumer_filter_data.set_expression_type(type_.to_string());
consumer_filter_data.set_client_version(client_version);

/* let filter_factory = FilterFactory;
match filter_factory.get(&type_).compile(&expression) {
Ok(compiled_expression) => {
consumer_filter_data.set_compiled_expression(compiled_expression);
}
Err(e) => {
eprintln!(
"parse error: expr={}, topic={}, group={}, error={}",
expression, topic, consumer_group, e
);
return None;
}
}
*/
Some(consumer_filter_data)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build method in ConsumerFilterManager is well-implemented, enhancing the manager's functionality. However, the large block of commented-out code (lines 80-93) could lead to confusion and should be either integrated, removed, or clearly documented with its purpose or future plans.

Comment on lines +45 to +46
pub fn new() -> SubscriptionGroupManager<MS> {
unimplemented!()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The SubscriptionGroupManager struct and its implementation blocks exist, but the new method is not implemented. Ensure that the new method is properly implemented before merging.

  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs
Analysis chain

Ensure that the new function is properly implemented before merging.


Let's correct the regex pattern to properly search for the new method implementation in SubscriptionGroupManager<MS>.


Let's simplify the regex pattern to avoid complex quantifiers and ensure it matches the desired method.


To further investigate, let's search for the SubscriptionGroupManager struct definition and any implementation blocks for it. Then, we will specifically search for the new method within these implementation blocks.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Check if the `new` method is implemented in SubscriptionGroupManager.

rg --type rust "impl<MS> SubscriptionGroupManager<MS> {.*?pub fn new"

Length of output: 245



Script:

#!/bin/bash
# Description: Check if the `new` method is implemented in SubscriptionGroupManager.

rg --type rust "impl<MS> SubscriptionGroupManager<MS> {[^}]*pub fn new"

Length of output: 249



Script:

#!/bin/bash
# Description: Check if the `new` method is implemented in SubscriptionGroupManager.

rg --type rust "impl<MS> SubscriptionGroupManager<MS>.*pub fn new"

Length of output: 68



Script:

#!/bin/bash
# Description: Search for the `SubscriptionGroupManager` struct definition and its implementation blocks.

# Search for the struct definition
rg --type rust "struct SubscriptionGroupManager"

# Search for implementation blocks
rg --type rust "impl<MS> SubscriptionGroupManager<MS>"

# Specifically search for the `new` method within these implementation blocks
rg --type rust "impl<MS> SubscriptionGroupManager<MS>.*pub fn new"

Length of output: 516

Comment on lines +24 to +56
impl FilterAPI {
pub fn build_subscription_data(
topic: &str,
sub_string: &str,
) -> Result<SubscriptionData, String> {
let mut subscription_data = SubscriptionData {
topic: topic.to_string(),
sub_string: sub_string.to_string(),
..Default::default()
};

if sub_string.is_empty() || sub_string == SubscriptionData::SUB_ALL {
subscription_data.sub_string = SubscriptionData::SUB_ALL.to_string();
return Ok(subscription_data);
}

let tags: Vec<&str> = sub_string.split("||").collect();
if tags.is_empty() {
return Err("subString split error".to_string());
}

for tag in tags {
let trimmed_tag = tag.trim();
if !trimmed_tag.is_empty() {
subscription_data.tags_set.insert(trimmed_tag.to_string());
subscription_data
.code_set
.insert(JavaStringHasher::new().hash_str(tag));
}
}

Ok(subscription_data)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optimize the method build_subscription_data for better error handling.

Consider using more descriptive error messages and handling potential errors in string operations like split.

Comment on lines +58 to +68
pub fn build_subscription_data_with_expression_type(
topic: &str,
sub_string: &str,
expression_type: Option<String>,
) -> Result<SubscriptionData, String> {
let mut subscription_data = FilterAPI::build_subscription_data(topic, sub_string)?;
if let Some(expr_type) = expression_type {
subscription_data.expression_type = expr_type;
}
Ok(subscription_data)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The expression_type parameter in the build_subscription_data_with_expression_type function is not validated. The test function build_subscription_data_with_expression_type_sets_expression_type only sets expression_type to a specific value without any validation logic.

  • rocketmq-remoting/src/protocol/filter/filter_api.rs: Lines 58-68
Analysis chain

Validate the expression_type parameter in build_subscription_data_with_expression_type.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that all calls to `build_subscription_data_with_expression_type` pass a valid expression type.

rg --type rust "build_subscription_data_with_expression_type"

Length of output: 440



Script:

#!/bin/bash
# Locate all instances where `build_subscription_data_with_expression_type` is called in the codebase.

rg "build_subscription_data_with_expression_type"

Length of output: 428



Script:

#!/bin/bash
# Search for the definition of the test function `build_subscription_data_with_expression_type_sets_expression_type` to examine its content.

rg "fn build_subscription_data_with_expression_type_sets_expression_type" -A 10

Length of output: 1059

Comment on lines +88 to +94
pub fn is_topic_or_group_illegal(name: &str) -> bool {
let len = VALID_CHAR_BIT_MAP.len();
for ch in name.chars() {
if (ch as usize) >= len || !VALID_CHAR_BIT_MAP[ch as usize] {
return true;
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance the method is_topic_or_group_illegal by using more efficient character checks.

Consider using a set for character validation instead of an array for faster lookups.

Comment on lines +99 to +130
if topic.trim().is_empty() {
return ValidateTopicResult {
valid: false,
remark: String::from("The specified topic is blank."),
};
}

if Self::is_topic_or_group_illegal(topic) {
return ValidateTopicResult::new(
false,
"The specified topic contains illegal characters, allowing only ^[%|a-zA-Z0-9_-]+$",
);
return ValidateTopicResult {
valid: false,
remark: String::from(
"The specified topic contains illegal characters, allowing only \
^[%|a-zA-Z0-9_-]+$",
),
};
}

if topic.len() > TOPIC_MAX_LENGTH {
return ValidateTopicResult::new(
false,
"The specified topic is longer than topic max length.",
);
return ValidateTopicResult {
valid: false,
remark: format!(
"The specified topic is longer than topic max length {}.",
TOPIC_MAX_LENGTH
),
};
}

ValidateTopicResult::new(true, "")
ValidateTopicResult {
valid: true,
remark: String::new(),
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor validate_topic to improve error handling and feedback.

The method could return more specific errors related to why a topic is invalid, rather than a generic error message.

@codecov
Copy link
Copy Markdown

codecov bot commented Jun 19, 2024

Codecov Report

Attention: Patch coverage is 41.02079% with 312 lines in your changes missing coverage. Please review.

Project coverage is 26.13%. Comparing base (9a45a5f) to head (78e50e3).
Report is 1 commits behind head on main.

Current head 78e50e3 differs from pull request most recent head 694e7d2

Please upload reports for the commit 694e7d2 to get more accurate results.

Files Patch % Lines
...tmq-broker/src/processor/pull_message_processor.rs 0.00% 104 Missing ⚠️
...src/protocol/header/pull_message_request_header.rs 0.00% 53 Missing ⚠️
...protocol/subscription/subscription_group_config.rs 0.00% 44 Missing ⚠️
rocketmq-common/src/common/topic.rs 7.14% 39 Missing ⚠️
...subscription/manager/subscription_group_manager.rs 0.00% 32 Missing ⚠️
...oker/src/filter/manager/consumer_filter_manager.rs 0.00% 13 Missing ⚠️
...ocketmq-remoting/src/protocol/filter/filter_api.rs 89.15% 9 Missing ⚠️
...rc/protocol/header/pull_message_response_header.rs 0.00% 9 Missing ⚠️
...cketmq-common/src/common/filter/expression_type.rs 0.00% 3 Missing ⚠️
rocketmq-broker/src/broker_runtime.rs 0.00% 2 Missing ⚠️
... and 3 more
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #664      +/-   ##
==========================================
+ Coverage   25.58%   26.13%   +0.55%     
==========================================
  Files         236      240       +4     
  Lines       17348    17833     +485     
==========================================
+ Hits         4438     4661     +223     
- Misses      12910    13172     +262     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
@mxsm
Copy link
Copy Markdown
Owner Author

mxsm commented Jun 19, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 78e50e3 and 694e7d2.

Files selected for processing (1)
  • rocketmq-remoting/src/protocol/request_source.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • rocketmq-remoting/src/protocol/request_source.rs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved PR has approved auto merge feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature]Support pull message consume-2

2 participants