Skip to content

feat(grpc): add FlushCache RPC and profiling support for gRPC mode#1088

Open
Kangyan-Zhou wants to merge 2 commits intolightseekorg:mainfrom
Kangyan-Zhou:profile_endpoint
Open

feat(grpc): add FlushCache RPC and profiling support for gRPC mode#1088
Kangyan-Zhou wants to merge 2 commits intolightseekorg:mainfrom
Kangyan-Zhou:profile_endpoint

Conversation

@Kangyan-Zhou
Copy link
Copy Markdown
Contributor

@Kangyan-Zhou Kangyan-Zhou commented Apr 10, 2026

Summary

Adds admin/observability support for gRPC-mode engines using a hybrid approach:

gRPC native: FlushCache RPC

  • Proto: Added FlushCache RPC with FlushCacheRequest/FlushCacheResponse messages
  • Servicer: Implemented FlushCache handler using send_communicator_req with timeout handling and multi-DP-rank error aggregation
  • Rust client: Added flush_cache method to SglangSchedulerClient
  • Router client: Added flush_cache dispatcher returning (success, message) tuple
  • Router manager: Updated flush_cache_all to handle gRPC workers natively (no HTTP sidecar needed)

Transport layer: GrpcRequestManager

  • Added recv_from_tokenizer socket to receive communicator responses (profile, flush_cache, get_internal_state) sent via the scheduler's send_to_tokenizer path
  • Added _handle_tokenizer_loop with proper zmq.error.Again handling
  • Added send_communicator_req() — generic thin transport method for sglang-side business logic
  • Added communicators for profile, flush_cache, and get_internal_state
  • Added on_request_manager_ready async callback to serve_grpc() passing (request_manager, server_args, scheduler_info)

Motivation

gRPC mode previously had no way to flush the KV cache, trigger profiler traces, or query server info. The router's /flush_cache silently no-oped for gRPC workers (filtered to HTTP-only). This blocked bench_serving.py and benchmarking workflows against gRPC deployments.

Companion PR

sgl-project/sglang#22500

Architecture

Endpoint Transport Why
/flush_cache gRPC RPC (via router) Should fan out to all workers; router already has the HTTP route
/start_profile, /stop_profile HTTP sidecar (direct engine) PD mode needs per-worker targeting, not fan-out
/server_info HTTP sidecar (direct engine) Engine-level info, not router-level
/metrics HTTP sidecar (Prometheus scraping) Standard pattern

Test plan

  • Rust builds clean (cargo check for smg-grpc-client and smg)
  • E2E: Single engine — FlushCache gRPC direct, sidecar endpoints all work
  • E2E: Engine + Router — generation through router 200, /flush_cache through router 200 (confirmed gRPC path: total_http_workers: 0, workers_flushed: 1)
  • CI tests

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added a cache-flush operation to the scheduler gRPC API with a configurable timeout option.
    • Exposed cache-flush from the gateway so both HTTP and gRPC workers can be targeted for flushes.
    • Flush responses now return overall success plus a human-readable message aggregating per-target outcomes.
  • Bug Fixes / Reliability

    • Improved timeout and error handling for cache-flush requests.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 10, 2026

📝 Walkthrough

Walkthrough

Adds a new FlushCache scheduler RPC and end-to-end support: protobuf, gRPC client wrappers, servicer handler, request-manager routing, server init hook, and WorkerManager integration to flush caches across HTTP and gRPC workers.

Changes

Cohort / File(s) Summary
Protocol Definition
crates/grpc_client/proto/sglang_scheduler.proto
Added FlushCache RPC plus FlushCacheRequest { float timeout_s } and FlushCacheResponse { bool success, string message }.
gRPC Client Wrappers
crates/grpc_client/src/sglang_scheduler.rs, model_gateway/src/routers/grpc/client.rs
Added flush_cache(timeout_s) async methods; gateway maps Sglang responses to (success, message) and returns unimplemented for non-Sglang backends.
Request Management & Routing
grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
Added second ZMQ PULL socket (recv_from_tokenizer), new _handle_tokenizer_loop, new communicators for flush/profile/internal-state, and public send_communicator_req(...) helper to route flush requests/responses.
Server Initialization Hook
grpc_servicer/smg_grpc_servicer/sglang/server.py
Extended serve_grpc() signature to accept on_request_manager_ready and invoke it after request manager creation and before server startup.
RPC Handler
grpc_servicer/smg_grpc_servicer/sglang/servicer.py
Added FlushCache RPC handler that sends FlushCacheReqInput via request_manager.send_communicator_req(...), handles timeouts/exceptions mapping to gRPC status codes, validates/aggregates per-target results, and returns FlushCacheResponse.
Worker Flush Integration
model_gateway/src/worker/manager.rs
Extended WorkerManager::flush_cache_all to include gRPC workers: split registry by connection mode, perform async fan-out to gRPC clients calling grpc_client.flush_cache, and aggregate successes/failures alongside existing HTTP fan-out.

Sequence Diagram

