Skip to content

Commit 4f5a7cb

Browse files
authored
[ISSUE #1695]🍻Add EscapeBridge struct for rust🚀
1 parent 00232a1 commit 4f5a7cb

3 files changed

Lines changed: 92 additions & 0 deletions

File tree

rocketmq-broker/src/failover.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
pub mod escape_bridge;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::sync::Arc;
18+
19+
use cheetah_string::CheetahString;
20+
use rocketmq_common::common::broker::broker_config::BrokerConfig;
21+
use rocketmq_runtime::RocketMQRuntime;
22+
use rocketmq_rust::ArcMut;
23+
24+
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
25+
26+
const SEND_TIMEOUT: u64 = 3_000;
27+
const DEFAULT_PULL_TIMEOUT_MILLIS: u64 = 10_000;
28+
///### RocketMQ's EscapeBridge for Dead Letter Queue (DLQ) Mechanism
29+
///
30+
/// In the context of message passing within RocketMQ, the `EscapeBridge` primarily handles the Dead
31+
/// Letter Queue (DLQ) mechanism. When messages fail to be successfully consumed by a consumer after
32+
/// multiple attempts, these messages are designated as "dead letters"—messages that cannot be
33+
/// processed normally. To prevent such messages from indefinitely blocking the consumer's
34+
/// processing flow, RocketMQ provides functionality to transfer these messages to a special queue
35+
/// known as the dead letter queue.
36+
///
37+
/// The `EscapeBridge` acts as a bridge in this process, responsible for moving messages that have
38+
/// failed consumption from their original queue into the DLQ. This action helps maintain system
39+
/// health and prevents the entire consumption process from being obstructed by a few problematic
40+
/// messages. Additionally, it provides developers with an opportunity to analyze and address these
41+
/// abnormal messages at a later time.
42+
///
43+
/// #### Functions of EscapeBridge
44+
///
45+
/// - **Isolation of Problematic Messages:** Moves messages that cannot be consumed into the DLQ to
46+
/// ensure they do not continue to affect normal consumption processes.
47+
/// - **Preservation of Message Data:** Even if a message is considered unconsumable, its content is
48+
/// preserved, allowing for subsequent diagnosis or specialized handling.
49+
/// - **Support for Retry Logic:** For messages that may have failed due to transient issues, retry
50+
/// logic can be applied by requeueing or specially processing messages in the DLQ, enabling
51+
/// another attempt at consumption.
52+
///
53+
/// Through this approach, RocketMQ enhances the reliability and stability of the messaging system.
54+
/// It also equips developers with better tools for managing and troubleshooting issues in message
55+
/// transmission.
56+
///
57+
/// #### Conclusion
58+
///
59+
/// RocketMQ's `EscapeBridge` plays a critical role in maintaining the robustness of the messaging
60+
/// system by effectively handling messages that cannot be processed. By isolating problematic
61+
/// messages, preserving their data, and supporting retry mechanisms, it ensures that the overall
62+
/// consumption process remains healthy and unobstructed. Developers gain valuable insights into
63+
/// message failures, aiding in the diagnosis and resolution of potential issues.
64+
///
65+
/// **Note:** The specific configuration and usage methods may vary depending on the version of
66+
/// RocketMQ. Please refer to the official documentation for the most accurate information.
67+
pub(crate) struct EscapeBridge<MS> {
68+
inner_producer_group_name: CheetahString,
69+
inner_consumer_group_name: CheetahString,
70+
escape_bridge_runtime: RocketMQRuntime,
71+
message_store: ArcMut<MS>,
72+
broker_config: Arc<BrokerConfig>,
73+
topic_route_info_manager: Arc<TopicRouteInfoManager>,
74+
}

rocketmq-broker/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub(crate) mod broker_runtime;
3434
pub(crate) mod client;
3535
pub(crate) mod coldctr;
3636
pub(crate) mod controller;
37+
pub(crate) mod failover;
3738
pub(crate) mod filter;
3839
pub(crate) mod hook;
3940
pub(crate) mod load_balance;

0 commit comments

Comments
 (0)