Skip to content

[ISSUE #763]🔥Topic manager support(Request code:17,18,21,215) 🚀#764

Merged
mxsm merged 2 commits intomainfrom
feature-763
Jul 11, 2024
Merged

[ISSUE #763]🔥Topic manager support(Request code:17,18,21,215) 🚀#764
mxsm merged 2 commits intomainfrom
feature-763

Conversation

@mxsm
Copy link
Copy Markdown
Owner

@mxsm mxsm commented Jul 11, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #763

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced methods to manage and clean consumer offsets by topic.
    • Added methods for constructing and parsing retry topics.
    • Added methods for handling topic configurations and attributes.
    • Introduced a delete_topics method in the message store for topic deletion management.
  • Enhancements

    • Improved error handling with .unwrap() for JSON decoding in several modules.
    • New fields in BrokerConfig to validate system topics and enable mixed message types.
    • Serialization and deserialization support added to various request headers.
  • Bug Fixes

    • Fixed potential decoding errors by incorporating .unwrap() in JSON handling methods.
  • Documentation

    • Updated module structures and added inline documentation for new methods and fields.

@mxsm
Copy link
Copy Markdown
Owner Author

mxsm commented Jul 11, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jul 11, 2024

Warning

Review failed

The pull request is closed.

Walkthrough

The changes introduce new features and improvements across multiple modules in the RocketMQ codebase. Key additions include new methods for managing topics and offsets, enhanced deserialization error handling, and new configurations for the broker. The updates to the BrokerRuntime, ConsumerOffsetManager, and BrokerOuterAPI modules, along with the inclusion of new constants, traits, and structs, ensure a more robust and flexible message processing system.

Changes

Files/Modules Change Summaries
rocketmq-broker/src/broker_runtime.rs Added AdminBrokerProcessor in imports; updated initialization in BrokerRuntime.
rocketmq-broker/src/offset/manager/consumer_offset_manager.rs Introduced clean_offset_by_topic and which_group_by_topic public functions.
rocketmq-broker/src/out_api/broker_outer_api.rs Modified JSON decoding of KVTable to include .unwrap() for error handling.
rocketmq-broker/src/processor.rs Added pop_inflight_message_counter module; updated request handling to include await.
rocketmq-common/src/common.rs Added pop_ack_constants and key_builder modules.
rocketmq-common/src/common/attribute.rs Introduced attribute_parser module.
rocketmq-common/src/common/broker/broker_config.rs Added fields validate_system_topic_when_update_topic and enable_mixed_message_type to BrokerConfig struct.
rocketmq-common/src/common/config.rs Added get_topic_message_type method to TopicConfig struct.
rocketmq-common/src/common/key_builder.rs Introduced KeyBuilder module with functions for retry topics management.
rocketmq-common/src/common/pop_ack_constants.rs Added PopAckConstants struct with new constants and methods.
rocketmq-common/src/common/serde_json_utils.rs Modified decode function to return Result<T, SerdeJsonError> instead of directly returning T.
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs Added .unwrap() to handle JSON decoding results.
rocketmq-remoting/src/code/request_code.rs Added UpdateAndCreateTopicList variant to RequestCode enum.
rocketmq-remoting/src/protocol.rs Introduced RemotingDeserializable trait with decode method for deserialization.
rocketmq-remoting/src/protocol/body.rs Added create_topic_list_request_body module.
rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs Updated JSON decoding to include .unwrap() for RegisterBrokerBody.
rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs Introduced CreateTopicListRequestBody struct with topic_config_list field.
rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs Added topic_queue_mapping_detail_map field to TopicConfigAndMappingSerializeWrapper struct.
rocketmq-remoting/src/protocol/header.rs Added modules for create_topic_request_header, delete_topic_request_header and get_all_topic_config_response_header.
rocketmq-remoting/src/protocol/header/create_topic_request_header.rs Introduced CreateTopicRequestHeader struct with serialization, deserialization, and custom header handling capabilities.
rocketmq-remoting/src/protocol/header/delete_topic_request_header.rs Introduced DeleteTopicRequestHeader struct with serialization, deserialization, and custom header handling capabilities.
rocketmq-remoting/src/protocol/header/get_all_topic_config_response_header.rs Introduced GetAllTopicConfigResponseHeader struct implementing CommandCustomHeader trait.
rocketmq-remoting/src/protocol/heartbeat/heartbeat_data.rs Updated tests to include .unwrap() after JSON decoding for HeartbeatData.
rocketmq-store/src/log_file.rs Added delete_topics function to RocketMQMessageStore trait.
rocketmq-store/src/message_store/default_message_store.rs Added delete_topics method to DefaultMessageStore for topic deletion.

Sequence Diagram(s)

No sequence diagrams generated as the changes reflect structural additions and error handling improvements without introducing complex new control flows.

Assessment against linked issues

Objective (Issues) Addressed Explanation
Support for Topic management (17, 18, 21, 215)
Enhanced deserialization error handling
Addition of new broker configurations
Introduction of new constants and modules
Methods for managing offsets and topics

In a world of code so tight and neat,
New changes come, quite the feat!
With topics managed and errors caught,
The RocketMQ ship sails without naught.
Configuration options now abound,
In this update, new strengths are found.
Let’s celebrate this code delight,
For every change, a future bright!


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

Share
Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (invoked as PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Additionally, you can add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.

CodeRabbit Configration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Copy Markdown
Owner Author

mxsm commented Jul 11, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Outside diff range, codebase verification and nitpick comments (14)
rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (2)

31-31: Add a comment to describe the new field.

Consider adding a comment to describe the purpose of topic_queue_mapping_info_map for better readability.

// A map that holds TopicQueueMappingInfo for each topic.
pub topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,

34-34: Add a comment to describe the new field.

Consider adding a comment to describe the purpose of topic_queue_mapping_detail_map for better readability.

// A map that holds TopicQueueMappingDetail for each topic.
pub topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,
rocketmq-broker/src/processor/admin_broker_processor.rs (8)

17-17: Move the module declaration to a more appropriate location.

Consider moving the topic_request_handler module declaration to the top of the file for better organization.

mod topic_request_handler;

19-20: Group related imports together.

Group the related imports from the same crate together for better readability.

use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::code::response_code::ResponseCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;
use rocketmq_remoting::runtime::server::ConnectionHandlerContext;
use rocketmq_store::message_store::default_message_store::DefaultMessageStore;
use tracing::warn;

30-34: Consider adding a comment to describe the purpose of the imports.

Add comments to explain the purpose of each imported module for better code readability.

use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; // Manages consumer offsets.
use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler; // Handles topic-related requests.
use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter; // Manages in-flight message counters.
use crate::topic::manager::topic_config_manager::TopicConfigManager; // Manages topic configurations.
use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager; // Manages topic queue mappings.

36-39: Add a comment to describe the AdminBrokerProcessor struct.

Consider adding a comment to describe the purpose of the AdminBrokerProcessor struct.

/// AdminBrokerProcessor handles administrative requests related to broker operations.
#[derive(Clone)]
pub struct AdminBrokerProcessor {
    topic_request_handler: TopicRequestHandler,
}

42-61: Add comments to describe the parameters and purpose of the new method.

Consider adding comments to describe the parameters and the purpose of the new method in the AdminBrokerProcessor implementation.

impl AdminBrokerProcessor {
    /// Creates a new instance of AdminBrokerProcessor.
    ///
    /// # Parameters
    /// - `broker_config`: Configuration for the broker.
    /// - `topic_config_manager`: Manager for topic configurations.
    /// - `consumer_offset_manager`: Manager for consumer offsets.
    /// - `topic_queue_mapping_manager`: Manager for topic queue mappings.
    /// - `default_message_store`: Default message store.
    pub fn new(
        broker_config: Arc<BrokerConfig>,
        topic_config_manager: TopicConfigManager,
        consumer_offset_manager: ConsumerOffsetManager,
        topic_queue_mapping_manager: Arc<TopicQueueMappingManager>,
        default_message_store: DefaultMessageStore,
    ) -> Self {
        let inner = Inner {
            broker_config,
            topic_config_manager,
            consumer_offset_manager,
            topic_queue_mapping_manager,
            default_message_store,
            pop_inflight_message_counter: Arc::new(PopInflightMessageCounter),
        };
        let topic_request_handler = TopicRequestHandler::new(inner);
        AdminBrokerProcessor {
            topic_request_handler,
        }
    }
}

64-94: Add comments to describe the purpose of the process_request method.

Consider adding comments to describe the purpose of the process_request method and its match arms.

impl AdminBrokerProcessor {
    /// Processes incoming requests based on the request code.
    ///
    /// # Parameters
    /// - `channel`: The communication channel.
    /// - `ctx`: The connection handler context.
    /// - `request_code`: The code of the request.
    /// - `request`: The remoting command.
    ///
    /// # Returns
    /// An optional remoting command as a response.
    pub async fn process_request(
        &mut self,
        channel: Channel,
        ctx: ConnectionHandlerContext,
        request_code: RequestCode,
        request: RemotingCommand,
    ) -> Option<RemotingCommand> {
        match request_code {
            RequestCode::UpdateAndCreateTopic => {
                self.topic_request_handler
                    .update_and_create_topic(channel, ctx, request_code, request)
                    .await
            }
            RequestCode::UpdateAndCreateTopicList => {
                self.topic_request_handler
                    .update_and_create_topic_list(channel, ctx, request_code, request)
                    .await
            }
            RequestCode::GetAllTopicConfig => {
                self.topic_request_handler
                    .get_all_topic_config(channel, ctx, request_code, request)
                    .await
            }
            RequestCode::DeleteTopicInBroker => {
                self.topic_request_handler
                    .delete_topic(channel, ctx, request_code, request)
                    .await
            }
            _ => Some(get_unknown_cmd_response(request_code)),
        }
    }
}

98-108: Add a comment to describe the purpose of the get_unknown_cmd_response function.

Consider adding a comment to describe the purpose of the get_unknown_cmd_response function.

/// Generates a response indicating the request code is not supported.
///
/// # Parameters
/// - `request_code`: The unsupported request code.
///
/// # Returns
/// A remoting command with the response code indicating the request is not supported.
fn get_unknown_cmd_response(request_code: RequestCode) -> RemotingCommand {
    warn!(
        "request type {:?}-{} not supported",
        request_code,
        request_code.to_i32()
    );
    RemotingCommand::create_response_command_with_code_remark(
        ResponseCode::RequestCodeNotSupported,
        format!(" request type {} not supported", request_code.to_i32()),
    )
}

110-118: Add a comment to describe the Inner struct.

Consider adding a comment to describe the purpose of the Inner struct.

/// Inner struct holds the configuration and managers required by the AdminBrokerProcessor.
#[derive(Clone)]
struct Inner {
    broker_config: Arc<BrokerConfig>,
    topic_config_manager: TopicConfigManager,
    consumer_offset_manager: ConsumerOffsetManager,
    topic_queue_mapping_manager: Arc<TopicQueueMappingManager>,
    default_message_store: DefaultMessageStore,
    pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
}
rocketmq-common/src/common/config.rs (1)

61-70: Add comments to describe the get_topic_message_type method.

Consider adding comments to describe the purpose and logic of the get_topic_message_type method.

impl TopicConfig {
    // Other methods...

    /// Retrieves the message type of the topic based on its attributes.
    ///
    /// # Returns
    /// The topic message type.
    pub fn get_topic_message_type(&self) -> TopicMessageType {
        if self.attributes.is_empty() {
            return TopicMessageType::Normal;
        }
        let content = self.attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.get_name());
        if let Some(content) = content {
            return TopicMessageType::from(content.to_string());
        }
        TopicMessageType::Normal
    }
}
rocketmq-common/src/common/attribute/topic_message_type.rs (1)

34-46: Add comments to describe the From<String> implementation for TopicMessageType.

Consider adding comments to describe the purpose and logic of the From<String> implementation for the TopicMessageType enum.

impl From<String> for TopicMessageType {
    /// Converts a string to a TopicMessageType.
    ///
    /// # Parameters
    /// - `s`: The string to convert.
    ///
    /// # Returns
    /// The corresponding TopicMessageType.
    fn from(s: String) -> Self {
        match s.to_uppercase().as_str() {
            "UNSPECIFIED" => Self::Unspecified,
            "NORMAL" => Self::Normal,
            "FIFO" => Self::Fifo,
            "DELAY" => Self::Delay,
            "TRANSACTION" => Self::Transaction,
            "MIXED" => Self::Mixed,
            _ => Self::Unspecified,
        }
    }
}
rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1)

Line range hint 92-94: Avoid using unwrap for error handling.

Using unwrap can cause the program to panic if the conversion to a JSON string fails. Consider handling the error gracefully.

- let content = serde_json::to_string(&wrapper).unwrap();
+ let content = match serde_json::to_string(&wrapper) {
+     Ok(content) => content,
+     Err(e) => {
+         error!("Failed to convert to JSON string: {}", e);
+         return;
+     }
+ };
rocketmq-remoting/src/protocol.rs (1)

361-364: Add documentation for the RemotingDeserializable trait.

Consider adding doc comments to explain the purpose of this trait and its methods.

/// A trait for types that can be deserialized from a byte vector.
pub trait RemotingDeserializable {
    type Output;
    fn decode(bytes: &[u8]) -> Result<Self::Output, SerdeJsonError>;
}
Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 5a62744 and 23cec23.

Files selected for processing (33)
  • rocketmq-broker/src/broker_runtime.rs (3 hunks)
  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1 hunks)
  • rocketmq-broker/src/out_api/broker_outer_api.rs (1 hunks)
  • rocketmq-broker/src/processor.rs (2 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor.rs (1 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (1 hunks)
  • rocketmq-broker/src/processor/client_manage_processor.rs (1 hunks)
  • rocketmq-broker/src/processor/pop_inflight_message_counter.rs (1 hunks)
  • rocketmq-broker/src/topic/manager/topic_config_manager.rs (2 hunks)
  • rocketmq-common/src/common.rs (1 hunks)
  • rocketmq-common/src/common/attribute.rs (1 hunks)
  • rocketmq-common/src/common/attribute/attribute_parser.rs (1 hunks)
  • rocketmq-common/src/common/attribute/topic_message_type.rs (2 hunks)
  • rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
  • rocketmq-common/src/common/config.rs (2 hunks)
  • rocketmq-common/src/common/key_builder.rs (1 hunks)
  • rocketmq-common/src/common/pop_ack_constants.rs (1 hunks)
  • rocketmq-common/src/utils/serde_json_utils.rs (1 hunks)
  • rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1 hunks)
  • rocketmq-namesrv/src/processor/default_request_processor.rs (3 hunks)
  • rocketmq-remoting/src/code/request_code.rs (2 hunks)
  • rocketmq-remoting/src/protocol.rs (3 hunks)
  • rocketmq-remoting/src/protocol/body.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs (1 hunks)
  • rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/create_topic_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/delete_topic_request_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/header/get_all_topic_config_response_header.rs (1 hunks)
  • rocketmq-remoting/src/protocol/heartbeat/heartbeat_data.rs (3 hunks)
  • rocketmq-store/src/log_file.rs (1 hunks)
  • rocketmq-store/src/message_store/default_message_store.rs (1 hunks)
Files skipped from review due to trivial changes (2)
  • rocketmq-broker/src/processor/client_manage_processor.rs
  • rocketmq-remoting/src/protocol/body.rs
Additional comments not posted (66)
rocketmq-broker/src/processor/pop_inflight_message_counter.rs (1)

18-18: LGTM: Struct definition

The struct PopInflightMessageCounter is defined correctly.

rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs (1)

21-25: LGTM: Struct definition

The struct CreateTopicListRequestBody is defined correctly with appropriate traits for serialization and deserialization.

rocketmq-common/src/common/attribute.rs (1)

18-18: LGTM: Module addition

The module attribute_parser is added correctly.

rocketmq-remoting/src/protocol/header.rs (1)

19-19: LGTM: Module addition

The module create_topic_request_header is added correctly.

rocketmq-remoting/src/protocol/header/get_all_topic_config_response_header.rs (2)

29-30: Verify the implementation of to_map.

The to_map method always returns None, which might not be the intended behavior. Ensure this is correct and consider adding a comment explaining why this is the case.


1-28: LGTM!

The rest of the code is straightforward and correct.

Also applies to: 31-39

rocketmq-common/src/common/pop_ack_constants.rs (2)

22-33: LGTM!

The constants are correctly defined.


35-41: LGTM!

The methods are correctly defined.

rocketmq-remoting/src/protocol/header/delete_topic_request_header.rs (3)

24-31: LGTM!

The struct definition is correct.


37-48: LGTM!

The CommandCustomHeader implementation is correct.


50-59: LGTM!

The FromMap implementation is correct.

rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs (1)

Line range hint 1-69:
LGTM!

The struct definition and the associated methods are correct.

rocketmq-common/src/common.rs (1)

41-41: Module imports look good.

The added modules key_builder and pop_ack_constants are correctly integrated.

Also applies to: 47-47

rocketmq-store/src/log_file.rs (1)

122-122: New method delete_topics looks good.

The delete_topics method follows the trait's design and is correctly integrated.

rocketmq-common/src/common/key_builder.rs (13)

1-16: File header and license look good.

The file header and Apache license are correctly included.


17-18: Imports look good.

The necessary imports are correctly included.


20-24: Constants look good.

The constants for revive queue and retry separators are correctly defined.


25-26: Struct definition looks good.

The KeyBuilder struct is correctly defined.


27-33: Method build_pop_retry_topic looks good.

The method correctly builds the retry topic based on the retry version.


35-40: Method build_pop_retry_topic_default looks good.

The method correctly builds the default retry topic.


42-47: Method build_pop_retry_topic_v2 looks good.

The method correctly builds the V2 retry topic.


49-54: Method build_pop_retry_topic_v1 looks good.

The method correctly builds the V1 retry topic.


56-67: Method parse_normal_topic looks good.

The method correctly parses the normal topic based on the retry topic prefix.


69-77: Method parse_normal_topic_default looks good.

The method correctly parses the default normal topic.


79-87: Method parse_group looks good.

The method correctly parses the group from the retry topic.


89-98: Method build_polling_key looks good.

The method correctly builds the polling key.


100-103: Method is_pop_retry_topic_v2 looks good.

The method correctly checks if the retry topic is of V2 type.

rocketmq-common/src/common/attribute/attribute_parser.rs (3)

61-75: LGTM!

The parse_to_string method is correctly implemented.


82-86: LGTM!

The unit test for parse_to_map with an empty string input is correctly implemented.


88-98: LGTM!

The unit test for parse_to_map with valid attributes string input is correctly implemented.

rocketmq-remoting/src/protocol/header/create_topic_request_header.rs (4)

73-107: LGTM!

The to_map method is correctly implemented, handling optional fields gracefully.


110-142: LGTM!

The from method is correctly implemented, handling missing fields by providing default values.


82-86: LGTM!

The unit test for parse_to_map with an empty string input is correctly implemented.


88-98: LGTM!

The unit test for parse_to_map with valid attributes string input is correctly implemented.

rocketmq-common/src/utils/serde_json_utils.rs (4)

22-26: LGTM!

The modification to return Result<T, SerdeJsonError> in the decode method is correctly implemented, improving error handling.


Line range hint 29-33: LGTM!

The from_json_str method is correctly implemented, returning a Result for proper error handling.


Line range hint 41-45: LGTM!

The to_json method is correctly implemented, returning a Result for proper error handling.


Line range hint 103-114: LGTM!

The unit test for from_json_str with a valid JSON string input is correctly implemented.

rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (2)

Line range hint 102-118: LGTM!

The put_kv_config method is correctly implemented with proper logging.


Line range hint 120-134: LGTM!

The delete_kv_config method is correctly implemented with proper logging.

rocketmq-common/src/common/broker/broker_config.rs (2)

163-164: New Fields Added: Verify Necessity and Usage

Two new fields validate_system_topic_when_update_topic and enable_mixed_message_type have been added to the BrokerConfig struct. Ensure these fields are necessary and used appropriately within the project.

Verification successful

New Fields Added: Verification Successful

The fields validate_system_topic_when_update_topic and enable_mixed_message_type added to the BrokerConfig struct are actively used in the topic_request_handler.rs file. Their presence is necessary and aligns with the project's functionality requirements.

  • Usage locations:
    • rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs
    • rocketmq-common/src/common/broker/broker_config.rs
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the new fields in the codebase.

# Test: Search for the usage of the new fields. Expect: Relevant code handling these fields.
rg --type rust 'validate_system_topic_when_update_topic|enable_mixed_message_type'

Length of output: 1180


238-239: Default Values for New Fields: Verify Intended Behavior

The default values for validate_system_topic_when_update_topic (true) and enable_mixed_message_type (false) seem reasonable. Verify that these defaults align with the intended behavior of the application.

Verification successful

Default Values for New Fields: Verify Intended Behavior

The default values for validate_system_topic_when_update_topic (true) and enable_mixed_message_type (false) are utilized in the application as intended. These fields are referenced in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs, where they influence conditional logic that aligns with their default values.

  • rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the default values of the new fields are used as intended.

# Test: Search for the default instantiation of `BrokerConfig` and ensure the new fields are handled correctly.
rg --type rust 'BrokerConfig::default'

Length of output: 550


Script:

#!/bin/bash
# Description: Verify the usage of `validate_system_topic_when_update_topic` and `enable_mixed_message_type` in `topic_queue_mapping_manager.rs`.

# Search for references to the new fields in the relevant file.
rg 'validate_system_topic_when_update_topic|enable_mixed_message_type' rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs -A 5 -B 5

Length of output: 147


Script:

#!/bin/bash
# Description: Verify the usage of `validate_system_topic_when_update_topic` and `enable_mixed_message_type` across the entire codebase.

# Search for references to the new fields in the entire codebase.
rg 'validate_system_topic_when_update_topic|enable_mixed_message_type' -A 5 -B 5

Length of output: 8275

rocketmq-remoting/src/code/request_code.rs (1)

27-27: New Request Code Added: Verify Usage

The new request code UpdateAndCreateTopicList has been added. Ensure this request code is used correctly within the project.

Also applies to: 170-170

Verification successful

Let's review the context in which UpdateAndCreateTopicList is used in rocketmq-broker/src/processor/admin_broker_processor.rs to ensure it is handled correctly.


New Request Code Added: Usage Verified

The new request code UpdateAndCreateTopicList is correctly integrated and handled within the codebase.

  • The request code is defined and included in the match statement in rocketmq-remoting/src/code/request_code.rs.
  • The handling logic for this request code is implemented in rocketmq-broker/src/processor/admin_broker_processor.rs.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the new request code in the codebase.

# Test: Search for the usage of the new request code. Expect: Relevant code handling this request code.
rg --type rust 'UpdateAndCreateTopicList'

Length of output: 330


Script:

#!/bin/bash
# Display the lines around the usage of UpdateAndCreateTopicList in admin_broker_processor.rs
rg --type rust --context 10 'UpdateAndCreateTopicList' rocketmq-broker/src/processor/admin_broker_processor.rs

Length of output: 986

rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (2)

78-93: New Function clean_offset_by_topic: Verify Correctness and Usage

The new function clean_offset_by_topic has been added. Ensure this function is implemented correctly and used appropriately within the project.

Verification successful

Function clean_offset_by_topic is correctly used and implemented.

The function clean_offset_by_topic is invoked within the delete_topic_in_broker method, which handles comprehensive clean-up tasks when a topic is deleted. This ensures that all related offsets are also removed, indicating correct usage and implementation.

  • rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs:
    • delete_topic_in_broker method
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the new function `clean_offset_by_topic` in the codebase.

# Test: Search for the usage of the new function. Expect: Relevant code handling this function.
rg --type rust 'clean_offset_by_topic'

Length of output: 276


Script:

#!/bin/bash
# Description: Extract context around the usage of `clean_offset_by_topic` in topic_request_handler.rs.

# Extract 10 lines before and after the usage of `clean_offset_by_topic` for context.
rg --type rust -A 10 -B 10 'clean_offset_by_topic' rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

Length of output: 755


95-106: New Function which_group_by_topic: Verify Correctness and Usage

The new function which_group_by_topic has been added. Ensure this function is implemented correctly and used appropriately within the project.

Verification successful

Verified: Function which_group_by_topic implementation and usage

The function which_group_by_topic is implemented correctly and is used appropriately within the project. The context in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs shows proper handling of the function's result.

  • rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (Function implementation)
  • rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (Function usage)
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of the new function `which_group_by_topic` in the codebase.

# Test: Search for the usage of the new function. Expect: Relevant code handling this function.
rg --type rust 'which_group_by_topic'

Length of output: 292


Script:

#!/bin/bash
# Extract lines of code around the usage of `which_group_by_topic` in `rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs` to verify its usage.

# Display 10 lines before and after the usage of `which_group_by_topic` for context.
rg --type rust 'which_group_by_topic' -A 10 -B 10 rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

Length of output: 920

rocketmq-broker/src/processor.rs (2)

75-75: New module added: pop_inflight_message_counter.

The addition of this module seems to be consistent with the overall module organization and should serve the intended purpose.


163-167: Ensure proper handling of async call.

The modification to make the call to admin_broker_processor.process_request async is correctly implemented. Ensure that this change does not introduce any issues related to async handling.

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (6)

41-44: New struct added: TopicRequestHandler.

The addition of this struct with an inner field seems appropriate for handling topic requests.


52-172: Review the implementation of update_and_create_topic.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


174-269: Review the implementation of update_and_create_topic_list.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


271-337: Review the implementation of delete_topic.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


339-375: Review the implementation of get_all_topic_config.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


377-389: Review the implementation of delete_topic_in_broker.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.

rocketmq-broker/src/topic/manager/topic_config_manager.rs (5)

393-397: Review the implementation of update_topic_config_list.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


399-402: Review the implementation of remove_topic_config.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


404-419: Review the implementation of delete_topic_config.

The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.


533-533: Review the implementation of data_version.

The method implementation appears correct. Ensure that it retrieves the data version as expected.


541-544: Review the implementation of broker_runtime_inner.

The method implementation appears correct. Ensure that it retrieves the broker runtime inner as expected.

rocketmq-namesrv/src/processor/default_request_processor.rs (5)

184-185: Ensure proper handling of .unwrap() in query_broker_topic_config.

The modification to handle JSON decoding with .unwrap() is correctly implemented. Ensure that this change does not introduce any issues related to error handling.


415-415: Ensure proper handling of .unwrap() in register_topic_to_name_srv.

The modification to handle JSON decoding with .unwrap() is correctly implemented. Ensure that this change does not introduce any issues related to error handling.


524-525: Ensure proper handling of .unwrap() in extract_register_topic_config_from_request.

The modification to handle JSON decoding with .unwrap() is correctly implemented. Ensure that this change does not introduce any issues related to error handling.


Line range hint 105-105:
Ensure proper handling of .unwrap() in get_kv_config.

The modification to handle JSON decoding with .unwrap() is correctly implemented. Ensure that this change does not introduce any issues related to error handling.


Line range hint 104-104:
Ensure proper handling of .unwrap() in delete_kv_config.

The modification to handle JSON decoding with .unwrap() is correctly implemented. Ensure that this change does not introduce any issues related to error handling.

rocketmq-remoting/src/protocol.rs (1)

382-387: Implementation of RemotingDeserializable for T looks good.

The implementation correctly uses SerdeJsonUtils::decode.

rocketmq-broker/src/broker_runtime.rs (2)

63-63: New import of AdminBrokerProcessor looks good.

The new import is necessary for the changes in the init_processor method.


430-437: Initialization of admin_broker_processor looks good.

The AdminBrokerProcessor is correctly initialized with the necessary parameters.

Comment on lines +1047 to +1063
fn delete_topics(&self, delete_topics: Vec<String>) {
if delete_topics.is_empty() {
return;
}
for topic in delete_topics {
let queue_table = self
.consume_queue_store
.find_consume_queue_map(topic.as_str());
if queue_table.is_none() {
continue;
}
/* for (queue_id, consume_queue) in queue_table.unwrap() {
consume_queue.lock().destroy();
self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
}*/
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncomment and review the deletion logic.

The deletion logic is currently commented out. Ensure that the deletion logic is correct and uncomment it to enable topic deletion.

        for topic in delete_topics {
            let queue_table = self
                .consume_queue_store
                .find_consume_queue_map(topic.as_str());
            if queue_table.is_none() {
                continue;
            }
            for (queue_id, consume_queue) in queue_table.unwrap() {
                consume_queue.lock().destroy();
                self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
            }
        }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn delete_topics(&self, delete_topics: Vec<String>) {
if delete_topics.is_empty() {
return;
}
for topic in delete_topics {
let queue_table = self
.consume_queue_store
.find_consume_queue_map(topic.as_str());
if queue_table.is_none() {
continue;
}
/* for (queue_id, consume_queue) in queue_table.unwrap() {
consume_queue.lock().destroy();
self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
}*/
}
}
for topic in delete_topics {
let queue_table = self
.consume_queue_store
.find_consume_queue_map(topic.as_str());
if queue_table.is_none() {
continue;
}
for (queue_id, consume_queue) in queue_table.unwrap() {
consume_queue.lock().destroy();
self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
}
}


let serialized = original.encode();
let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice());
let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper error handling instead of using unwrap().

Using unwrap() can cause the program to panic if an error occurs. Consider using proper error handling to gracefully handle potential errors.

- let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).unwrap();
+ let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).expect("Failed to deserialize HeartbeatData");