sequenceDiagram
    participant Client as External Client
    participant Gateway as Gateway gRPC Server
    participant WMgr as WorkerManager
    participant HttpW as HTTP Worker
    participant GrpcCli as Model Gateway gRPC Client
    participant SGrpc as Sglang gRPC Servicer
    participant ReqMgr as RequestManager
    participant Scheduler as Scheduler Process

    Client->>Gateway: FlushCache(timeout_s)
    activate Gateway

    Gateway->>WMgr: flush_cache_all()
    activate WMgr

    par HTTP fan-out
        WMgr->>HttpW: POST /flush_cache
        HttpW-->>WMgr: HTTP response (success/fail)
    and gRPC fan-out
        WMgr->>GrpcCli: flush_cache(timeout_s)
        activate GrpcCli
        GrpcCli->>SGrpc: FlushCache RPC
        activate SGrpc
        SGrpc->>ReqMgr: send_communicator_req(FlushCacheReqInput)
        activate ReqMgr
        ReqMgr->>Scheduler: deliver via ZMQ
        Scheduler-->>ReqMgr: FlushCacheReqOutput
        ReqMgr-->>SGrpc: aggregated responses
        SGrpc-->>GrpcCli: FlushCacheResponse(success, message)
        GrpcCli-->>WMgr: (success, message)
        deactivate GrpcCli
    end

    WMgr-->>Gateway: aggregated FlushCacheResult
    Gateway-->>Client: FlushCacheResult
    deactivate Gateway
Loading

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐇 I hopped from proto to server with care,
Sent flushes through sockets and threads in the air.
From gateway to workers, both HTTP and gRPC,
We cleared out the crumbs and set caches free.
A tiny rabbit clap—now the system feels fair. 🥕✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The pull request title accurately describes the primary changes: adding a FlushCache RPC and supporting infrastructure for gRPC mode, which aligns with the main file modifications across proto definitions, Rust client/router, and Python servicer layers.
Docstring Coverage ✅ Passed Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added the grpc gRPC client and router changes label Apr 10, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces profiling capabilities to the GrpcRequestManager and adds a new ZMQ socket (recv_from_tokenizer) to handle communicator responses directly from the scheduler. It also implements start_profile and stop_profile methods, along with a callback mechanism in serve_grpc to allow external components to interact with the GrpcRequestManager upon initialization. Feedback includes addressing potential compatibility issues with TimeoutError, improving ZMQ error handling consistency, and using UUIDs for profile identifiers to prevent collisions.

async def _execute_profile(self, req: ProfileReq):
try:
results = await self.profile_communicator(req, timeout=600.0)
except TimeoutError:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In Python versions prior to 3.11, TimeoutError is not a built-in exception, and asyncio.wait_for raises asyncio.TimeoutError. Using the bare TimeoutError here will cause a NameError on those Python versions when a timeout occurs. It is safer to use asyncio.TimeoutError for compatibility.

Suggested change
except TimeoutError:
except asyncio.TimeoutError:

Comment on lines +560 to +564
except zmq.error.ZMQError as e:
if self.gracefully_exit:
break
logger.error(f"ZMQ error in tokenizer loop: {e}")
break
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To maintain consistency with handle_loop and avoid logic duplication, consider refactoring the ZMQ error handling into a shared helper function. The tokenizer loop should handle zmq.error.Again and include a traceback for ZMQError to ensure transient timeouts don't unexpectedly terminate the task and to provide better debugging information.

Suggested change
except zmq.error.ZMQError as e:
if self.gracefully_exit:
break
logger.error(f"ZMQ error in tokenizer loop: {e}")
break
except zmq.error.Again:
if self.gracefully_exit:
break
continue
except zmq.error.ZMQError as e:
if self.gracefully_exit:
logger.debug(f"ZMQ recv interrupted during shutdown: {e}")
break
logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}")
break
References
  1. If a code block is identified as duplicated across multiple functions or modules, consider refactoring to unify the logic into a shared helper function or class. This improves maintainability and reduces the chance of inconsistencies when changes are needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — added zmq.error.Again handler and logger.debug on graceful shutdown to match handle_loop's pattern exactly.

with_stack=with_stack,
record_shapes=record_shapes,
profile_by_stage=profile_by_stage,
profile_id=str(time.time()),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using time.time() for a profile ID can lead to collisions if multiple profiles are started in rapid succession and results in a string representation that may be less ideal for filenames (containing dots). Using a UUID is more robust and consistent with how request_id is generated elsewhere in this class.

Suggested change
profile_id=str(time.time()),
profile_id=f"profile-{uuid.uuid4().hex}",

@Kangyan-Zhou Kangyan-Zhou force-pushed the profile_endpoint branch 2 times, most recently from 9738eca to b4034da Compare April 10, 2026 19:37
@github-actions github-actions Bot added the model-gateway Model gateway crate changes label Apr 10, 2026
@Kangyan-Zhou Kangyan-Zhou changed the title feat(grpc-servicer): add profiling support to GrpcRequestManager feat(grpc): add FlushCache RPC and profiling support for gRPC mode Apr 10, 2026
@Kangyan-Zhou Kangyan-Zhou force-pushed the profile_endpoint branch 2 times, most recently from ad792bb to 4754d5a Compare April 10, 2026 19:52
Hybrid approach for admin/observability in gRPC mode:

