Skip to content

Commit 6146fbd

Browse files
committed
[ISSUE #676]♻️Refactor RemotingCommand struct🚀
1 parent db5c4d6 commit 6146fbd

6 files changed

Lines changed: 146 additions & 44 deletions

File tree

rocketmq-broker/src/broker_runtime.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ impl BrokerRuntime {
178178
topic_config_manager,
179179
topic_queue_mapping_manager,
180180
consumer_offset_manager: Arc::new(Default::default()),
181-
subscription_group_manager: Arc::new(SubscriptionGroupManager::new()),
181+
subscription_group_manager: Arc::new(SubscriptionGroupManager::new(
182+
broker_config.clone(),
183+
None,
184+
)),
182185
consumer_filter_manager: Arc::new(Default::default()),
183186
consumer_order_info_manager: Arc::new(Default::default()),
184187
message_store: None,

rocketmq-broker/src/processor/pull_message_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub struct PullMessageProcessor<MS> {
6868

6969
impl<MS> Default for PullMessageProcessor<MS> {
7070
fn default() -> Self {
71-
todo!()
71+
unimplemented!()
7272
}
7373
}
7474

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,17 @@ pub(crate) struct SubscriptionGroupManager<MS> {
4242
}
4343

4444
impl<MS> SubscriptionGroupManager<MS> {
45-
pub fn new() -> SubscriptionGroupManager<MS> {
46-
unimplemented!()
45+
pub fn new(
46+
broker_config: Arc<BrokerConfig>,
47+
message_store: Option<MS>,
48+
) -> SubscriptionGroupManager<MS> {
49+
Self {
50+
broker_config,
51+
subscription_group_wrapper: Arc::new(parking_lot::Mutex::new(
52+
SubscriptionGroupWrapper::default(),
53+
)),
54+
message_store,
55+
}
4756
}
4857
}
4958

rocketmq-remoting/src/codec/remoting_command_codec.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -161,22 +161,10 @@ impl Encoder<RemotingCommand> for RemotingCommandCodec {
161161
///
162162
/// This function will return an error if the encoding process fails.
163163
fn encode(&mut self, item: RemotingCommand, dst: &mut BytesMut) -> Result<(), Self::Error> {
164-
let header = item.fast_header_encode();
165-
let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
166-
let body_length = item.get_body().map_or(0, |b| b.len()) as i32;
167-
let total_length = 4 + header_length + body_length;
168-
169-
dst.reserve((total_length + 4) as usize);
170-
dst.put_i32(total_length);
171-
let serialize_type =
172-
RemotingCommand::mark_serialize_type(header_length, item.get_serialize_type());
173-
dst.put_i32(serialize_type);
174-
175-
if let Some(header_inner) = header {
176-
dst.put(header_inner);
177-
}
164+
let mut item = item;
165+
item.fast_header_encode(dst);
178166
if let Some(body_inner) = item.get_body() {
179-
dst.put(body_inner);
167+
dst.put(body_inner.as_ref());
180168
}
181169
Ok(())
182170
}

rocketmq-remoting/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717
#![allow(dead_code)]
18+
#![feature(sync_unsafe_cell)]
1819

1920
pub mod clients;
2021
pub mod code;

rocketmq-remoting/src/protocol/remoting_command.rs

Lines changed: 126 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,22 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
use std::cell::SyncUnsafeCell;
1718
use std::collections::HashMap;
1819
use std::sync::atomic::AtomicI32;
1920
use std::sync::atomic::Ordering;
2021
use std::sync::Arc;
2122
use std::sync::Once;
2223
use std::sync::RwLock;
2324

25+
use bytes::BufMut;
2426
use bytes::Bytes;
27+
use bytes::BytesMut;
2528
use lazy_static::lazy_static;
2629
use rocketmq_common::common::mq_version::RocketMqVersion;
2730
use serde::Deserialize;
2831
use serde::Serialize;
32+
use tracing::error;
2933

3034
use super::FastCodesHeader;
3135
use super::RemotingCommandType;
@@ -87,7 +91,8 @@ pub struct RemotingCommand {
8791
#[serde(skip)]
8892
suspended: bool,
8993
#[serde(skip)]
90-
command_custom_header: Option<Arc<dyn CommandCustomHeader + Send + Sync + 'static>>,
94+
command_custom_header:
95+
Option<Arc<SyncUnsafeCell<dyn CommandCustomHeader + Send + Sync + 'static>>>,
9196
#[serde(rename = "serializeTypeCurrentRPC")]
9297
serialize_type: SerializeType,
9398
}
@@ -135,10 +140,10 @@ impl RemotingCommand {
135140
}
136141

137142
impl RemotingCommand {
138-
pub fn create_request_command(
139-
code: impl Into<i32>,
140-
header: impl CommandCustomHeader + Sync + Send + 'static,
141-
) -> Self {
143+
pub fn create_request_command<T>(code: impl Into<i32>, header: T) -> Self
144+
where
145+
T: CommandCustomHeader + Sync + Send + 'static,
146+
{
142147
let mut command = Self::default()
143148
.set_code(code.into())
144149
.set_command_custom_header(header);
@@ -188,13 +193,12 @@ impl RemotingCommand {
188193
.mark_response_type()
189194
}
190195

191-
pub fn set_command_custom_header(
192-
mut self,
193-
//command_custom_header: Option<Box<dyn CommandCustomHeader + Send + 'static>>,
194-
command_custom_header: impl CommandCustomHeader + Send + Sync + 'static,
195-
) -> Self {
196-
self.command_custom_header = Some(Arc::new(command_custom_header));
197-
if let Some(cch) = &self.command_custom_header {
196+
pub fn set_command_custom_header<T>(mut self, command_custom_header: T) -> Self
197+
where
198+
T: CommandCustomHeader + Sync + Send + 'static,
199+
{
200+
self.command_custom_header = Some(Arc::new(SyncUnsafeCell::new(command_custom_header)));
201+
/*if let Some(cch) = &self.command_custom_header {
198202
let option = cch.to_map();
199203
200204
match &mut self.ext_fields {
@@ -209,16 +213,16 @@ impl RemotingCommand {
209213
}
210214
}
211215
}
212-
}
216+
}*/
213217
self
214218
}
215219

216-
pub fn set_command_custom_header_ref(
217-
&mut self,
218-
command_custom_header: impl CommandCustomHeader + Send + Sync + 'static,
219-
) {
220-
self.command_custom_header = Some(Arc::new(command_custom_header));
221-
if let Some(cch) = &self.command_custom_header {
220+
pub fn set_command_custom_header_ref<T>(&mut self, command_custom_header: T)
221+
where
222+
T: CommandCustomHeader + Sync + Send + 'static,
223+
{
224+
self.command_custom_header = Some(Arc::new(SyncUnsafeCell::new(command_custom_header)));
225+
/*if let Some(cch) = &self.command_custom_header {
222226
let option = cch.to_map();
223227
224228
match &mut self.ext_fields {
@@ -233,7 +237,7 @@ impl RemotingCommand {
233237
}
234238
}
235239
}
236-
}
240+
}*/
237241
}
238242

239243
pub fn set_code(mut self, code: impl Into<i32>) -> Self {
@@ -322,20 +326,89 @@ impl RemotingCommand {
322326
}
323327

324328
pub fn header_encode(&self) -> Option<Bytes> {
325-
self.command_custom_header.as_ref().and_then(|cch| {
326-
cch.to_map()
329+
self.command_custom_header.as_ref().and_then(|header| {
330+
let header_ptr = header.get();
331+
let header_ref = unsafe { &*(header_ptr as *const dyn CommandCustomHeader) };
332+
header_ref
333+
.to_map()
327334
.as_ref()
328335
.map(|val| Bytes::from(serde_json::to_vec(val).unwrap()))
329336
})
330337
}
331338

332-
pub fn fast_header_encode(&self) -> Option<Bytes> {
339+
pub fn make_custom_header_to_net(&mut self) {
340+
if let Some(header) = &self.command_custom_header {
341+
let header_ptr = header.get();
342+
let header_ref = unsafe { &*(header_ptr as *const dyn CommandCustomHeader) };
343+
let option = header_ref.to_map();
344+
345+
match &mut self.ext_fields {
346+
None => {
347+
self.ext_fields = option;
348+
}
349+
Some(ext) => {
350+
if let Some(val) = option {
351+
for (key, value) in &val {
352+
ext.insert(key.clone(), value.clone());
353+
}
354+
}
355+
}
356+
}
357+
}
358+
}
359+
360+
/* pub fn fast_header_encode(&self) -> Option<Bytes> {
333361
let st = serde_json::to_string(self).unwrap();
334362
Some(Bytes::from(st))
363+
}*/
364+
365+
pub fn fast_header_encode(&mut self, dst: &mut BytesMut) {
366+
match self.serialize_type {
367+
SerializeType::JSON => {
368+
self.make_custom_header_to_net();
369+
let header = match serde_json::to_vec(self) {
370+
Ok(value) => Some(value),
371+
Err(e) => {
372+
error!("Failed to encode generic: {}", e);
373+
None
374+
}
375+
};
376+
let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
377+
let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
378+
let total_length = 4 + header_length + body_length;
379+
380+
dst.reserve((total_length + 4) as usize);
381+
dst.put_i32(total_length);
382+
let serialize_type =
383+
RemotingCommand::mark_serialize_type(header_length, SerializeType::JSON);
384+
dst.put_i32(serialize_type);
385+
386+
if let Some(header_inner) = header {
387+
dst.put(header_inner.as_slice());
388+
}
389+
}
390+
SerializeType::ROCKETMQ => {}
391+
}
392+
393+
/*let header_length = header.as_ref().map_or(0, |h| h.len()) as i32;
394+
let body_length = self.body.as_ref().map_or(0, |b| b.len()) as i32;
395+
let total_length = 4 + header_length + body_length;
396+
397+
dst.reserve((total_length + 4) as usize);
398+
dst.put_i32(total_length);
399+
let serialize_type =
400+
RemotingCommand::mark_serialize_type(header_length, item.get_serialize_type());
401+
dst.put_i32(serialize_type);
402+
403+
if let Some(header_inner) = header {
404+
dst.put(header_inner);
405+
}
406+
407+
let st = serde_json::to_string(self).unwrap();*/
335408
}
336409

337-
pub fn get_body(&self) -> Option<Bytes> {
338-
self.body.as_ref().cloned()
410+
pub fn get_body(&self) -> Option<&Bytes> {
411+
self.body.as_ref()
339412
}
340413

341414
pub fn mark_serialize_type(header_length: i32, protocol_type: SerializeType) -> i32 {
@@ -453,6 +526,34 @@ impl RemotingCommand {
453526
pub fn get_ext_fields(&self) -> Option<&HashMap<String, String>> {
454527
self.ext_fields.as_ref()
455528
}
529+
530+
pub fn read_custom_header<T>(&mut self) -> Option<&T>
531+
where
532+
T: CommandCustomHeader + Sync + Send,
533+
{
534+
match self.command_custom_header.as_ref() {
535+
None => None,
536+
Some(value) => {
537+
let value = value.get();
538+
let value = value as *const dyn CommandCustomHeader as *const T;
539+
unsafe { Some(&*value) }
540+
}
541+
}
542+
}
543+
544+
pub fn read_custom_header_mut<T>(&mut self) -> Option<&mut T>
545+
where
546+
T: CommandCustomHeader + Sync + Send,
547+
{
548+
match self.command_custom_header.as_ref() {
549+
None => None,
550+
Some(value) => {
551+
let value = value.get();
552+
let value = value as *const dyn CommandCustomHeader as *mut T;
553+
unsafe { Some(&mut *value) }
554+
}
555+
}
556+
}
456557
}
457558

458559
impl AsRef<RemotingCommand> for RemotingCommand {

0 commit comments

Comments
 (0)