|
| 1 | +/* |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | + * contributor license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright ownership. |
| 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
| 8 | + * |
| 9 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | + * |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
| 16 | + */ |
| 17 | +use std::sync::Arc; |
| 18 | + |
| 19 | +use cheetah_string::CheetahString; |
| 20 | +use rocketmq_common::common::broker::broker_config::BrokerConfig; |
| 21 | +use rocketmq_runtime::RocketMQRuntime; |
| 22 | +use rocketmq_rust::ArcMut; |
| 23 | + |
| 24 | +use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager; |
| 25 | + |
| 26 | +const SEND_TIMEOUT: u64 = 3_000; |
| 27 | +const DEFAULT_PULL_TIMEOUT_MILLIS: u64 = 10_000; |
| 28 | +///### RocketMQ's EscapeBridge for Dead Letter Queue (DLQ) Mechanism |
| 29 | +/// |
| 30 | +/// In the context of message passing within RocketMQ, the `EscapeBridge` primarily handles the Dead |
| 31 | +/// Letter Queue (DLQ) mechanism. When messages fail to be successfully consumed by a consumer after |
| 32 | +/// multiple attempts, these messages are designated as "dead letters"—messages that cannot be |
| 33 | +/// processed normally. To prevent such messages from indefinitely blocking the consumer's |
| 34 | +/// processing flow, RocketMQ provides functionality to transfer these messages to a special queue |
| 35 | +/// known as the dead letter queue. |
| 36 | +/// |
| 37 | +/// The `EscapeBridge` acts as a bridge in this process, responsible for moving messages that have |
| 38 | +/// failed consumption from their original queue into the DLQ. This action helps maintain system |
| 39 | +/// health and prevents the entire consumption process from being obstructed by a few problematic |
| 40 | +/// messages. Additionally, it provides developers with an opportunity to analyze and address these |
| 41 | +/// abnormal messages at a later time. |
| 42 | +/// |
| 43 | +/// #### Functions of EscapeBridge |
| 44 | +/// |
| 45 | +/// - **Isolation of Problematic Messages:** Moves messages that cannot be consumed into the DLQ to |
| 46 | +/// ensure they do not continue to affect normal consumption processes. |
| 47 | +/// - **Preservation of Message Data:** Even if a message is considered unconsumable, its content is |
| 48 | +/// preserved, allowing for subsequent diagnosis or specialized handling. |
| 49 | +/// - **Support for Retry Logic:** For messages that may have failed due to transient issues, retry |
| 50 | +/// logic can be applied by requeueing or specially processing messages in the DLQ, enabling |
| 51 | +/// another attempt at consumption. |
| 52 | +/// |
| 53 | +/// Through this approach, RocketMQ enhances the reliability and stability of the messaging system. |
| 54 | +/// It also equips developers with better tools for managing and troubleshooting issues in message |
| 55 | +/// transmission. |
| 56 | +/// |
| 57 | +/// #### Conclusion |
| 58 | +/// |
| 59 | +/// RocketMQ's `EscapeBridge` plays a critical role in maintaining the robustness of the messaging |
| 60 | +/// system by effectively handling messages that cannot be processed. By isolating problematic |
| 61 | +/// messages, preserving their data, and supporting retry mechanisms, it ensures that the overall |
| 62 | +/// consumption process remains healthy and unobstructed. Developers gain valuable insights into |
| 63 | +/// message failures, aiding in the diagnosis and resolution of potential issues. |
| 64 | +/// |
| 65 | +/// **Note:** The specific configuration and usage methods may vary depending on the version of |
| 66 | +/// RocketMQ. Please refer to the official documentation for the most accurate information. |
| 67 | +pub(crate) struct EscapeBridge<MS> { |
| 68 | + inner_producer_group_name: CheetahString, |
| 69 | + inner_consumer_group_name: CheetahString, |
| 70 | + escape_bridge_runtime: RocketMQRuntime, |
| 71 | + message_store: ArcMut<MS>, |
| 72 | + broker_config: Arc<BrokerConfig>, |
| 73 | + topic_route_info_manager: Arc<TopicRouteInfoManager>, |
| 74 | +} |
0 commit comments