[ISSUE #1234]🐛NameServer Cannot drop a runtime in a context where blocking is not allowed#1238
rocketmq-rust-bot merged 1 commit into main
Walkthrough
The changes made in this pull request enhance the asynchronous functionality of the …
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1238      +/-   ##
==========================================
- Coverage   18.03%   18.02%   -0.01%
==========================================
  Files         428      428
  Lines       53549    53563      +14
==========================================
  Hits         9655     9655
- Misses      43894    43908      +14
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (2)
rocketmq-namesrv/src/bootstrap.rs (1)
63-67: Remove commented code.
The old select! implementation is commented out and should be removed as it's no longer needed.
- /*select! {
-     _ = self.name_server_runtime.start() =>{
-
-     }
- }*/
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 63-63: rocketmq-namesrv/src/bootstrap.rs#L63
Added line #L63 was not covered by tests
rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1)
Line range hint 1-999: Review architectural impact on runtime dropping bug
While the changes improve the client implementation, they don't fully address the core problem reported in issue #1234: dropping the NameServer runtime in a context where blocking is not allowed. The current implementation still has potentially blocking operations in the name server scanning loop.
Consider:
- Making the name server scanning truly non-blocking by using async channels for updates
- Implementing proper shutdown handling to ensure clean runtime dropping
- Adding metrics and logging for better observability of the name server connection state
Would you like me to propose a more comprehensive architectural solution?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 243-243: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L243
Added line #L243 was not covered by tests
[warning] 247-247: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L247
Added line #L247 was not covered by tests
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
rocketmq-namesrv/src/bootstrap.rs (3 hunks)
rocketmq-remoting/src/clients/rocketmq_default_impl.rs (2 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
rocketmq-namesrv/src/bootstrap.rs
[warning] 63-63: rocketmq-namesrv/src/bootstrap.rs#L63
Added line #L63 was not covered by tests
[warning] 67-68: rocketmq-namesrv/src/bootstrap.rs#L67-L68
Added lines #L67 - L68 were not covered by tests
[warning] 78-91: rocketmq-namesrv/src/bootstrap.rs#L78-L91
Added lines #L78 - L91 were not covered by tests
rocketmq-remoting/src/clients/rocketmq_default_impl.rs
[warning] 243-243: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L243
Added line #L243 was not covered by tests
[warning] 247-247: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L247
Added line #L247 was not covered by tests
[warning] 295-295: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L295
Added line #L295 was not covered by tests
🔇 Additional comments (2)
rocketmq-namesrv/src/bootstrap.rs (1)
Line range hint 63-91: Verify concurrent execution and shutdown behavior.
The changes correctly address the blocking issue by introducing proper async patterns. However, we should verify the complete lifecycle:
- Server startup doesn't block
- Graceful shutdown works when signal is received
- Resources are properly cleaned up
Let's verify the shutdown behavior:
✅ Verification successful
Based on the extensive search results, I can now provide a final assessment of the shutdown behavior in the codebase.
Proper shutdown handling is implemented with graceful cleanup
The codebase has comprehensive shutdown handling:
- The NameServerRuntime implements the Drop trait with proper cleanup
- The server uses tokio's graceful shutdown pattern with:
  - tokio::join! for concurrent execution
  - Signal handling via wait_for_signal()
  - Proper resource cleanup through Drop implementations
The key components work together:
- The Shutdown struct manages shutdown state and notifications
- Broadcast channels coordinate shutdown across components
- Resources are properly dropped via Drop trait implementations
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for proper resource cleanup in shutdown paths
# Look for Drop trait implementations
rg -A 5 "impl Drop"
# Look for shutdown/cleanup related functions
rg -A 5 "shutdown|cleanup|drop"
# Check for potential resource leaks (holding strong references)
ast-grep --pattern 'Arc::new($expr)'
Length of output: 164164
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 78-91: rocketmq-namesrv/src/bootstrap.rs#L78-L91
Added lines #L78 - L91 were not covered by tests
rocketmq-remoting/src/clients/rocketmq_default_impl.rs (1)
293-295:
Fix address list management and implement shuffling
There are several issues in the name server address list update logic:
- The commented-out shuffling code should be implemented for proper load balancing
- Using extend instead of replacing the list could lead to duplicates and memory growth
Apply this fix:
- /* let mut rng = thread_rng();
- addrs.shuffle(&mut rng);*/
- self.namesrv_addr_list.mut_from_ref().extend(addrs.clone());
+ let mut rng = rand::thread_rng();
+ let mut shuffled_addrs = addrs.clone();
+ shuffled_addrs.shuffle(&mut rng);
+ self.namesrv_addr_list.mut_from_ref().clear();
+ self.namesrv_addr_list.mut_from_ref().extend(shuffled_addrs);
Let's verify the impact of these changes:
Would you like me to help create unit tests to verify this functionality?
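The difference between extending and replacing the address list can be illustrated with a small standard-library sketch. The function name `replace_addr_list` and the plain `Vec<String>` are assumptions made for illustration; they are not the project's `namesrv_addr_list` type. Shuffling is deliberately left out so the sketch needs no external crate.

```rust
use std::collections::HashSet;

/// Hypothetical helper: replaces `current` with the de-duplicated `incoming`
/// addresses (first-seen order is kept). Returns true if the list changed.
/// Unlike `extend`, repeated calls with the same input never grow the list.
fn replace_addr_list(current: &mut Vec<String>, incoming: &[String]) -> bool {
    let mut seen: HashSet<&String> = HashSet::new();
    let deduped: Vec<String> = incoming
        .iter()
        // `insert` returns false for an address that was already seen.
        .filter(|addr| seen.insert(*addr))
        .cloned()
        .collect();
    if *current == deduped {
        return false;
    }
    *current = deduped;
    true
}
```

Calling this helper twice with the same addresses leaves the list at the same length, which is the property the `extend`-based code does not have.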
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 295-295: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L295
Added line #L295 was not covered by tests
    /*select! {
        _ = self.name_server_runtime.start() =>{

        }
    }
    }*/
    tokio::join!(self.name_server_runtime.start(), wait_for_signal());
}
Add test coverage for the boot method.
The boot method contains critical changes to fix the blocking issue, but lacks test coverage. Please add tests to verify:
- Concurrent execution of server start and signal handling
- Proper shutdown when signal is received
Would you like me to help create a test suite for this functionality?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 63-63: rocketmq-namesrv/src/bootstrap.rs#L63
Added line #L63 was not covered by tests
[warning] 67-68: rocketmq-namesrv/src/bootstrap.rs#L67-L68
Added lines #L67 - L68 were not covered by tests
tokio::spawn(async move {
    server.run(request_processor).await;
});
let namesrv = CheetahString::from_string(format!(
    "{}:{}",
    NetworkUtil::get_local_address().unwrap(),
    self.server_config.listen_port
));
let weak_arc_mut = ArcMut::downgrade(&self.remoting_client);
self.remoting_client
    .update_name_server_address_list(vec![namesrv])
    .await;
self.remoting_client.start(weak_arc_mut).await;
info!("Rocketmq NameServer(Rust) started");
Add test coverage for the start method.
The start method contains critical async functionality but lacks test coverage. Please add tests to verify:
- Proper server initialization and startup
- Correct address list update
- Remoting client initialization with weak reference
Would you like me to help create integration tests for this functionality?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 78-91: rocketmq-namesrv/src/bootstrap.rs#L78-L91
Added lines #L78 - L91 were not covered by tests
let namesrv = CheetahString::from_string(format!(
    "{}:{}",
    NetworkUtil::get_local_address().unwrap(),
    self.server_config.listen_port
));
Improve error handling for local address resolution.
The unwrap() call on NetworkUtil::get_local_address() could panic if local address resolution fails. Consider proper error handling:
- NetworkUtil::get_local_address().unwrap(),
+ NetworkUtil::get_local_address().map_err(|e| {
+     tracing::error!("Failed to get local address: {}", e);
+     e
+ })?,
Committable suggestion skipped: line range outside the PR's diff.
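As a rough illustration of propagating the failure instead of panicking, the following standard-library sketch builds the `host:port` string from a fallible lookup. `AddrError` and `namesrv_addr` are hypothetical names introduced here; the real return type of `NetworkUtil::get_local_address()` is not shown in this review, so the `Result` input is an assumption.

```rust
use std::fmt;

/// Hypothetical error type standing in for whatever the real lookup reports.
#[derive(Debug, PartialEq)]
struct AddrError(String);

impl fmt::Display for AddrError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for AddrError {}

/// Builds "host:port" from a fallible local-address lookup.
/// On failure the error is logged and returned to the caller rather than
/// triggering a panic, which is what `unwrap()` would do.
fn namesrv_addr(local: Result<String, AddrError>, port: u16) -> Result<String, AddrError> {
    let host = local.map_err(|e| {
        // eprintln! stands in for a structured logger such as tracing::error!.
        eprintln!("Failed to get local address: {e}");
        e
    })?;
    Ok(format!("{host}:{port}"))
}
```

For the `?` operator to be usable, the enclosing function has to return a `Result` (or `Option`), so adopting this pattern in `start` would also mean changing its signature and handling the error in the caller.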
let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
self.client_runtime.get_handle().spawn(async move {
    loop {
        time::sleep(Duration::from_millis(1)).await;
        client.scan_available_name_srv().await;
        time::sleep(Duration::from_millis(connect_timeout_millis)).await;
🛠️ Refactor suggestion
Consider separating scan interval from connection timeout
The sleep duration between name server scans is currently using the connection timeout value. While this is better than the previous 1ms interval, these are conceptually different configurations that should be separated.
Consider:
- Adding a separate configuration for scan interval
- Adding a cancellation mechanism for clean shutdown
  async fn start(&self, this: WeakArcMut<Self>) {
      if let Some(client) = this.upgrade() {
-         let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
+         let scan_interval_millis = self.tokio_client_config.name_srv_scan_interval_millis as u64;
+         let (tx, mut rx) = tokio::sync::oneshot::channel();
+         self.shutdown_tx.replace(tx);
          self.client_runtime.get_handle().spawn(async move {
              loop {
                  client.scan_available_name_srv().await;
-                 time::sleep(Duration::from_millis(connect_timeout_millis)).await;
+                 tokio::select! {
+                     _ = time::sleep(Duration::from_millis(scan_interval_millis)) => {},
+                     _ = &mut rx => break,
+                 }
              }
          });
      }
  }
Additionally, the static analysis indicates these changes lack test coverage. Would you like me to help create unit tests for this functionality?
Committable suggestion skipped: line range outside the PR's diff.
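The same idea, a periodic scan whose wait can be interrupted by a shutdown signal, can be sketched with threads and the standard library alone. This is a thread-based analogue, not the tokio code from the suggestion: `spawn_scanner` is a hypothetical name, the counter stands in for `scan_available_name_srv()`, and `recv_timeout` plays the role that `tokio::select!` over a sleep and a oneshot receiver plays in the async version.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical sketch: spawns a scanner thread that scans once per
/// `scan_interval` (a setting kept separate from any connect timeout).
/// Returns a shutdown sender and the thread's join handle.
fn spawn_scanner(
    scan_interval: Duration,
    scans: Arc<AtomicUsize>,
) -> (Sender<()>, JoinHandle<()>) {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();
    let handle = thread::spawn(move || loop {
        // Stand-in for the real name server scan.
        scans.fetch_add(1, Ordering::SeqCst);
        // Wait for one scan interval, but wake up early on shutdown.
        match shutdown_rx.recv_timeout(scan_interval) {
            // Interval elapsed without a shutdown request: scan again.
            Err(RecvTimeoutError::Timeout) => continue,
            // Explicit shutdown, or the sender was dropped: stop the loop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => break,
        }
    });
    (shutdown_tx, handle)
}
```

Sending `()` on the returned sender (or dropping it) ends the loop without waiting out the remaining interval, so the owner can join the thread promptly during shutdown.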
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 243-243: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L243
Added line #L243 was not covered by tests
[warning] 247-247: rocketmq-remoting/src/clients/rocketmq_default_impl.rs#L247
Added line #L247 was not covered by tests
Which Issue(s) This PR Fixes(Closes)
Fixes #1234