feat(grpc): add FlushCache RPC and profiling support for gRPC mode#1088
feat(grpc): add FlushCache RPC and profiling support for gRPC mode#1088Kangyan-Zhou wants to merge 2 commits intolightseekorg:mainfrom
Conversation
📝 WalkthroughWalkthroughAdds a new Changes
Sequence DiagramsequenceDiagram
participant Client as External Client
participant Gateway as Gateway gRPC Server
participant WMgr as WorkerManager
participant HttpW as HTTP Worker
participant GrpcCli as Model Gateway gRPC Client
participant SGrpc as Sglang gRPC Servicer
participant ReqMgr as RequestManager
participant Scheduler as Scheduler Process
Client->>Gateway: FlushCache(timeout_s)
activate Gateway
Gateway->>WMgr: flush_cache_all()
activate WMgr
par HTTP fan-out
WMgr->>HttpW: POST /flush_cache
HttpW-->>WMgr: HTTP response (success/fail)
and gRPC fan-out
WMgr->>GrpcCli: flush_cache(timeout_s)
activate GrpcCli
GrpcCli->>SGrpc: FlushCache RPC
activate SGrpc
SGrpc->>ReqMgr: send_communicator_req(FlushCacheReqInput)
activate ReqMgr
ReqMgr->>Scheduler: deliver via ZMQ
Scheduler-->>ReqMgr: FlushCacheReqOutput
ReqMgr-->>SGrpc: aggregated responses
SGrpc-->>GrpcCli: FlushCacheResponse(success, message)
GrpcCli-->>WMgr: (success, message)
deactivate GrpcCli
end
WMgr-->>Gateway: aggregated FlushCacheResult
Gateway-->>Client: FlushCacheResult
deactivate Gateway
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces profiling capabilities to the GrpcRequestManager and adds a new ZMQ socket (recv_from_tokenizer) to handle communicator responses directly from the scheduler. It also implements start_profile and stop_profile methods, along with a callback mechanism in serve_grpc to allow external components to interact with the GrpcRequestManager upon initialization. Feedback includes addressing potential compatibility issues with TimeoutError, improving ZMQ error handling consistency, and using UUIDs for profile identifiers to prevent collisions.
| async def _execute_profile(self, req: ProfileReq): | ||
| try: | ||
| results = await self.profile_communicator(req, timeout=600.0) | ||
| except TimeoutError: |
There was a problem hiding this comment.
In Python versions prior to 3.11, TimeoutError is not a built-in exception, and asyncio.wait_for raises asyncio.TimeoutError. Using the bare TimeoutError here will cause a NameError on those Python versions when a timeout occurs. It is safer to use asyncio.TimeoutError for compatibility.
| except TimeoutError: | |
| except asyncio.TimeoutError: |
| except zmq.error.ZMQError as e: | ||
| if self.gracefully_exit: | ||
| break | ||
| logger.error(f"ZMQ error in tokenizer loop: {e}") | ||
| break |
There was a problem hiding this comment.
To maintain consistency with handle_loop and avoid logic duplication, consider refactoring the ZMQ error handling into a shared helper function. The tokenizer loop should handle zmq.error.Again and include a traceback for ZMQError to ensure transient timeouts don't unexpectedly terminate the task and to provide better debugging information.
| except zmq.error.ZMQError as e: | |
| if self.gracefully_exit: | |
| break | |
| logger.error(f"ZMQ error in tokenizer loop: {e}") | |
| break | |
| except zmq.error.Again: | |
| if self.gracefully_exit: | |
| break | |
| continue | |
| except zmq.error.ZMQError as e: | |
| if self.gracefully_exit: | |
| logger.debug(f"ZMQ recv interrupted during shutdown: {e}") | |
| break | |
| logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}") | |
| break |
References
- If a code block is identified as duplicated across multiple functions or modules, consider refactoring to unify the logic into a shared helper function or class. This improves maintainability and reduces the chance of inconsistencies when changes are needed.
There was a problem hiding this comment.
Fixed — added zmq.error.Again handler and logger.debug on graceful shutdown to match handle_loop's pattern exactly.
| with_stack=with_stack, | ||
| record_shapes=record_shapes, | ||
| profile_by_stage=profile_by_stage, | ||
| profile_id=str(time.time()), |
There was a problem hiding this comment.
Using time.time() for a profile ID can lead to collisions if multiple profiles are started in rapid succession and results in a string representation that may be less ideal for filenames (containing dots). Using a UUID is more robust and consistent with how request_id is generated elsewhere in this class.
| profile_id=str(time.time()), | |
| profile_id=f"profile-{uuid.uuid4().hex}", |
cd99331 to
63ae959
Compare
9738eca to
b4034da
Compare
ad792bb to
4754d5a
Compare
Hybrid approach for admin/observability in gRPC mode: gRPC native (FlushCache RPC): - Proto: FlushCacheRequest/FlushCacheResponse messages - Servicer: FlushCache handler with timeout and multi-DP error handling - Rust client: flush_cache method returning (success, message) - Router: flush_cache_all updated to fan out to gRPC workers natively Transport layer (GrpcRequestManager): - recv_from_tokenizer socket + _handle_tokenizer_loop for communicator responses sent via scheduler's send_to_tokenizer path - send_communicator_req: generic thin transport for sglang-side logic - Communicators for profile, flush_cache, get_internal_state - on_request_manager_ready async callback to serve_grpc() Signed-off-by: Kangyan Zhou <zky314343421@gmail.com> Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
4754d5a to
18d209c
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 18d209c868
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| "flush_cache_communicator", | ||
| timeout=comm_timeout, | ||
| ) | ||
| except TimeoutError: |
There was a problem hiding this comment.
Catch asyncio timeout in FlushCache handler
send_communicator_req() is backed by asyncio.wait_for, which raises asyncio.TimeoutError on Python 3.10, but this handler catches built-in TimeoutError, so timeout cases bypass this branch and get reported by the generic Exception path as INTERNAL instead of DEADLINE_EXCEEDED. This misclassifies real scheduler timeouts and breaks client retry/diagnostic behavior for FlushCache requests under load or partial scheduler stalls.
Useful? React with 👍 / 👎.
| async move { | ||
| let url = worker.url().to_string(); | ||
| match worker.get_grpc_client().await { | ||
| Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await { |
There was a problem hiding this comment.
Bound gRPC flush calls with per-worker timeout
The gRPC fan-out path awaits grpc_client.flush_cache(0.0) without any timeout wrapper, unlike the HTTP branch which enforces REQUEST_TIMEOUT; as a result, one slow or unreachable gRPC worker can stall join_all and keep /flush_cache hanging for an unbounded/very long time. This turns an admin endpoint into a potential blocker during partial outages, so each gRPC flush should be wrapped with a per-worker timeout and treated as a failed worker on expiry.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/server.py`:
- Around line 120-121: The on_request_manager_ready callback is awaited after
creating request_manager and scheduler_procs but before the main shutdown guard,
so any exception leaks child processes/sockets; wrap the await
on_request_manager_ready(request_manager, server_args, scheduler_info) in a
try/except (or move the await inside the existing startup/teardown guard) and on
exception perform clean shutdown steps: call request_manager.shutdown() (or its
stop/close method), stop/bootstrap_server.shutdown() (or the server close
routine), and terminate/join each process in scheduler_procs before re-raising
the exception; ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.
In `@model_gateway/src/routers/grpc/client.rs`:
- Around line 237-246: The match in GrpcClient::flush_cache omits the Self::Mlx
variant causing a non-exhaustive match; add an explicit branch for Self::Mlx in
the flush_cache method (alongside Self::Sglang, Self::Vllm, Self::Trtllm). If
the Mlx client exposes a flush_cache method, call it the same way as in
Self::Sglang (await, unpack resp.success and resp.message, return Ok); if it
does not support flush_cache, return Err(tonic::Status::unimplemented(...)) for
Self::Mlx so the match is exhaustive.
In `@model_gateway/src/worker/manager.rs`:
- Around line 734-753: gRPC fan-out currently awaits
grpc_client.flush_cache(0.0) with join_all, causing unbounded concurrency and
infinite waits; modify the async closure built from grpc_workers (the one that
clones Arc and calls get_grpc_client()) to wrap the flush_cache(...) call with
tokio::time::timeout(REQUEST_TIMEOUT, ...), handle Timeout errors as an Err
result string, and then replace the future::join_all(grpc_futures).await
aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3fa01a34-e8cc-4cf9-9d81-fdf0801ed51f
📒 Files selected for processing (7)
crates/grpc_client/proto/sglang_scheduler.protocrates/grpc_client/src/sglang_scheduler.rsgrpc_servicer/smg_grpc_servicer/sglang/request_manager.pygrpc_servicer/smg_grpc_servicer/sglang/server.pygrpc_servicer/smg_grpc_servicer/sglang/servicer.pymodel_gateway/src/routers/grpc/client.rsmodel_gateway/src/worker/manager.rs
| if on_request_manager_ready is not None: | ||
| await on_request_manager_ready(request_manager, server_args, scheduler_info) |
There was a problem hiding this comment.
Clean up launched resources if the startup hook fails.
Line 120 awaits the hook after the scheduler processes and GrpcRequestManager already exist, but before serve_grpc() reaches its shutdown path. If that callback raises, startup aborts with child processes and ZMQ sockets still alive.
Move this hook under the same startup/teardown guard as the rest of the server bootstrap, or explicitly shut down request_manager, bootstrap_server, and scheduler_procs before re-raising.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@grpc_servicer/smg_grpc_servicer/sglang/server.py` around lines 120 - 121, The
on_request_manager_ready callback is awaited after creating request_manager and
scheduler_procs but before the main shutdown guard, so any exception leaks child
processes/sockets; wrap the await on_request_manager_ready(request_manager,
server_args, scheduler_info) in a try/except (or move the await inside the
existing startup/teardown guard) and on exception perform clean shutdown steps:
call request_manager.shutdown() (or its stop/close method),
stop/bootstrap_server.shutdown() (or the server close routine), and
terminate/join each process in scheduler_procs before re-raising the exception;
ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.
| pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> { | ||
| match self { | ||
| Self::Sglang(client) => { | ||
| let resp = client.flush_cache(timeout_s).await?; | ||
| Ok((resp.success, resp.message)) | ||
| } | ||
| Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented( | ||
| "flush_cache not supported for this backend", | ||
| )), | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
python - <<'PY'
from pathlib import Path
path = Path("model_gateway/src/routers/grpc/client.rs")
text = path.read_text()
enum_block = text.split("pub enum GrpcClient", 1)[1].split("impl GrpcClient", 1)[0]
method_block = text.split("pub async fn flush_cache", 1)[1].split("/// Fetch tokenizer bundle", 1)[0]
print("enum_has_mlx =", "Mlx(" in enum_block)
print("flush_cache_handles_mlx =", "Self::Mlx" in method_block)
PYRepository: lightseekorg/smg
Length of output: 111
Handle the GrpcClient::Mlx variant in this match.
The match self on line 238 is non-exhaustive. The GrpcClient enum includes the Mlx variant, but this method does not handle it, preventing compilation.
Suggested fix
pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
match self {
Self::Sglang(client) => {
let resp = client.flush_cache(timeout_s).await?;
Ok((resp.success, resp.message))
}
- Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
+ Self::Vllm(_) | Self::Trtllm(_) | Self::Mlx(_) => Err(tonic::Status::unimplemented(
"flush_cache not supported for this backend",
)),
}
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/routers/grpc/client.rs` around lines 237 - 246, The match
in GrpcClient::flush_cache omits the Self::Mlx variant causing a non-exhaustive
match; add an explicit branch for Self::Mlx in the flush_cache method (alongside
Self::Sglang, Self::Vllm, Self::Trtllm). If the Mlx client exposes a flush_cache
method, call it the same way as in Self::Sglang (await, unpack resp.success and
resp.message, return Ok); if it does not support flush_cache, return
Err(tonic::Status::unimplemented(...)) for Self::Mlx so the match is exhaustive.
| let grpc_futures: Vec<_> = grpc_workers | ||
| .iter() | ||
| .map(|w| { | ||
| let worker = Arc::clone(w); | ||
| async move { | ||
| let url = worker.url().to_string(); | ||
| match worker.get_grpc_client().await { | ||
| Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await { | ||
| Ok((true, _)) => (url, Ok(())), | ||
| Ok((false, msg)) => (url, Err(format!("flush failed: {msg}"))), | ||
| Err(e) => (url, Err(format!("gRPC error: {e}"))), | ||
| }, | ||
| Ok(None) => (url, Err("no gRPC client available".to_string())), | ||
| Err(e) => (url, Err(format!("failed to get gRPC client: {e}"))), | ||
| } | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
| let grpc_results: Vec<_> = future::join_all(grpc_futures).await; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "HTTP fan-out path:"
sed -n '52,84p' model_gateway/src/worker/manager.rs
echo
echo "gRPC flush path:"
sed -n '733,759p' model_gateway/src/worker/manager.rsRepository: lightseekorg/smg
Length of output: 2254
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Get full flush_cache_all function
echo "=== Full flush_cache_all function ==="
sed -n '691,780p' model_gateway/src/worker/manager.rs
echo
echo "=== REQUEST_TIMEOUT and MAX_CONCURRENT definitions ==="
rg 'REQUEST_TIMEOUT|MAX_CONCURRENT' model_gateway/src/worker/manager.rs -B 2 -A 2Repository: lightseekorg/smg
Length of output: 4039
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check if flush_cache_all has a caller-level timeout wrapper
echo "=== Calls to flush_cache_all ==="
rg 'flush_cache_all' model_gateway/src --type rust -B 3 -A 3
echo
echo "=== gRPC client builder for timeouts ==="
fd -t f 'grpc_client' crates/grpc_client/src | head -5 | xargs -I {} sh -c 'echo "=== {} ==="; head -100 {}'Repository: lightseekorg/smg
Length of output: 1580
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Searching for flush_cache method definition in gRPC clients ==="
fd -t f '\.rs$' crates/grpc_client/src | xargs grep -l 'flush_cache' | head -3
echo
echo "=== flush_cache method signatures ==="
rg 'fn flush_cache' crates/grpc_client/src -A 5 | head -40
echo
echo "=== gRPC timeout patterns in model_gateway ==="
rg 'timeout|Timeout|Duration' model_gateway/src/routers/grpc --type rust -c | sort | uniq -c | head -10Repository: lightseekorg/smg
Length of output: 892
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check for gRPC request timeout patterns in model_gateway ==="
rg 'tokio::time::timeout|timeout\(' model_gateway/src/routers/grpc --type rust -B 2 -A 2 | head -60Repository: lightseekorg/smg
Length of output: 568
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== All flush_cache calls in the codebase ==="
rg 'flush_cache\(' model_gateway/src --type rust -B 2 -A 2
echo
echo "=== Check flush_cache implementation for timeout_s usage ==="
sed -n '/pub async fn flush_cache/,/^ }/p' crates/grpc_client/src/sglang_scheduler.rs | head -40Repository: lightseekorg/smg
Length of output: 2129
🏁 Script executed:
#!/bin/bash
set -euo pipeftp
echo "=== Check if timeout behavior is documented or if 0.0 is intentional ==="
git log --all --oneline --grep="flush_cache" -- model_gateway/src/worker/manager.rs | head -5
git log --all -p -- crates/grpc_client/src/sglang_scheduler.rs | grep -A 5 -B 5 'flush_cache' | head -40Repository: lightseekorg/smg
Length of output: 112
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check if there's any wrapper timeout around flush_cache_all calls ==="
sed -n '1,/^async fn flush_cache/p' model_gateway/src/server.rs | tail -20
echo
echo "=== Check other RPC calls in grpc/client.rs for timeout patterns ==="
rg 'tokio::time::timeout' model_gateway/src/routers/grpc/client.rs -A 3 -B 1 | head -40Repository: lightseekorg/smg
Length of output: 811
Apply REQUEST_TIMEOUT and concurrency bounding to the gRPC fan-out, matching the HTTP path.
The HTTP fan-out path enforces .timeout(REQUEST_TIMEOUT) (5 seconds) and .buffer_unordered(MAX_CONCURRENT) (32 workers), but the gRPC path calls flush_cache(0.0) (infinite timeout) and uses join_all() (unbounded concurrency). A single stalled gRPC worker can hang the entire cache-flush operation indefinitely; firing all gRPC flushes at once risks overwhelming the fleet on deployments with many workers.
Code comparison
HTTP path (lines 69, 81):
.timeout(REQUEST_TIMEOUT)
.buffer_unordered(MAX_CONCURRENT)
gRPC path (lines 740, 753):
grpc_client.flush_cache(0.0).await
future::join_all(grpc_futures).await
Wrap each flush_cache() call with tokio::time::timeout(REQUEST_TIMEOUT, ...) and replace join_all() with stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect() to match the HTTP implementation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@model_gateway/src/worker/manager.rs` around lines 734 - 753, gRPC fan-out
currently awaits grpc_client.flush_cache(0.0) with join_all, causing unbounded
concurrency and infinite waits; modify the async closure built from grpc_workers
(the one that clones Arc and calls get_grpc_client()) to wrap the
flush_cache(...) call with tokio::time::timeout(REQUEST_TIMEOUT, ...), handle
Timeout errors as an Err result string, and then replace the
future::join_all(grpc_futures).await aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/grpc_client/src/sglang_scheduler.rs`:
- Around line 304-310: The flush_cache method currently forwards timeout_s to
the server but does not enforce a local deadline, so the .await on
client.flush_cache(request) can hang; update the method in sglang_scheduler.rs
(the function that builds proto::FlushCacheRequest and calls
client.flush_cache(request)) to validate timeout_s (treat <= 0.0 as "no local
timeout") and, when timeout_s > 0.0, call the RPC via
tokio::time::timeout(Duration::from_secs_f32(timeout_s),
client.flush_cache(request)).await, mapping a tokio timeout error to a
tonic::Status::deadline_exceeded; if timeout_s <= 0.0 just await the RPC
normally. Ensure you still clone self.client as before and propagate other RPC
errors unchanged.
In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py`:
- Around line 965-982: The send_communicator_req method should validate
communicator_name before using getattr to avoid cryptic TypeError: add an
explicit registry (e.g., a set or dict like communicator_registry or
_allowed_communicators on the same class) listing valid communicator attribute
names, then inside send_communicator_req call auto_create_handle_loop(), check
that communicator_name is in that registry, retrieve the attribute with
getattr(self, communicator_name), verify it is callable, and if not present or
not callable raise a clear ValueError mentioning the invalid communicator_name
and listing allowed names; update any callers if needed and keep the rest of the
method (await communicator(req, timeout=timeout)) unchanged.
- Around line 546-579: Remove the dead branch that handles GetLoadsReqOutput in
the handle_loop function so only _handle_tokenizer_loop processes scheduler-sent
communicator responses; locate the GetLoadsReqOutput conditional inside
handle_loop and delete that elif branch (and any now-unused local references)
leaving GetLoadsReqOutput handling only in _handle_tokenizer_loop, ensuring no
other logic depends on the removed branch.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9c187e9b-e3e3-4ef4-9e5b-6ff3b5743d46
📒 Files selected for processing (2)
crates/grpc_client/src/sglang_scheduler.rsgrpc_servicer/smg_grpc_servicer/sglang/request_manager.py
| timeout_s: f32, | ||
| ) -> Result<proto::FlushCacheResponse, tonic::Status> { | ||
| debug!("Requesting cache flush (timeout_s={timeout_s})"); | ||
| let request = Request::new(proto::FlushCacheRequest { timeout_s }); | ||
|
|
||
| let mut client = self.client.clone(); | ||
| let response = client.flush_cache(request).await?; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Inspect flush_cache call sites and existing timeout/deadline patterns.
set -euo pipefail
echo "Flush cache call sites:"
rg -n -C3 --type rust '\bflush_cache\s*\('
echo
echo "Existing timeout/deadline patterns:"
rg -n -C3 --type rust 'tokio::time::timeout|Request::set_timeout|REQUEST_TIMEOUT|deadline_exceeded'Repository: lightseekorg/smg
Length of output: 30490
🏁 Script executed:
#!/bin/bash
# Check all public async methods in sglang_scheduler.rs that make RPC calls
ast-grep --pattern $'impl SglangSchedulerClient {
$$$
pub async fn $_($$$) -> $_ {
$$$
}
$$$
}'Repository: lightseekorg/smg
Length of output: 50372
Enforce a client-side deadline for the flush RPC.
timeout_s is only forwarded to the scheduler; the gRPC await itself can block indefinitely if the scheduler or transport never responds. Since WorkerManager::flush_cache_all awaits grpc_client.flush_cache(0.0), this can stall the admin flush path. Validate the timeout and wrap the RPC with a local deadline.
Proposed fix
pub async fn flush_cache(
&self,
timeout_s: f32,
) -> Result<proto::FlushCacheResponse, tonic::Status> {
+ let scheduler_wait_timeout = Duration::try_from_secs_f32(timeout_s).map_err(|_| {
+ tonic::Status::invalid_argument("timeout_s must be finite and non-negative")
+ })?;
+ let rpc_timeout = scheduler_wait_timeout.saturating_add(Duration::from_secs(5));
+
debug!("Requesting cache flush (timeout_s={timeout_s})");
let request = Request::new(proto::FlushCacheRequest { timeout_s });
let mut client = self.client.clone();
- let response = client.flush_cache(request).await?;
+ let response = tokio::time::timeout(rpc_timeout, client.flush_cache(request))
+ .await
+ .map_err(|_| tonic::Status::deadline_exceeded("flush_cache RPC timed out"))??;
debug!("Flush cache response received");
Ok(response.into_inner())
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| timeout_s: f32, | |
| ) -> Result<proto::FlushCacheResponse, tonic::Status> { | |
| debug!("Requesting cache flush (timeout_s={timeout_s})"); | |
| let request = Request::new(proto::FlushCacheRequest { timeout_s }); | |
| let mut client = self.client.clone(); | |
| let response = client.flush_cache(request).await?; | |
| timeout_s: f32, | |
| ) -> Result<proto::FlushCacheResponse, tonic::Status> { | |
| let scheduler_wait_timeout = Duration::try_from_secs_f32(timeout_s).map_err(|_| { | |
| tonic::Status::invalid_argument("timeout_s must be finite and non-negative") | |
| })?; | |
| let rpc_timeout = scheduler_wait_timeout.saturating_add(Duration::from_secs(5)); | |
| debug!("Requesting cache flush (timeout_s={timeout_s})"); | |
| let request = Request::new(proto::FlushCacheRequest { timeout_s }); | |
| let mut client = self.client.clone(); | |
| let response = tokio::time::timeout(rpc_timeout, client.flush_cache(request)) | |
| .await | |
| .map_err(|_| tonic::Status::deadline_exceeded("flush_cache RPC timed out"))??; | |
| debug!("Flush cache response received"); | |
| Ok(response.into_inner()) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/grpc_client/src/sglang_scheduler.rs` around lines 304 - 310, The
flush_cache method currently forwards timeout_s to the server but does not
enforce a local deadline, so the .await on client.flush_cache(request) can hang;
update the method in sglang_scheduler.rs (the function that builds
proto::FlushCacheRequest and calls client.flush_cache(request)) to validate
timeout_s (treat <= 0.0 as "no local timeout") and, when timeout_s > 0.0, call
the RPC via tokio::time::timeout(Duration::from_secs_f32(timeout_s),
client.flush_cache(request)).await, mapping a tokio timeout error to a
tonic::Status::deadline_exceeded; if timeout_s <= 0.0 just await the RPC
normally. Ensure you still clone self.client as before and propagate other RPC
errors unchanged.
| async def _handle_tokenizer_loop(self): | ||
| """Process communicator responses from the scheduler's send_to_tokenizer socket. | ||
|
|
||
| The scheduler sends ProfileReqOutput (and similar communicator responses) | ||
| directly to tokenizer_ipc_name, not through the detokenizer. This loop | ||
| receives those responses and dispatches them to the appropriate communicator. | ||
| """ | ||
| while not self.gracefully_exit: | ||
| try: | ||
| recv_obj = await self.recv_from_tokenizer.recv_pyobj() | ||
| if isinstance(recv_obj, ProfileReqOutput): | ||
| self.profile_communicator.handle_recv(recv_obj) | ||
| elif isinstance(recv_obj, FlushCacheReqOutput): | ||
| self.flush_cache_communicator.handle_recv(recv_obj) | ||
| elif isinstance(recv_obj, GetInternalStateReqOutput): | ||
| self.get_internal_state_communicator.handle_recv(recv_obj) | ||
| elif isinstance(recv_obj, GetLoadsReqOutput): | ||
| self.get_loads_communicator.handle_recv(recv_obj) | ||
| else: | ||
| logger.warning(f"Unknown type on tokenizer socket: {type(recv_obj)}") | ||
| except zmq.error.Again: | ||
| if self.gracefully_exit: | ||
| break | ||
| continue | ||
| except zmq.error.ZMQError as e: | ||
| if self.gracefully_exit: | ||
| logger.debug(f"ZMQ recv interrupted during shutdown: {e}") | ||
| break | ||
| logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}") | ||
| break | ||
| except Exception as e: | ||
| logger.error(f"Tokenizer loop error: {e}\n{get_exception_traceback()}") | ||
| if self.gracefully_exit: | ||
| break |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Locate where sglang scheduler sends GetLoadsReqOutput (tokenizer vs detokenizer IPC).
python -c "import sglang, os; print(os.path.dirname(sglang.__file__))" 2>/dev/null || true
# Search installed sglang (if present) and the repo for send sites.
rg -nP -C3 '\bGetLoadsReqOutput\s*\(' --glob '!**/request_manager.py'
rg -nP -C3 'send_to_tokenizer.*GetLoads|send_to_detokenizer.*GetLoads'Repository: lightseekorg/smg
Length of output: 42
🏁 Script executed:
# First, locate the request_manager.py file
fd -n "request_manager.py" | head -20Repository: lightseekorg/smg
Length of output: 290
🏁 Script executed:
# Check the structure and confirm file exists
git ls-files | grep -E "request_manager\.py|sglang.*request_manager"Repository: lightseekorg/smg
Length of output: 117
🏁 Script executed:
# Read the specific sections mentioned in the review
if [ -f "grpc_servicer/smg_grpc_servicer/sglang/request_manager.py" ]; then
echo "=== Lines 520-530 (handle_loop GetLoadsReqOutput) ==="
sed -n '520,530p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
echo ""
echo "=== Lines 560-570 (_handle_tokenizer_loop GetLoadsReqOutput) ==="
sed -n '560,570p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
fiRepository: lightseekorg/smg
Length of output: 1306
🏁 Script executed:
# Search for all GetLoadsReqOutput occurrences and routing
rg -n "GetLoadsReqOutput" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py -B2 -A2Repository: lightseekorg/smg
Length of output: 1708
🏁 Script executed:
# Check for any comments or documentation about tokenizer vs detokenizer routing
rg -n "tokenizer.*[Gg]et[Ll]oads|detokenizer.*[Gg]et[Ll]oads" grpc_servicer/smg_grpc_servicer/sglang/ -iRepository: lightseekorg/smg
Length of output: 42
🏁 Script executed:
# Check where handle_loop and _handle_tokenizer_loop are started
rg -n "handle_loop|_handle_tokenizer_loop" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py | grep -E "(asyncio\.create_task|gather|Task|start)"Repository: lightseekorg/smg
Length of output: 165
🏁 Script executed:
# Check the full auto_create_handle_loop method to see both loops started
sed -n '990,1020p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.pyRepository: lightseekorg/smg
Length of output: 1476
🏁 Script executed:
# Search for any comments or history about GetLoadsReqOutput and which path it should use
rg -n "GetLoads" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py -B3 -A3 | head -80Repository: lightseekorg/smg
Length of output: 2441
🏁 Script executed:
# Check what sockets are being used and their purposes
rg -n "recv_from_detokenizer|recv_from_tokenizer" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py | head -20Repository: lightseekorg/smg
Length of output: 234
Remove dead GetLoadsReqOutput handling from handle_loop — scheduler sends via tokenizer socket only.
Both handle_loop (line 524–525) and _handle_tokenizer_loop (line 562–563) handle GetLoadsReqOutput, but the comment at lines 195–196 explicitly states the scheduler sends communicator responses (including GetLoadsReqOutput) directly via send_to_tokenizer, which is the new path used by _handle_tokenizer_loop. The handle_loop branch is dead code and should be removed to avoid confusion and reduce maintenance burden. Keep only the _handle_tokenizer_loop branch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py` around lines 546 -
579, Remove the dead branch that handles GetLoadsReqOutput in the handle_loop
function so only _handle_tokenizer_loop processes scheduler-sent communicator
responses; locate the GetLoadsReqOutput conditional inside handle_loop and
delete that elif branch (and any now-unused local references) leaving
GetLoadsReqOutput handling only in _handle_tokenizer_loop, ensuring no other
logic depends on the removed branch.
| async def send_communicator_req(self, req, communicator_name: str, timeout: float = 30.0): | ||
| """Send a request to the scheduler via a named communicator and return responses. | ||
|
|
||
| This is the generic transport method for request/response patterns that | ||
| go through the tokenizer IPC path (profile, flush_cache, get_internal_state, etc.). | ||
| Business logic (request construction, response interpretation) belongs in the caller. | ||
|
|
||
| Args: | ||
| req: The request object to send to the scheduler. | ||
| communicator_name: Attribute name of the communicator (e.g., "profile_communicator"). | ||
| timeout: Timeout in seconds for the ZMQ round-trip. | ||
|
|
||
| Returns: | ||
| List of response objects from scheduler(s), one per DP rank. | ||
| """ | ||
| self.auto_create_handle_loop() | ||
| communicator = getattr(self, communicator_name) | ||
| return await communicator(req, timeout=timeout) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Optional: validate communicator_name via an explicit registry.
getattr(self, communicator_name) accepts any attribute on self and silently resolves to non-communicator objects (e.g., "send_to_scheduler"), failing later with a cryptic TypeError when invoked. A small registry makes the allowed set explicit and yields a clear error on misuse.
♻️ Proposed refactor
async def send_communicator_req(self, req, communicator_name: str, timeout: float = 30.0):
...
self.auto_create_handle_loop()
- communicator = getattr(self, communicator_name)
- return await communicator(req, timeout=timeout)
+ registry = {
+ "profile_communicator": self.profile_communicator,
+ "flush_cache_communicator": self.flush_cache_communicator,
+ "get_internal_state_communicator": self.get_internal_state_communicator,
+ "get_loads_communicator": self.get_loads_communicator,
+ }
+ try:
+ communicator = registry[communicator_name]
+ except KeyError as e:
+ raise ValueError(f"Unknown communicator: {communicator_name!r}") from e
+ return await communicator(req, timeout=timeout)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py` around lines 965 -
982, The send_communicator_req method should validate communicator_name before
using getattr to avoid cryptic TypeError: add an explicit registry (e.g., a set
or dict like communicator_registry or _allowed_communicators on the same class)
listing valid communicator attribute names, then inside send_communicator_req
call auto_create_handle_loop(), check that communicator_name is in that
registry, retrieve the attribute with getattr(self, communicator_name), verify
it is callable, and if not present or not callable raise a clear ValueError
mentioning the invalid communicator_name and listing allowed names; update any
callers if needed and keep the rest of the method (await communicator(req,
timeout=timeout)) unchanged.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 633912461e
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| "flush_cache_communicator", | ||
| timeout=comm_timeout, | ||
| ) | ||
| except TimeoutError: |
There was a problem hiding this comment.
Catch asyncio timeout from FlushCache communicator call
send_communicator_req() waits on asyncio.wait_for, which raises asyncio.TimeoutError in this Python 3.10 environment, but this handler only catches built-in TimeoutError. When a scheduler flush response times out, it falls into the generic Exception branch and is reported as INTERNAL instead of DEADLINE_EXCEEDED, which misclassifies real timeout conditions and breaks client retry/diagnostic behavior during scheduler stalls.
Useful? React with 👍 / 👎.
| async move { | ||
| let url = worker.url().to_string(); | ||
| match worker.get_grpc_client().await { | ||
| Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await { |
There was a problem hiding this comment.
Bound each gRPC flush fan-out call with a timeout
The gRPC branch fans out grpc_client.flush_cache(0.0) and awaits join_all without any per-worker timeout, unlike the HTTP branch that enforces REQUEST_TIMEOUT. If a single gRPC worker becomes slow or unreachable, /flush_cache can block indefinitely on that future and stall the whole admin endpoint during partial outages; wrap each gRPC flush call with tokio::time::timeout and record timeout as a failed worker.
Useful? React with 👍 / 👎.
|
This pull request has been automatically marked as stale because it has not had any activity within 14 days. It will be automatically closed if no further activity occurs within 16 days. Leave a comment if you feel this pull request should remain open. Thank you! |
Summary
Adds admin/observability support for gRPC-mode engines using a hybrid approach:
gRPC native: FlushCache RPC
FlushCacheRPC withFlushCacheRequest/FlushCacheResponsemessagesFlushCachehandler usingsend_communicator_reqwith timeout handling and multi-DP-rank error aggregationflush_cachemethod toSglangSchedulerClientflush_cachedispatcher returning(success, message)tupleflush_cache_allto handle gRPC workers natively (no HTTP sidecar needed)Transport layer: GrpcRequestManager
recv_from_tokenizersocket to receive communicator responses (profile, flush_cache, get_internal_state) sent via the scheduler'ssend_to_tokenizerpath_handle_tokenizer_loopwith properzmq.error.Againhandlingsend_communicator_req()— generic thin transport method for sglang-side business logicon_request_manager_readyasync callback toserve_grpc()passing(request_manager, server_args, scheduler_info)Motivation
gRPC mode previously had no way to flush the KV cache, trigger profiler traces, or query server info. The router's
/flush_cachesilently no-oped for gRPC workers (filtered to HTTP-only). This blockedbench_serving.pyand benchmarking workflows against gRPC deployments.Companion PR
sgl-project/sglang#22500
Architecture
/flush_cache/start_profile,/stop_profile/server_info/metricsTest plan
cargo checkfor smg-grpc-client and smg)/flush_cachethrough router 200 (confirmed gRPC path:total_http_workers: 0, workers_flushed: 1)🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes / Reliability