Also applies to: 80-80, 96-96

Comment on lines +21 to +23
pub fn clear_in_flight_message_num_by_topic_name(&self, _topic: &str) {
// TODO
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: Implement the method

The method clear_in_flight_message_num_by_topic_name is a placeholder and needs implementation.

Do you want me to help implement this method or open a GitHub issue to track this task?

Comment on lines +70 to 72
return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap();
}
todo!()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure error handling in decode method.

The decode method uses unwrap() which can panic. Consider using proper error handling.

-  return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap();
+  return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).expect("Failed to decode RegisterBrokerBody");
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap();
}
todo!()
return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).expect("Failed to decode RegisterBrokerBody");
}
todo!()

}
if let Some(body) = response.body() {
result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref());
result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid Using .unwrap(): Implement Proper Error Handling

Using .unwrap() can cause the program to panic if the result is an error. Instead, implement proper error handling to manage potential errors gracefully.

- result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap();
+ result.kv_table = match SerdeJsonUtils::decode::<KVTable>(body.as_ref()) {
+     Ok(table) => table,
+     Err(e) => {
+         error!("Failed to decode KVTable: {:?}", e);
+         return None;
+     }
+ };
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap();
result.kv_table = match SerdeJsonUtils::decode::<KVTable>(body.as_ref()) {
Ok(table) => table,
Err(e) => {
error!("Failed to decode KVTable: {:?}", e);
return None;
}
};

