[ISSUE #763]🔥Topic manager support(Request code:17,18,21,215) 🚀#764
[ISSUE #763]🔥Topic manager support(Request code:17,18,21,215) 🚀#764
Conversation
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
|
Warning Review failedThe pull request is closed. WalkthroughThe changes introduce new features and improvements across multiple modules in the RocketMQ codebase. Key additions include new methods for managing topics and offsets, enhanced deserialization error handling, and new configurations for the broker. The updates to the Changes
Sequence Diagram(s)No sequence diagrams generated as the changes reflect structural additions and error handling improvements without introducing complex new control flows. Assessment against linked issues
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
|
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
There was a problem hiding this comment.
Actionable comments posted: 7
Outside diff range, codebase verification and nitpick comments (14)
rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (2)
31-31: Add a comment to describe the new field.Consider adding a comment to describe the purpose of
topic_queue_mapping_info_mapfor better readability.// A map that holds TopicQueueMappingInfo for each topic. pub topic_queue_mapping_info_map: HashMap<String /* topic */, TopicQueueMappingInfo>,
34-34: Add a comment to describe the new field.Consider adding a comment to describe the purpose of
topic_queue_mapping_detail_mapfor better readability.// A map that holds TopicQueueMappingDetail for each topic. pub topic_queue_mapping_detail_map: HashMap<String /* topic */, TopicQueueMappingDetail>,rocketmq-broker/src/processor/admin_broker_processor.rs (8)
17-17: Move the module declaration to a more appropriate location.Consider moving the
topic_request_handlermodule declaration to the top of the file for better organization.mod topic_request_handler;
19-20: Group related imports together.Group the related imports from the same crate together for better readability.
use rocketmq_common::common::broker::broker_config::BrokerConfig; use rocketmq_remoting::code::request_code::RequestCode; use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::net::channel::Channel; use rocketmq_remoting::protocol::remoting_command::RemotingCommand; use rocketmq_remoting::runtime::server::ConnectionHandlerContext; use rocketmq_store::message_store::default_message_store::DefaultMessageStore; use tracing::warn;
30-34: Consider adding a comment to describe the purpose of the imports.Add comments to explain the purpose of each imported module for better code readability.
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager; // Manages consumer offsets. use crate::processor::admin_broker_processor::topic_request_handler::TopicRequestHandler; // Handles topic-related requests. use crate::processor::pop_inflight_message_counter::PopInflightMessageCounter; // Manages in-flight message counters. use crate::topic::manager::topic_config_manager::TopicConfigManager; // Manages topic configurations. use crate::topic::manager::topic_queue_mapping_manager::TopicQueueMappingManager; // Manages topic queue mappings.
36-39: Add a comment to describe theAdminBrokerProcessorstruct.Consider adding a comment to describe the purpose of the
AdminBrokerProcessorstruct./// AdminBrokerProcessor handles administrative requests related to broker operations. #[derive(Clone)] pub struct AdminBrokerProcessor { topic_request_handler: TopicRequestHandler, }
42-61: Add comments to describe the parameters and purpose of thenewmethod.Consider adding comments to describe the parameters and the purpose of the
newmethod in theAdminBrokerProcessorimplementation.impl AdminBrokerProcessor { /// Creates a new instance of AdminBrokerProcessor. /// /// # Parameters /// - `broker_config`: Configuration for the broker. /// - `topic_config_manager`: Manager for topic configurations. /// - `consumer_offset_manager`: Manager for consumer offsets. /// - `topic_queue_mapping_manager`: Manager for topic queue mappings. /// - `default_message_store`: Default message store. pub fn new( broker_config: Arc<BrokerConfig>, topic_config_manager: TopicConfigManager, consumer_offset_manager: ConsumerOffsetManager, topic_queue_mapping_manager: Arc<TopicQueueMappingManager>, default_message_store: DefaultMessageStore, ) -> Self { let inner = Inner { broker_config, topic_config_manager, consumer_offset_manager, topic_queue_mapping_manager, default_message_store, pop_inflight_message_counter: Arc::new(PopInflightMessageCounter), }; let topic_request_handler = TopicRequestHandler::new(inner); AdminBrokerProcessor { topic_request_handler, } } }
64-94: Add comments to describe the purpose of theprocess_requestmethod.Consider adding comments to describe the purpose of the
process_requestmethod and its match arms.impl AdminBrokerProcessor { /// Processes incoming requests based on the request code. /// /// # Parameters /// - `channel`: The communication channel. /// - `ctx`: The connection handler context. /// - `request_code`: The code of the request. /// - `request`: The remoting command. /// /// # Returns /// An optional remoting command as a response. pub async fn process_request( &mut self, channel: Channel, ctx: ConnectionHandlerContext, request_code: RequestCode, request: RemotingCommand, ) -> Option<RemotingCommand> { match request_code { RequestCode::UpdateAndCreateTopic => { self.topic_request_handler .update_and_create_topic(channel, ctx, request_code, request) .await } RequestCode::UpdateAndCreateTopicList => { self.topic_request_handler .update_and_create_topic_list(channel, ctx, request_code, request) .await } RequestCode::GetAllTopicConfig => { self.topic_request_handler .get_all_topic_config(channel, ctx, request_code, request) .await } RequestCode::DeleteTopicInBroker => { self.topic_request_handler .delete_topic(channel, ctx, request_code, request) .await } _ => Some(get_unknown_cmd_response(request_code)), } } }
98-108: Add a comment to describe the purpose of theget_unknown_cmd_responsefunction.Consider adding a comment to describe the purpose of the
get_unknown_cmd_responsefunction./// Generates a response indicating the request code is not supported. /// /// # Parameters /// - `request_code`: The unsupported request code. /// /// # Returns /// A remoting command with the response code indicating the request is not supported. fn get_unknown_cmd_response(request_code: RequestCode) -> RemotingCommand { warn!( "request type {:?}-{} not supported", request_code, request_code.to_i32() ); RemotingCommand::create_response_command_with_code_remark( ResponseCode::RequestCodeNotSupported, format!(" request type {} not supported", request_code.to_i32()), ) }
110-118: Add a comment to describe theInnerstruct.Consider adding a comment to describe the purpose of the
Innerstruct./// Inner struct holds the configuration and managers required by the AdminBrokerProcessor. #[derive(Clone)] struct Inner { broker_config: Arc<BrokerConfig>, topic_config_manager: TopicConfigManager, consumer_offset_manager: ConsumerOffsetManager, topic_queue_mapping_manager: Arc<TopicQueueMappingManager>, default_message_store: DefaultMessageStore, pop_inflight_message_counter: Arc<PopInflightMessageCounter>, }rocketmq-common/src/common/config.rs (1)
61-70: Add comments to describe theget_topic_message_typemethod.Consider adding comments to describe the purpose and logic of the
get_topic_message_typemethod.impl TopicConfig { // Other methods... /// Retrieves the message type of the topic based on its attributes. /// /// # Returns /// The topic message type. pub fn get_topic_message_type(&self) -> TopicMessageType { if self.attributes.is_empty() { return TopicMessageType::Normal; } let content = self.attributes.get(TOPIC_MESSAGE_TYPE_ATTRIBUTE.get_name()); if let Some(content) = content { return TopicMessageType::from(content.to_string()); } TopicMessageType::Normal } }rocketmq-common/src/common/attribute/topic_message_type.rs (1)
34-46: Add comments to describe theFrom<String>implementation forTopicMessageType.Consider adding comments to describe the purpose and logic of the
From<String>implementation for theTopicMessageTypeenum.impl From<String> for TopicMessageType { /// Converts a string to a TopicMessageType. /// /// # Parameters /// - `s`: The string to convert. /// /// # Returns /// The corresponding TopicMessageType. fn from(s: String) -> Self { match s.to_uppercase().as_str() { "UNSPECIFIED" => Self::Unspecified, "NORMAL" => Self::Normal, "FIFO" => Self::Fifo, "DELAY" => Self::Delay, "TRANSACTION" => Self::Transaction, "MIXED" => Self::Mixed, _ => Self::Unspecified, } } }rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1)
Line range hint
92-94: Avoid usingunwrapfor error handling.Using
unwrapcan cause the program to panic if the conversion to a JSON string fails. Consider handling the error gracefully.- let content = serde_json::to_string(&wrapper).unwrap(); + let content = match serde_json::to_string(&wrapper) { + Ok(content) => content, + Err(e) => { + error!("Failed to convert to JSON string: {}", e); + return; + } + };rocketmq-remoting/src/protocol.rs (1)
361-364: Add documentation for theRemotingDeserializabletrait.Consider adding doc comments to explain the purpose of this trait and its methods.
/// A trait for types that can be deserialized from a byte vector. pub trait RemotingDeserializable { type Output; fn decode(bytes: &[u8]) -> Result<Self::Output, SerdeJsonError>; }
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (33)
- rocketmq-broker/src/broker_runtime.rs (3 hunks)
- rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (1 hunks)
- rocketmq-broker/src/out_api/broker_outer_api.rs (1 hunks)
- rocketmq-broker/src/processor.rs (2 hunks)
- rocketmq-broker/src/processor/admin_broker_processor.rs (1 hunks)
- rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (1 hunks)
- rocketmq-broker/src/processor/client_manage_processor.rs (1 hunks)
- rocketmq-broker/src/processor/pop_inflight_message_counter.rs (1 hunks)
- rocketmq-broker/src/topic/manager/topic_config_manager.rs (2 hunks)
- rocketmq-common/src/common.rs (1 hunks)
- rocketmq-common/src/common/attribute.rs (1 hunks)
- rocketmq-common/src/common/attribute/attribute_parser.rs (1 hunks)
- rocketmq-common/src/common/attribute/topic_message_type.rs (2 hunks)
- rocketmq-common/src/common/broker/broker_config.rs (2 hunks)
- rocketmq-common/src/common/config.rs (2 hunks)
- rocketmq-common/src/common/key_builder.rs (1 hunks)
- rocketmq-common/src/common/pop_ack_constants.rs (1 hunks)
- rocketmq-common/src/utils/serde_json_utils.rs (1 hunks)
- rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (1 hunks)
- rocketmq-namesrv/src/processor/default_request_processor.rs (3 hunks)
- rocketmq-remoting/src/code/request_code.rs (2 hunks)
- rocketmq-remoting/src/protocol.rs (3 hunks)
- rocketmq-remoting/src/protocol/body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs (1 hunks)
- rocketmq-remoting/src/protocol/body/topic_info_wrapper/topic_config_wrapper.rs (1 hunks)
- rocketmq-remoting/src/protocol/header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/create_topic_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/delete_topic_request_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/header/get_all_topic_config_response_header.rs (1 hunks)
- rocketmq-remoting/src/protocol/heartbeat/heartbeat_data.rs (3 hunks)
- rocketmq-store/src/log_file.rs (1 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (1 hunks)
Files skipped from review due to trivial changes (2)
- rocketmq-broker/src/processor/client_manage_processor.rs
- rocketmq-remoting/src/protocol/body.rs
Additional comments not posted (66)
rocketmq-broker/src/processor/pop_inflight_message_counter.rs (1)
18-18: LGTM: Struct definitionThe struct
PopInflightMessageCounteris defined correctly.rocketmq-remoting/src/protocol/body/create_topic_list_request_body.rs (1)
21-25: LGTM: Struct definitionThe struct
CreateTopicListRequestBodyis defined correctly with appropriate traits for serialization and deserialization.rocketmq-common/src/common/attribute.rs (1)
18-18: LGTM: Module additionThe module
attribute_parseris added correctly.rocketmq-remoting/src/protocol/header.rs (1)
19-19: LGTM: Module additionThe module
create_topic_request_headeris added correctly.rocketmq-remoting/src/protocol/header/get_all_topic_config_response_header.rs (2)
29-30: Verify the implementation ofto_map.The
to_mapmethod always returnsNone, which might not be the intended behavior. Ensure this is correct and consider adding a comment explaining why this is the case.
1-28: LGTM!The rest of the code is straightforward and correct.
Also applies to: 31-39
rocketmq-common/src/common/pop_ack_constants.rs (2)
22-33: LGTM!The constants are correctly defined.
35-41: LGTM!The methods are correctly defined.
rocketmq-remoting/src/protocol/header/delete_topic_request_header.rs (3)
24-31: LGTM!The struct definition is correct.
37-48: LGTM!The
CommandCustomHeaderimplementation is correct.
50-59: LGTM!The
FromMapimplementation is correct.rocketmq-remoting/src/protocol/body/broker_body/register_broker_body.rs (1)
Line range hint
1-69:
LGTM!The struct definition and the associated methods are correct.
rocketmq-common/src/common.rs (1)
41-41: Module imports look good.The added modules
key_builderandpop_ack_constantsare correctly integrated.Also applies to: 47-47
rocketmq-store/src/log_file.rs (1)
122-122: New methoddelete_topicslooks good.The
delete_topicsmethod follows the trait's design and is correctly integrated.rocketmq-common/src/common/key_builder.rs (13)
1-16: File header and license look good.The file header and Apache license are correctly included.
17-18: Imports look good.The necessary imports are correctly included.
20-24: Constants look good.The constants for revive queue and retry separators are correctly defined.
25-26: Struct definition looks good.The
KeyBuilderstruct is correctly defined.
27-33: Methodbuild_pop_retry_topiclooks good.The method correctly builds the retry topic based on the retry version.
35-40: Methodbuild_pop_retry_topic_defaultlooks good.The method correctly builds the default retry topic.
42-47: Methodbuild_pop_retry_topic_v2looks good.The method correctly builds the V2 retry topic.
49-54: Methodbuild_pop_retry_topic_v1looks good.The method correctly builds the V1 retry topic.
56-67: Methodparse_normal_topiclooks good.The method correctly parses the normal topic based on the retry topic prefix.
69-77: Methodparse_normal_topic_defaultlooks good.The method correctly parses the default normal topic.
79-87: Methodparse_grouplooks good.The method correctly parses the group from the retry topic.
89-98: Methodbuild_polling_keylooks good.The method correctly builds the polling key.
100-103: Methodis_pop_retry_topic_v2looks good.The method correctly checks if the retry topic is of V2 type.
rocketmq-common/src/common/attribute/attribute_parser.rs (3)
61-75: LGTM!The
parse_to_stringmethod is correctly implemented.
82-86: LGTM!The unit test for
parse_to_mapwith an empty string input is correctly implemented.
88-98: LGTM!The unit test for
parse_to_mapwith valid attributes string input is correctly implemented.rocketmq-remoting/src/protocol/header/create_topic_request_header.rs (4)
73-107: LGTM!The
to_mapmethod is correctly implemented, handling optional fields gracefully.
110-142: LGTM!The
frommethod is correctly implemented, handling missing fields by providing default values.
82-86: LGTM!The unit test for
parse_to_mapwith an empty string input is correctly implemented.
88-98: LGTM!The unit test for
parse_to_mapwith valid attributes string input is correctly implemented.rocketmq-common/src/utils/serde_json_utils.rs (4)
22-26: LGTM!The modification to return
Result<T, SerdeJsonError>in thedecodemethod is correctly implemented, improving error handling.
Line range hint
29-33: LGTM!The
from_json_strmethod is correctly implemented, returning aResultfor proper error handling.
Line range hint
41-45: LGTM!The
to_jsonmethod is correctly implemented, returning aResultfor proper error handling.
Line range hint
103-114: LGTM!The unit test for
from_json_strwith a valid JSON string input is correctly implemented.rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs (2)
Line range hint
102-118: LGTM!The
put_kv_configmethod is correctly implemented with proper logging.
Line range hint
120-134: LGTM!The
delete_kv_configmethod is correctly implemented with proper logging.rocketmq-common/src/common/broker/broker_config.rs (2)
163-164: New Fields Added: Verify Necessity and UsageTwo new fields
validate_system_topic_when_update_topicandenable_mixed_message_typehave been added to theBrokerConfigstruct. Ensure these fields are necessary and used appropriately within the project.Verification successful
New Fields Added: Verification Successful
The fields
validate_system_topic_when_update_topicandenable_mixed_message_typeadded to theBrokerConfigstruct are actively used in thetopic_request_handler.rsfile. Their presence is necessary and aligns with the project's functionality requirements.
- Usage locations:
rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rsrocketmq-common/src/common/broker/broker_config.rsScripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the new fields in the codebase. # Test: Search for the usage of the new fields. Expect: Relevant code handling these fields. rg --type rust 'validate_system_topic_when_update_topic|enable_mixed_message_type'Length of output: 1180
238-239: Default Values for New Fields: Verify Intended BehaviorThe default values for
validate_system_topic_when_update_topic(true) andenable_mixed_message_type(false) seem reasonable. Verify that these defaults align with the intended behavior of the application.Verification successful
Default Values for New Fields: Verify Intended Behavior
The default values for
validate_system_topic_when_update_topic(true) andenable_mixed_message_type(false) are utilized in the application as intended. These fields are referenced inrocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs, where they influence conditional logic that aligns with their default values.
rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rsScripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the default values of the new fields are used as intended. # Test: Search for the default instantiation of `BrokerConfig` and ensure the new fields are handled correctly. rg --type rust 'BrokerConfig::default'Length of output: 550
Script:
#!/bin/bash # Description: Verify the usage of `validate_system_topic_when_update_topic` and `enable_mixed_message_type` in `topic_queue_mapping_manager.rs`. # Search for references to the new fields in the relevant file. rg 'validate_system_topic_when_update_topic|enable_mixed_message_type' rocketmq-broker/src/topic/manager/topic_queue_mapping_manager.rs -A 5 -B 5Length of output: 147
Script:
#!/bin/bash # Description: Verify the usage of `validate_system_topic_when_update_topic` and `enable_mixed_message_type` across the entire codebase. # Search for references to the new fields in the entire codebase. rg 'validate_system_topic_when_update_topic|enable_mixed_message_type' -A 5 -B 5Length of output: 8275
rocketmq-remoting/src/code/request_code.rs (1)
27-27: New Request Code Added: Verify UsageThe new request code
UpdateAndCreateTopicListhas been added. Ensure this request code is used correctly within the project.Also applies to: 170-170
Verification successful
Let's review the context in which
UpdateAndCreateTopicListis used inrocketmq-broker/src/processor/admin_broker_processor.rsto ensure it is handled correctly.
New Request Code Added: Usage Verified
The new request code
UpdateAndCreateTopicListis correctly integrated and handled within the codebase.
- The request code is defined and included in the match statement in
rocketmq-remoting/src/code/request_code.rs.- The handling logic for this request code is implemented in
rocketmq-broker/src/processor/admin_broker_processor.rs.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the new request code in the codebase. # Test: Search for the usage of the new request code. Expect: Relevant code handling this request code. rg --type rust 'UpdateAndCreateTopicList'Length of output: 330
Script:
#!/bin/bash # Display the lines around the usage of UpdateAndCreateTopicList in admin_broker_processor.rs rg --type rust --context 10 'UpdateAndCreateTopicList' rocketmq-broker/src/processor/admin_broker_processor.rsLength of output: 986
rocketmq-broker/src/offset/manager/consumer_offset_manager.rs (2)
78-93: New Functionclean_offset_by_topic: Verify Correctness and UsageThe new function
clean_offset_by_topichas been added. Ensure this function is implemented correctly and used appropriately within the project.Verification successful
Function
clean_offset_by_topicis correctly used and implemented.The function
clean_offset_by_topicis invoked within thedelete_topic_in_brokermethod, which handles comprehensive clean-up tasks when a topic is deleted. This ensures that all related offsets are also removed, indicating correct usage and implementation.
rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs:
delete_topic_in_brokermethodScripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the new function `clean_offset_by_topic` in the codebase. # Test: Search for the usage of the new function. Expect: Relevant code handling this function. rg --type rust 'clean_offset_by_topic'Length of output: 276
Script:
#!/bin/bash # Description: Extract context around the usage of `clean_offset_by_topic` in topic_request_handler.rs. # Extract 10 lines before and after the usage of `clean_offset_by_topic` for context. rg --type rust -A 10 -B 10 'clean_offset_by_topic' rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rsLength of output: 755
95-106: New Functionwhich_group_by_topic: Verify Correctness and UsageThe new function
which_group_by_topichas been added. Ensure this function is implemented correctly and used appropriately within the project.Verification successful
Verified: Function
which_group_by_topicimplementation and usageThe function
which_group_by_topicis implemented correctly and is used appropriately within the project. The context inrocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rsshows proper handling of the function's result.
rocketmq-broker/src/offset/manager/consumer_offset_manager.rs(Function implementation)rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs(Function usage)Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the new function `which_group_by_topic` in the codebase. # Test: Search for the usage of the new function. Expect: Relevant code handling this function. rg --type rust 'which_group_by_topic'Length of output: 292
Script:
#!/bin/bash # Extract lines of code around the usage of `which_group_by_topic` in `rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs` to verify its usage. # Display 10 lines before and after the usage of `which_group_by_topic` for context. rg --type rust 'which_group_by_topic' -A 10 -B 10 rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rsLength of output: 920
rocketmq-broker/src/processor.rs (2)
75-75: New module added:pop_inflight_message_counter.The addition of this module seems to be consistent with the overall module organization and should serve the intended purpose.
163-167: Ensure proper handling of async call.The modification to make the call to
admin_broker_processor.process_requestasync is correctly implemented. Ensure that this change does not introduce any issues related to async handling.rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs (6)
41-44: New struct added:TopicRequestHandler.The addition of this struct with an
innerfield seems appropriate for handling topic requests.
52-172: Review the implementation ofupdate_and_create_topic.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
174-269: Review the implementation ofupdate_and_create_topic_list.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
271-337: Review the implementation ofdelete_topic.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
339-375: Review the implementation ofget_all_topic_config.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
377-389: Review the implementation ofdelete_topic_in_broker.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
rocketmq-broker/src/topic/manager/topic_config_manager.rs (5)
393-397: Review the implementation ofupdate_topic_config_list.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
399-402: Review the implementation ofremove_topic_config.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
404-419: Review the implementation ofdelete_topic_config.The method implementation appears correct. Ensure that all edge cases are handled, and proper error handling is in place.
533-533: Review the implementation ofdata_version.The method implementation appears correct. Ensure that it retrieves the data version as expected.
541-544: Review the implementation ofbroker_runtime_inner.The method implementation appears correct. Ensure that it retrieves the broker runtime inner as expected.
rocketmq-namesrv/src/processor/default_request_processor.rs (5)
184-185: Ensure proper handling of.unwrap()inquery_broker_topic_config.The modification to handle JSON decoding with
.unwrap()is correctly implemented. Ensure that this change does not introduce any issues related to error handling.
415-415: Ensure proper handling of.unwrap()inregister_topic_to_name_srv.The modification to handle JSON decoding with
.unwrap()is correctly implemented. Ensure that this change does not introduce any issues related to error handling.
524-525: Ensure proper handling of.unwrap()inextract_register_topic_config_from_request.The modification to handle JSON decoding with
.unwrap()is correctly implemented. Ensure that this change does not introduce any issues related to error handling.
Line range hint
105-105:
Ensure proper handling of.unwrap()inget_kv_config.The modification to handle JSON decoding with
.unwrap()is correctly implemented. Ensure that this change does not introduce any issues related to error handling.
Line range hint
104-104:
Ensure proper handling of.unwrap()indelete_kv_config.The modification to handle JSON decoding with
.unwrap()is correctly implemented. Ensure that this change does not introduce any issues related to error handling.rocketmq-remoting/src/protocol.rs (1)
382-387: Implementation ofRemotingDeserializableforTlooks good.The implementation correctly uses
SerdeJsonUtils::decode.rocketmq-broker/src/broker_runtime.rs (2)
63-63: New import ofAdminBrokerProcessorlooks good.The new import is necessary for the changes in the
init_processormethod.
430-437: Initialization ofadmin_broker_processorlooks good.The
AdminBrokerProcessoris correctly initialized with the necessary parameters.
| fn delete_topics(&self, delete_topics: Vec<String>) { | ||
| if delete_topics.is_empty() { | ||
| return; | ||
| } | ||
| for topic in delete_topics { | ||
| let queue_table = self | ||
| .consume_queue_store | ||
| .find_consume_queue_map(topic.as_str()); | ||
| if queue_table.is_none() { | ||
| continue; | ||
| } | ||
| /* for (queue_id, consume_queue) in queue_table.unwrap() { | ||
| consume_queue.lock().destroy(); | ||
| self.consume_queue_store.delete_queue(topic.as_str(), *queue_id); | ||
| }*/ | ||
| } | ||
| } |
There was a problem hiding this comment.
Uncomment and review the deletion logic.
The deletion logic is currently commented out. Ensure that the deletion logic is correct and uncomment it to enable topic deletion.
for topic in delete_topics {
let queue_table = self
.consume_queue_store
.find_consume_queue_map(topic.as_str());
if queue_table.is_none() {
continue;
}
for (queue_id, consume_queue) in queue_table.unwrap() {
consume_queue.lock().destroy();
self.consume_queue_store.delete_queue(topic.as_str(), *queue_id);
}
}Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| fn delete_topics(&self, delete_topics: Vec<String>) { | |
| if delete_topics.is_empty() { | |
| return; | |
| } | |
| for topic in delete_topics { | |
| let queue_table = self | |
| .consume_queue_store | |
| .find_consume_queue_map(topic.as_str()); | |
| if queue_table.is_none() { | |
| continue; | |
| } | |
| /* for (queue_id, consume_queue) in queue_table.unwrap() { | |
| consume_queue.lock().destroy(); | |
| self.consume_queue_store.delete_queue(topic.as_str(), *queue_id); | |
| }*/ | |
| } | |
| } | |
| for topic in delete_topics { | |
| let queue_table = self | |
| .consume_queue_store | |
| .find_consume_queue_map(topic.as_str()); | |
| if queue_table.is_none() { | |
| continue; | |
| } | |
| for (queue_id, consume_queue) in queue_table.unwrap() { | |
| consume_queue.lock().destroy(); | |
| self.consume_queue_store.delete_queue(topic.as_str(), *queue_id); | |
| } | |
| } |
|
|
||
| let serialized = original.encode(); | ||
| let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()); | ||
| let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).unwrap(); |
There was a problem hiding this comment.
Ensure proper error handling instead of using unwrap().
Using unwrap() can cause the program to panic if an error occurs. Consider using proper error handling to gracefully handle potential errors.
- let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).unwrap();
+ let deserialized = SerdeJsonUtils::decode::<HeartbeatData>(serialized.as_slice()).expect("Failed to deserialize HeartbeatData");Also applies to: 80-80, 96-96
| pub fn clear_in_flight_message_num_by_topic_name(&self, _topic: &str) { | ||
| // TODO | ||
| } |
There was a problem hiding this comment.
Reminder: Implement the method
The method clear_in_flight_message_num_by_topic_name is a placeholder and needs implementation.
Do you want me to help implement this method or open a GitHub issue to track this task?
| return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap(); | ||
| } | ||
| todo!() |
There was a problem hiding this comment.
Ensure error handling in decode method.
The decode method uses unwrap() which can panic. Consider using proper error handling.
- return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap();
+ return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).expect("Failed to decode RegisterBrokerBody");Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).unwrap(); | |
| } | |
| todo!() | |
| return SerdeJsonUtils::decode::<RegisterBrokerBody>(bytes.iter().as_slice()).expect("Failed to decode RegisterBrokerBody"); | |
| } | |
| todo!() |
| } | ||
| if let Some(body) = response.body() { | ||
| result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()); | ||
| result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap(); |
There was a problem hiding this comment.
Avoid Using .unwrap(): Implement Proper Error Handling
Using .unwrap() can cause the program to panic if the result is an error. Instead, implement proper error handling to manage potential errors gracefully.
- result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap();
+ result.kv_table = match SerdeJsonUtils::decode::<KVTable>(body.as_ref()) {
+ Ok(table) => table,
+ Err(e) => {
+ error!("Failed to decode KVTable: {:?}", e);
+ return None;
+ }
+ };Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| result.kv_table = SerdeJsonUtils::decode::<KVTable>(body.as_ref()).unwrap(); | |
| result.kv_table = match SerdeJsonUtils::decode::<KVTable>(body.as_ref()) { | |
| Ok(table) => table, | |
| Err(e) => { | |
| error!("Failed to decode KVTable: {:?}", e); | |
| return None; | |
| } | |
| }; |
| pub fn parse_to_map(attributes_modification: &str) -> Result<HashMap<String, String>, String> { | ||
| if attributes_modification.is_empty() { | ||
| return Ok(HashMap::new()); | ||
| } | ||
|
|
||
| let mut attributes = HashMap::new(); | ||
| let kvs: Vec<&str> = attributes_modification | ||
| .split(ATTR_ARRAY_SEPARATOR_COMMA) | ||
| .collect(); | ||
| for kv in kvs { | ||
| let mut key = String::new(); | ||
| let mut value = String::new(); | ||
| if kv.contains(ATTR_KEY_VALUE_EQUAL_SIGN) { | ||
| let splits: Vec<&str> = kv.split(ATTR_KEY_VALUE_EQUAL_SIGN).collect(); | ||
| key.push_str(splits[0]); | ||
| value.push_str(splits[1]); | ||
| if !key.contains(ATTR_ADD_PLUS_SIGN) { | ||
| return Err(format!("add/alter attribute format is wrong: {}", key)); | ||
| } | ||
| } else { | ||
| key.push_str(kv); | ||
| if !key.contains(ATTR_DELETE_MINUS_SIGN) { | ||
| return Err(format!("delete attribute format is wrong: {}", key)); | ||
| } | ||
| } | ||
| if attributes.insert(key.clone(), value).is_some() { | ||
| return Err(format!("key duplication: {}", key)); | ||
| } | ||
| } | ||
| Ok(attributes) | ||
| } |
There was a problem hiding this comment.
Consider using splitn for performance improvement.
Using splitn with a limit of 2 can be more efficient when splitting the key-value pairs.
let splits: Vec<&str> = kv.splitn(2, ATTR_KEY_VALUE_EQUAL_SIGN).collect();| let wrapper = | ||
| SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap(); |
There was a problem hiding this comment.
Avoid using unwrap for error handling.
Using unwrap can cause the program to panic if the decoding fails. Consider handling the error gracefully.
- let wrapper = SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap();
+ let wrapper = match SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()) {
+ Ok(wrapper) => wrapper,
+ Err(e) => {
+ error!("Failed to decode KVConfigSerializeWrapper: {}", e);
+ return;
+ }
+ };Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let wrapper = | |
| SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()).unwrap(); | |
| let wrapper = match SerdeJsonUtils::decode::<KVConfigSerializeWrapper>(content.as_bytes()) { | |
| Ok(wrapper) => wrapper, | |
| Err(e) => { | |
| error!("Failed to decode KVConfigSerializeWrapper: {}", e); | |
| return; | |
| } | |
| }; |
Which Issue(s) This PR Fixes(Closes)
Fixes #763
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
delete_topicsmethod in the message store for topic deletion management.Enhancements
.unwrap()for JSON decoding in several modules.BrokerConfigto validate system topics and enable mixed message types.Bug Fixes
.unwrap()in JSON handling methods.Documentation