gRPC native (FlushCache RPC):
- Proto: FlushCacheRequest/FlushCacheResponse messages
- Servicer: FlushCache handler with timeout and multi-DP error handling
- Rust client: flush_cache method returning (success, message)
- Router: flush_cache_all updated to fan out to gRPC workers natively

Transport layer (GrpcRequestManager):
- recv_from_tokenizer socket + _handle_tokenizer_loop for communicator
  responses sent via scheduler's send_to_tokenizer path
- send_communicator_req: generic thin transport for sglang-side logic
- Communicators for profile, flush_cache, get_internal_state
- on_request_manager_ready async callback to serve_grpc()

Signed-off-by: Kangyan Zhou <zky314343421@gmail.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Kangyan-Zhou Kangyan-Zhou marked this pull request as ready for review April 15, 2026 03:50
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 18d209c868

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

"flush_cache_communicator",
timeout=comm_timeout,
)
except TimeoutError:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Catch asyncio timeout in FlushCache handler

send_communicator_req() is backed by asyncio.wait_for, which raises asyncio.TimeoutError on Python 3.10, but this handler catches built-in TimeoutError, so timeout cases bypass this branch and get reported by the generic Exception path as INTERNAL instead of DEADLINE_EXCEEDED. This misclassifies real scheduler timeouts and breaks client retry/diagnostic behavior for FlushCache requests under load or partial scheduler stalls.

Useful? React with 👍 / 👎.