Comment on lines +29 to +59
pub fn parse_to_map(attributes_modification: &str) -> Result<HashMap<String, String>, String> {
if attributes_modification.is_empty() {
return Ok(HashMap::new());
}

let mut attributes = HashMap::new();
let kvs: Vec<&str> = attributes_modification
.split(ATTR_ARRAY_SEPARATOR_COMMA)
.collect();
for kv in kvs {
let mut key = String::new();
let mut value = String::new();
if kv.contains(ATTR_KEY_VALUE_EQUAL_SIGN) {
let splits: Vec<&str> = kv.split(ATTR_KEY_VALUE_EQUAL_SIGN).collect();
key.push_str(splits[0]);
value.push_str(splits[1]);
if !key.contains(ATTR_ADD_PLUS_SIGN) {
return Err(format!("add/alter attribute format is wrong: {}", key));
}
} else {
key.push_str(kv);
if !key.contains(ATTR_DELETE_MINUS_SIGN) {
return Err(format!("delete attribute format is wrong: {}", key));
}
}
if attributes.insert(key.clone(), value).is_some() {
return Err(format!("key duplication: {}", key));
}
}
Ok(attributes)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using splitn for performance improvement.

Using splitn with a limit of 2 can be more efficient when splitting the key-value pairs.

let splits: Vec<&str> = kv.splitn(2, ATTR_KEY_VALUE_EQUAL_SIGN).collect();

Comment on lines +79 to +80
let wrapper =
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using unwrap for error handling.

Using unwrap can cause the program to panic if the decoding fails. Consider handling the error gracefully.

- let wrapper = SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
+ let wrapper = match SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()) {
+     Ok(wrapper) => wrapper,
+     Err(e) => {
+         error!("Failed to decode KVConfigSerializeWrapper: {}", e);
+         return;
+     }
+ };
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let wrapper =
SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
let wrapper = match SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()) {
Ok(wrapper) => wrapper,
Err(e) => {
error!("Failed to decode KVConfigSerializeWrapper: {}", e);
return;
}
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved PR has approved auto merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Topic manager support(Request code:17,18,21,215)

2 participants