async move {
let url = worker.url().to_string();
match worker.get_grpc_client().await {
Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound gRPC flush calls with per-worker timeout

The gRPC fan-out path awaits grpc_client.flush_cache(0.0) without any timeout wrapper, unlike the HTTP branch which enforces REQUEST_TIMEOUT; as a result, one slow or unreachable gRPC worker can stall join_all and keep /flush_cache hanging for an unbounded/very long time. This turns an admin endpoint into a potential blocker during partial outages, so each gRPC flush should be wrapped with a per-worker timeout and treated as a failed worker on expiry.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@grpc_servicer/smg_grpc_servicer/sglang/server.py`:
- Around line 120-121: The on_request_manager_ready callback is awaited after
creating request_manager and scheduler_procs but before the main shutdown guard,
so any exception leaks child processes/sockets; wrap the await
on_request_manager_ready(request_manager, server_args, scheduler_info) in a
try/except (or move the await inside the existing startup/teardown guard) and on
exception perform clean shutdown steps: call request_manager.shutdown() (or its
stop/close method), stop/bootstrap_server.shutdown() (or the server close
routine), and terminate/join each process in scheduler_procs before re-raising
the exception; ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.

In `@model_gateway/src/routers/grpc/client.rs`:
- Around line 237-246: The match in GrpcClient::flush_cache omits the Self::Mlx
variant causing a non-exhaustive match; add an explicit branch for Self::Mlx in
the flush_cache method (alongside Self::Sglang, Self::Vllm, Self::Trtllm). If
the Mlx client exposes a flush_cache method, call it the same way as in
Self::Sglang (await, unpack resp.success and resp.message, return Ok); if it
does not support flush_cache, return Err(tonic::Status::unimplemented(...)) for
Self::Mlx so the match is exhaustive.

In `@model_gateway/src/worker/manager.rs`:
- Around line 734-753: gRPC fan-out currently awaits
grpc_client.flush_cache(0.0) with join_all, causing unbounded concurrency and
infinite waits; modify the async closure built from grpc_workers (the one that
clones Arc and calls get_grpc_client()) to wrap the flush_cache(...) call with
tokio::time::timeout(REQUEST_TIMEOUT, ...), handle Timeout errors as an Err
result string, and then replace the future::join_all(grpc_futures).await
aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3fa01a34-e8cc-4cf9-9d81-fdf0801ed51f

📥 Commits

Reviewing files that changed from the base of the PR and between f9cffd7 and 18d209c.

📒 Files selected for processing (7)
  • crates/grpc_client/proto/sglang_scheduler.proto
  • crates/grpc_client/src/sglang_scheduler.rs
  • grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
  • grpc_servicer/smg_grpc_servicer/sglang/server.py
  • grpc_servicer/smg_grpc_servicer/sglang/servicer.py
  • model_gateway/src/routers/grpc/client.rs
  • model_gateway/src/worker/manager.rs

Comment on lines +120 to +121
if on_request_manager_ready is not None:
await on_request_manager_ready(request_manager, server_args, scheduler_info)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Clean up launched resources if the startup hook fails.

Line 120 awaits the hook after the scheduler processes and GrpcRequestManager already exist, but before serve_grpc() reaches its shutdown path. If that callback raises, startup aborts with child processes and ZMQ sockets still alive.

Move this hook under the same startup/teardown guard as the rest of the server bootstrap, or explicitly shut down request_manager, bootstrap_server, and scheduler_procs before re-raising.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@grpc_servicer/smg_grpc_servicer/sglang/server.py` around lines 120 - 121, The
on_request_manager_ready callback is awaited after creating request_manager and
scheduler_procs but before the main shutdown guard, so any exception leaks child
processes/sockets; wrap the await on_request_manager_ready(request_manager,
server_args, scheduler_info) in a try/except (or move the await inside the
existing startup/teardown guard) and on exception perform clean shutdown steps:
call request_manager.shutdown() (or its stop/close method),
stop/bootstrap_server.shutdown() (or the server close routine), and
terminate/join each process in scheduler_procs before re-raising the exception;
ensure you reference the same symbols (on_request_manager_ready,
request_manager, bootstrap_server, scheduler_procs) to locate and run their
existing cleanup paths.

Comment on lines +237 to +246
pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
match self {
Self::Sglang(client) => {
let resp = client.flush_cache(timeout_s).await?;
Ok((resp.success, resp.message))
}
Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
"flush_cache not supported for this backend",
)),
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail
python - <<'PY'
from pathlib import Path

path = Path("model_gateway/src/routers/grpc/client.rs")
text = path.read_text()

enum_block = text.split("pub enum GrpcClient", 1)[1].split("impl GrpcClient", 1)[0]
method_block = text.split("pub async fn flush_cache", 1)[1].split("/// Fetch tokenizer bundle", 1)[0]

print("enum_has_mlx =", "Mlx(" in enum_block)
print("flush_cache_handles_mlx =", "Self::Mlx" in method_block)
PY

Repository: lightseekorg/smg

Length of output: 111


Handle the GrpcClient::Mlx variant in this match.

The match self on line 238 is non-exhaustive. The GrpcClient enum includes the Mlx variant, but this method does not handle it, preventing compilation.

Suggested fix
     pub async fn flush_cache(&self, timeout_s: f32) -> Result<(bool, String), tonic::Status> {
         match self {
             Self::Sglang(client) => {
                 let resp = client.flush_cache(timeout_s).await?;
                 Ok((resp.success, resp.message))
             }
-            Self::Vllm(_) | Self::Trtllm(_) => Err(tonic::Status::unimplemented(
+            Self::Vllm(_) | Self::Trtllm(_) | Self::Mlx(_) => Err(tonic::Status::unimplemented(
                 "flush_cache not supported for this backend",
             )),
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/routers/grpc/client.rs` around lines 237 - 246, The match
in GrpcClient::flush_cache omits the Self::Mlx variant causing a non-exhaustive
match; add an explicit branch for Self::Mlx in the flush_cache method (alongside
Self::Sglang, Self::Vllm, Self::Trtllm). If the Mlx client exposes a flush_cache
method, call it the same way as in Self::Sglang (await, unpack resp.success and
resp.message, return Ok); if it does not support flush_cache, return
Err(tonic::Status::unimplemented(...)) for Self::Mlx so the match is exhaustive.

Comment on lines +734 to +753
let grpc_futures: Vec<_> = grpc_workers
.iter()
.map(|w| {
let worker = Arc::clone(w);
async move {
let url = worker.url().to_string();
match worker.get_grpc_client().await {
Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
Ok((true, _)) => (url, Ok(())),
Ok((false, msg)) => (url, Err(format!("flush failed: {msg}"))),
Err(e) => (url, Err(format!("gRPC error: {e}"))),
},
Ok(None) => (url, Err("no gRPC client available".to_string())),
Err(e) => (url, Err(format!("failed to get gRPC client: {e}"))),
}
}
})
.collect();

let grpc_results: Vec<_> = future::join_all(grpc_futures).await;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail
echo "HTTP fan-out path:"
sed -n '52,84p' model_gateway/src/worker/manager.rs
echo
echo "gRPC flush path:"
sed -n '733,759p' model_gateway/src/worker/manager.rs

Repository: lightseekorg/smg

Length of output: 2254


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get full flush_cache_all function
echo "=== Full flush_cache_all function ==="
sed -n '691,780p' model_gateway/src/worker/manager.rs

echo
echo "=== REQUEST_TIMEOUT and MAX_CONCURRENT definitions ==="
rg 'REQUEST_TIMEOUT|MAX_CONCURRENT' model_gateway/src/worker/manager.rs -B 2 -A 2

Repository: lightseekorg/smg

Length of output: 4039


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if flush_cache_all has a caller-level timeout wrapper
echo "=== Calls to flush_cache_all ==="
rg 'flush_cache_all' model_gateway/src --type rust -B 3 -A 3

echo
echo "=== gRPC client builder for timeouts ==="
fd -t f 'grpc_client' crates/grpc_client/src | head -5 | xargs -I {} sh -c 'echo "=== {} ==="; head -100 {}'

Repository: lightseekorg/smg

Length of output: 1580


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Searching for flush_cache method definition in gRPC clients ==="
fd -t f '\.rs$' crates/grpc_client/src | xargs grep -l 'flush_cache' | head -3

echo
echo "=== flush_cache method signatures ==="
rg 'fn flush_cache' crates/grpc_client/src -A 5 | head -40

echo
echo "=== gRPC timeout patterns in model_gateway ==="
rg 'timeout|Timeout|Duration' model_gateway/src/routers/grpc --type rust -c | sort | uniq -c | head -10

Repository: lightseekorg/smg

Length of output: 892


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check for gRPC request timeout patterns in model_gateway ==="
rg 'tokio::time::timeout|timeout\(' model_gateway/src/routers/grpc --type rust -B 2 -A 2 | head -60

Repository: lightseekorg/smg

Length of output: 568


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== All flush_cache calls in the codebase ==="
rg 'flush_cache\(' model_gateway/src --type rust -B 2 -A 2

echo
echo "=== Check flush_cache implementation for timeout_s usage ==="
sed -n '/pub async fn flush_cache/,/^    }/p' crates/grpc_client/src/sglang_scheduler.rs | head -40

Repository: lightseekorg/smg

Length of output: 2129


🏁 Script executed:

#!/bin/bash
set -euo pipeftp
echo "=== Check if timeout behavior is documented or if 0.0 is intentional ==="
git log --all --oneline --grep="flush_cache" -- model_gateway/src/worker/manager.rs | head -5
git log --all -p -- crates/grpc_client/src/sglang_scheduler.rs | grep -A 5 -B 5 'flush_cache' | head -40

Repository: lightseekorg/smg

Length of output: 112


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check if there's any wrapper timeout around flush_cache_all calls ==="
sed -n '1,/^async fn flush_cache/p' model_gateway/src/server.rs | tail -20

echo
echo "=== Check other RPC calls in grpc/client.rs for timeout patterns ==="
rg 'tokio::time::timeout' model_gateway/src/routers/grpc/client.rs -A 3 -B 1 | head -40

Repository: lightseekorg/smg

Length of output: 811


Apply REQUEST_TIMEOUT and concurrency bounding to the gRPC fan-out, matching the HTTP path.

The HTTP fan-out path enforces .timeout(REQUEST_TIMEOUT) (5 seconds) and .buffer_unordered(MAX_CONCURRENT) (32 workers), but the gRPC path calls flush_cache(0.0) (infinite timeout) and uses join_all() (unbounded concurrency). A single stalled gRPC worker can hang the entire cache-flush operation indefinitely; firing all gRPC flushes at once risks overwhelming the fleet on deployments with many workers.

Code comparison

HTTP path (lines 69, 81):

.timeout(REQUEST_TIMEOUT)
.buffer_unordered(MAX_CONCURRENT)

gRPC path (lines 740, 753):

grpc_client.flush_cache(0.0).await
future::join_all(grpc_futures).await

Wrap each flush_cache() call with tokio::time::timeout(REQUEST_TIMEOUT, ...) and replace join_all() with stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect() to match the HTTP implementation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@model_gateway/src/worker/manager.rs` around lines 734 - 753, gRPC fan-out
currently awaits grpc_client.flush_cache(0.0) with join_all, causing unbounded
concurrency and infinite waits; modify the async closure built from grpc_workers
(the one that clones Arc and calls get_grpc_client()) to wrap the
flush_cache(...) call with tokio::time::timeout(REQUEST_TIMEOUT, ...), handle
Timeout errors as an Err result string, and then replace the
future::join_all(grpc_futures).await aggregation with
stream::iter(grpc_futures).buffer_unordered(MAX_CONCURRENT).collect().await so
concurrency is bounded and each flush respects REQUEST_TIMEOUT; keep handling of
Ok(Some(grpc_client)), Ok(None), and Err from get_grpc_client unchanged but
adapt error strings for timeouts and cancelled tasks.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@crates/grpc_client/src/sglang_scheduler.rs`:
- Around line 304-310: The flush_cache method currently forwards timeout_s to
the server but does not enforce a local deadline, so the .await on
client.flush_cache(request) can hang; update the method in sglang_scheduler.rs
(the function that builds proto::FlushCacheRequest and calls
client.flush_cache(request)) to validate timeout_s (treat <= 0.0 as "no local
timeout") and, when timeout_s > 0.0, call the RPC via
tokio::time::timeout(Duration::from_secs_f32(timeout_s),
client.flush_cache(request)).await, mapping a tokio timeout error to a
tonic::Status::deadline_exceeded; if timeout_s <= 0.0 just await the RPC
normally. Ensure you still clone self.client as before and propagate other RPC
errors unchanged.

In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py`:
- Around line 965-982: The send_communicator_req method should validate
communicator_name before using getattr to avoid cryptic TypeError: add an
explicit registry (e.g., a set or dict like communicator_registry or
_allowed_communicators on the same class) listing valid communicator attribute
names, then inside send_communicator_req call auto_create_handle_loop(), check
that communicator_name is in that registry, retrieve the attribute with
getattr(self, communicator_name), verify it is callable, and if not present or
not callable raise a clear ValueError mentioning the invalid communicator_name
and listing allowed names; update any callers if needed and keep the rest of the
method (await communicator(req, timeout=timeout)) unchanged.
- Around line 546-579: Remove the dead branch that handles GetLoadsReqOutput in
the handle_loop function so only _handle_tokenizer_loop processes scheduler-sent
communicator responses; locate the GetLoadsReqOutput conditional inside
handle_loop and delete that elif branch (and any now-unused local references)
leaving GetLoadsReqOutput handling only in _handle_tokenizer_loop, ensuring no
other logic depends on the removed branch.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 9c187e9b-e3e3-4ef4-9e5b-6ff3b5743d46

📥 Commits

Reviewing files that changed from the base of the PR and between 18d209c and 6339124.

📒 Files selected for processing (2)
  • crates/grpc_client/src/sglang_scheduler.rs
  • grpc_servicer/smg_grpc_servicer/sglang/request_manager.py

Comment on lines +304 to +310
timeout_s: f32,
) -> Result<proto::FlushCacheResponse, tonic::Status> {
debug!("Requesting cache flush (timeout_s={timeout_s})");
let request = Request::new(proto::FlushCacheRequest { timeout_s });

let mut client = self.client.clone();
let response = client.flush_cache(request).await?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Inspect flush_cache call sites and existing timeout/deadline patterns.

set -euo pipefail

echo "Flush cache call sites:"
rg -n -C3 --type rust '\bflush_cache\s*\('

echo
echo "Existing timeout/deadline patterns:"
rg -n -C3 --type rust 'tokio::time::timeout|Request::set_timeout|REQUEST_TIMEOUT|deadline_exceeded'

Repository: lightseekorg/smg

Length of output: 30490


🏁 Script executed:

#!/bin/bash
# Check all public async methods in sglang_scheduler.rs that make RPC calls
ast-grep --pattern $'impl SglangSchedulerClient {
  $$$
  pub async fn $_($$$) -> $_ {
    $$$
  }
  $$$
}'

Repository: lightseekorg/smg

Length of output: 50372


Enforce a client-side deadline for the flush RPC.

timeout_s is only forwarded to the scheduler; the gRPC await itself can block indefinitely if the scheduler or transport never responds. Since WorkerManager::flush_cache_all awaits grpc_client.flush_cache(0.0), this can stall the admin flush path. Validate the timeout and wrap the RPC with a local deadline.

Proposed fix
     pub async fn flush_cache(
         &self,
         timeout_s: f32,
     ) -> Result<proto::FlushCacheResponse, tonic::Status> {
+        let scheduler_wait_timeout = Duration::try_from_secs_f32(timeout_s).map_err(|_| {
+            tonic::Status::invalid_argument("timeout_s must be finite and non-negative")
+        })?;
+        let rpc_timeout = scheduler_wait_timeout.saturating_add(Duration::from_secs(5));
+
         debug!("Requesting cache flush (timeout_s={timeout_s})");
         let request = Request::new(proto::FlushCacheRequest { timeout_s });
 
         let mut client = self.client.clone();
-        let response = client.flush_cache(request).await?;
+        let response = tokio::time::timeout(rpc_timeout, client.flush_cache(request))
+            .await
+            .map_err(|_| tonic::Status::deadline_exceeded("flush_cache RPC timed out"))??;
         debug!("Flush cache response received");
         Ok(response.into_inner())
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
timeout_s: f32,
) -> Result<proto::FlushCacheResponse, tonic::Status> {
debug!("Requesting cache flush (timeout_s={timeout_s})");
let request = Request::new(proto::FlushCacheRequest { timeout_s });
let mut client = self.client.clone();
let response = client.flush_cache(request).await?;
timeout_s: f32,
) -> Result<proto::FlushCacheResponse, tonic::Status> {
let scheduler_wait_timeout = Duration::try_from_secs_f32(timeout_s).map_err(|_| {
tonic::Status::invalid_argument("timeout_s must be finite and non-negative")
})?;
let rpc_timeout = scheduler_wait_timeout.saturating_add(Duration::from_secs(5));
debug!("Requesting cache flush (timeout_s={timeout_s})");
let request = Request::new(proto::FlushCacheRequest { timeout_s });
let mut client = self.client.clone();
let response = tokio::time::timeout(rpc_timeout, client.flush_cache(request))
.await
.map_err(|_| tonic::Status::deadline_exceeded("flush_cache RPC timed out"))??;
debug!("Flush cache response received");
Ok(response.into_inner())
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/grpc_client/src/sglang_scheduler.rs` around lines 304 - 310, The
flush_cache method currently forwards timeout_s to the server but does not
enforce a local deadline, so the .await on client.flush_cache(request) can hang;
update the method in sglang_scheduler.rs (the function that builds
proto::FlushCacheRequest and calls client.flush_cache(request)) to validate
timeout_s (treat <= 0.0 as "no local timeout") and, when timeout_s > 0.0, call
the RPC via tokio::time::timeout(Duration::from_secs_f32(timeout_s),
client.flush_cache(request)).await, mapping a tokio timeout error to a
tonic::Status::deadline_exceeded; if timeout_s <= 0.0 just await the RPC
normally. Ensure you still clone self.client as before and propagate other RPC
errors unchanged.

Comment on lines +546 to +579
async def _handle_tokenizer_loop(self):
"""Process communicator responses from the scheduler's send_to_tokenizer socket.

The scheduler sends ProfileReqOutput (and similar communicator responses)
directly to tokenizer_ipc_name, not through the detokenizer. This loop
receives those responses and dispatches them to the appropriate communicator.
"""
while not self.gracefully_exit:
try:
recv_obj = await self.recv_from_tokenizer.recv_pyobj()
if isinstance(recv_obj, ProfileReqOutput):
self.profile_communicator.handle_recv(recv_obj)
elif isinstance(recv_obj, FlushCacheReqOutput):
self.flush_cache_communicator.handle_recv(recv_obj)
elif isinstance(recv_obj, GetInternalStateReqOutput):
self.get_internal_state_communicator.handle_recv(recv_obj)
elif isinstance(recv_obj, GetLoadsReqOutput):
self.get_loads_communicator.handle_recv(recv_obj)
else:
logger.warning(f"Unknown type on tokenizer socket: {type(recv_obj)}")
except zmq.error.Again:
if self.gracefully_exit:
break
continue
except zmq.error.ZMQError as e:
if self.gracefully_exit:
logger.debug(f"ZMQ recv interrupted during shutdown: {e}")
break
logger.error(f"ZMQ error in tokenizer loop: {e}\n{get_exception_traceback()}")
break
except Exception as e:
logger.error(f"Tokenizer loop error: {e}\n{get_exception_traceback()}")
if self.gracefully_exit:
break
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Locate where sglang scheduler sends GetLoadsReqOutput (tokenizer vs detokenizer IPC).
python -c "import sglang, os; print(os.path.dirname(sglang.__file__))" 2>/dev/null || true

# Search installed sglang (if present) and the repo for send sites.
rg -nP -C3 '\bGetLoadsReqOutput\s*\(' --glob '!**/request_manager.py'
rg -nP -C3 'send_to_tokenizer.*GetLoads|send_to_detokenizer.*GetLoads'

Repository: lightseekorg/smg

Length of output: 42


🏁 Script executed:

# First, locate the request_manager.py file
fd -n "request_manager.py" | head -20

Repository: lightseekorg/smg

Length of output: 290


🏁 Script executed:

# Check the structure and confirm file exists
git ls-files | grep -E "request_manager\.py|sglang.*request_manager"

Repository: lightseekorg/smg

Length of output: 117


🏁 Script executed:

# Read the specific sections mentioned in the review
if [ -f "grpc_servicer/smg_grpc_servicer/sglang/request_manager.py" ]; then
  echo "=== Lines 520-530 (handle_loop GetLoadsReqOutput) ===" 
  sed -n '520,530p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
  echo ""
  echo "=== Lines 560-570 (_handle_tokenizer_loop GetLoadsReqOutput) ===" 
  sed -n '560,570p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.py
fi

Repository: lightseekorg/smg

Length of output: 1306


🏁 Script executed:

# Search for all GetLoadsReqOutput occurrences and routing
rg -n "GetLoadsReqOutput" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py -B2 -A2

Repository: lightseekorg/smg

Length of output: 1708


🏁 Script executed:

# Check for any comments or documentation about tokenizer vs detokenizer routing
rg -n "tokenizer.*[Gg]et[Ll]oads|detokenizer.*[Gg]et[Ll]oads" grpc_servicer/smg_grpc_servicer/sglang/ -i

Repository: lightseekorg/smg

Length of output: 42


🏁 Script executed:

# Check where handle_loop and _handle_tokenizer_loop are started
rg -n "handle_loop|_handle_tokenizer_loop" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py | grep -E "(asyncio\.create_task|gather|Task|start)"

Repository: lightseekorg/smg

Length of output: 165


🏁 Script executed:

# Check the full auto_create_handle_loop method to see both loops started
sed -n '990,1020p' grpc_servicer/smg_grpc_servicer/sglang/request_manager.py

Repository: lightseekorg/smg

Length of output: 1476


🏁 Script executed:

# Search for any comments or history about GetLoadsReqOutput and which path it should use
rg -n "GetLoads" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py -B3 -A3 | head -80

Repository: lightseekorg/smg

Length of output: 2441


🏁 Script executed:

# Check what sockets are being used and their purposes
rg -n "recv_from_detokenizer|recv_from_tokenizer" grpc_servicer/smg_grpc_servicer/sglang/request_manager.py | head -20

Repository: lightseekorg/smg

Length of output: 234


Remove dead GetLoadsReqOutput handling from handle_loop — scheduler sends via tokenizer socket only.

Both handle_loop (line 524–525) and _handle_tokenizer_loop (line 562–563) handle GetLoadsReqOutput, but the comment at lines 195–196 explicitly states the scheduler sends communicator responses (including GetLoadsReqOutput) directly via send_to_tokenizer, which is the new path used by _handle_tokenizer_loop. The handle_loop branch is dead code and should be removed to avoid confusion and reduce maintenance burden. Keep only the _handle_tokenizer_loop branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py` around lines 546 -
579, Remove the dead branch that handles GetLoadsReqOutput in the handle_loop
function so only _handle_tokenizer_loop processes scheduler-sent communicator
responses; locate the GetLoadsReqOutput conditional inside handle_loop and
delete that elif branch (and any now-unused local references) leaving
GetLoadsReqOutput handling only in _handle_tokenizer_loop, ensuring no other
logic depends on the removed branch.

Comment on lines +965 to +982
async def send_communicator_req(self, req, communicator_name: str, timeout: float = 30.0):
"""Send a request to the scheduler via a named communicator and return responses.

This is the generic transport method for request/response patterns that
go through the tokenizer IPC path (profile, flush_cache, get_internal_state, etc.).
Business logic (request construction, response interpretation) belongs in the caller.

Args:
req: The request object to send to the scheduler.
communicator_name: Attribute name of the communicator (e.g., "profile_communicator").
timeout: Timeout in seconds for the ZMQ round-trip.

Returns:
List of response objects from scheduler(s), one per DP rank.
"""
self.auto_create_handle_loop()
communicator = getattr(self, communicator_name)
return await communicator(req, timeout=timeout)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Optional: validate communicator_name via an explicit registry.

getattr(self, communicator_name) accepts any attribute on self and silently resolves to non-communicator objects (e.g., "send_to_scheduler"), failing later with a cryptic TypeError when invoked. A small registry makes the allowed set explicit and yields a clear error on misuse.

♻️ Proposed refactor
     async def send_communicator_req(self, req, communicator_name: str, timeout: float = 30.0):
         ...
         self.auto_create_handle_loop()
-        communicator = getattr(self, communicator_name)
-        return await communicator(req, timeout=timeout)
+        registry = {
+            "profile_communicator": self.profile_communicator,
+            "flush_cache_communicator": self.flush_cache_communicator,
+            "get_internal_state_communicator": self.get_internal_state_communicator,
+            "get_loads_communicator": self.get_loads_communicator,
+        }
+        try:
+            communicator = registry[communicator_name]
+        except KeyError as e:
+            raise ValueError(f"Unknown communicator: {communicator_name!r}") from e
+        return await communicator(req, timeout=timeout)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@grpc_servicer/smg_grpc_servicer/sglang/request_manager.py` around lines 965 -
982, The send_communicator_req method should validate communicator_name before
using getattr to avoid cryptic TypeError: add an explicit registry (e.g., a set
or dict like communicator_registry or _allowed_communicators on the same class)
listing valid communicator attribute names, then inside send_communicator_req
call auto_create_handle_loop(), check that communicator_name is in that
registry, retrieve the attribute with getattr(self, communicator_name), verify
it is callable, and if not present or not callable raise a clear ValueError
mentioning the invalid communicator_name and listing allowed names; update any
callers if needed and keep the rest of the method (await communicator(req,
timeout=timeout)) unchanged.

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 633912461e

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

"flush_cache_communicator",
timeout=comm_timeout,
)
except TimeoutError:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Catch asyncio timeout from FlushCache communicator call

send_communicator_req() waits on asyncio.wait_for, which raises asyncio.TimeoutError in this Python 3.10 environment, but this handler only catches built-in TimeoutError. When a scheduler flush response times out, it falls into the generic Exception branch and is reported as INTERNAL instead of DEADLINE_EXCEEDED, which misclassifies real timeout conditions and breaks client retry/diagnostic behavior during scheduler stalls.

Useful? React with 👍 / 👎.

async move {
let url = worker.url().to_string();
match worker.get_grpc_client().await {
Ok(Some(grpc_client)) => match grpc_client.flush_cache(0.0).await {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Bound each gRPC flush fan-out call with a timeout

The gRPC branch fans out grpc_client.flush_cache(0.0) and awaits join_all without any per-worker timeout, unlike the HTTP branch that enforces REQUEST_TIMEOUT. If a single gRPC worker becomes slow or unreachable, /flush_cache can block indefinitely on that future and stall the whole admin endpoint during partial outages; wrap each gRPC flush call with tokio::time::timeout and record timeout as a failed worker.

Useful? React with 👍 / 👎.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 6, 2026

This pull request has been automatically marked as stale because it has not had any activity within 14 days. It will be automatically closed if no further activity occurs within 16 days. Leave a comment if you feel this pull request should remain open. Thank you!

@github-actions github-actions Bot added the stale PR has been inactive for 14+ days label May 6, 2026
@Huixxi
Copy link
Copy Markdown
Contributor

Huixxi commented May 8, 2026

@github-actions github-actions Bot removed the stale PR has been inactive for 14+ days label May 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

grpc gRPC client and router changes model-gateway Model gateway crate changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants