[Router] Add GetTokenizer gRPC method to stream tokenizer files from backend to router #12408

YouNeedCryDear wants to merge 1 commit into sgl-project:main from
Conversation
Summary of Changes

Hello @YouNeedCryDear, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a significant enhancement to the SGLang router's tokenizer management by implementing a dynamic streaming mechanism. Instead of relying on statically configured local files, the router can now fetch tokenizer bundles directly from the backend via a new gRPC method. This change streamlines the deployment process, guarantees consistency between the router and backend, and lays the groundwork for more flexible and adaptive model support.

Highlights
Code Review
This pull request introduces a new GetTokenizer gRPC method to allow the router to stream tokenizer files from the backend. The changes include new protobuf definitions and a Rust client implementation for this streaming RPC. My review focuses on the client implementation. I've identified a significant ambiguity in the streaming protocol design for handling multi-file tokenizer bundles, which could lead to data corruption. I've also suggested refactoring the main client function and improving its error handling for better maintainability. Overall, the feature is a good addition, but the protocol details need to be solidified before the server-side is implemented.
```rust
// Collect file chunks
let mut compressed_data = Vec::new();
let mut expected_chunk_index = 0u32;
let mut last_chunk_received = false;

debug!("Starting to receive file chunks");
while let Some(chunk) = stream.message().await? {
    match chunk.chunk {
        Some(proto::get_tokenizer_chunk::Chunk::FileChunk(file_chunk)) => {
            // Validate chunk ordering
            if file_chunk.chunk_index != expected_chunk_index {
                return Err(format!(
                    "Protocol error: expected chunk index {}, got {}",
                    expected_chunk_index, file_chunk.chunk_index
                )
                .into());
            }

            debug!(
                "Received file chunk {}: {} bytes, is_last={}",
                file_chunk.chunk_index,
                file_chunk.data.len(),
                file_chunk.is_last_chunk
            );

            // Append data
            compressed_data.extend_from_slice(&file_chunk.data);

            // Check if this is the last chunk
            if file_chunk.is_last_chunk {
                last_chunk_received = true;
                debug!(
                    "Last chunk received, total data size: {} bytes",
                    compressed_data.len()
                );
                break;
            }

            expected_chunk_index += 1;
        }
        Some(proto::get_tokenizer_chunk::Chunk::Metadata(_)) => {
            return Err(format!(
                "Protocol error: unexpected metadata chunk at position {}",
                expected_chunk_index
            )
            .into());
        }
        None => {
            return Err(format!(
                "Protocol error: empty chunk at position {}",
                expected_chunk_index
            )
            .into());
        }
    }
}
```
The current implementation collects all file chunks into a single `compressed_data` vector, ignoring the `file_chunk.file_name` field. This assumes the server sends a single contiguous data stream (like an archive), which seems to contradict the `file_name` field in the `TokenizerFileChunk` proto.

This creates ambiguity in the protocol for multi-file bundles:

- If the server sends a single archive, the `file_name` field in `TokenizerFileChunk` is redundant and the client would later need to unpack this archive.
- If the server sends chunks for individual files, this client implementation will corrupt the data by concatenating them. The client would need to be updated to segregate data by `file_name` (e.g., into a `HashMap<String, Vec<u8>>`).

Furthermore, if individual files are streamed:

- The `chunk_index` should probably be tracked on a per-file basis, not globally.
- The `is_last_chunk` flag's meaning is unclear: does it mark the end of a file or the end of the entire bundle? The client assumes the latter.

It's important to clarify the protocol design for handling multiple files and ensure the client implementation correctly reflects it.
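If the protocol ends up streaming individual files, a minimal sketch of per-file collection might look like the following. It reuses the surrounding `stream` and `proto` types from the function above; treating `chunk_index` and `is_last_chunk` as per-file is an assumption here, not something this PR defines.

```rust
use std::collections::HashMap;

// Hypothetical per-file accumulation: data keyed by file_name, with the chunk
// index tracked per file. This assumes chunk_index restarts at 0 for each file
// and that ordering is only guaranteed within a file.
let mut files: HashMap<String, Vec<u8>> = HashMap::new();
let mut next_index: HashMap<String, u32> = HashMap::new();

while let Some(chunk) = stream.message().await? {
    if let Some(proto::get_tokenizer_chunk::Chunk::FileChunk(file_chunk)) = chunk.chunk {
        let expected = next_index.entry(file_chunk.file_name.clone()).or_insert(0);
        if file_chunk.chunk_index != *expected {
            return Err(format!(
                "Protocol error: file {} expected chunk {}, got {}",
                file_chunk.file_name, expected, file_chunk.chunk_index
            )
            .into());
        }
        *expected += 1;

        files
            .entry(file_chunk.file_name.clone())
            .or_default()
            .extend_from_slice(&file_chunk.data);
    }
}
```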
```rust
pub async fn get_tokenizer(
    &self,
) -> Result<TokenizerBundle, Box<dyn std::error::Error + Send + Sync>> {
    debug!("Requesting tokenizer from backend");
    let request = Request::new(proto::GetTokenizerRequest {});

    let mut client = self.client.clone();
    let mut stream = client.get_tokenizer(request).await?.into_inner();

    // First message must be metadata
    let first_chunk = stream
        .message()
        .await?
        .ok_or("Empty stream: expected metadata chunk")?;

    let metadata = match first_chunk.chunk {
        Some(proto::get_tokenizer_chunk::Chunk::Metadata(meta)) => {
            debug!(
                "Received tokenizer metadata: model={}, fingerprint={}, files={}, format={}",
                meta.model_identifier,
                meta.fingerprint,
                meta.files.len(),
                meta.bundle_format
            );
            meta
        }
        Some(proto::get_tokenizer_chunk::Chunk::FileChunk(_)) => {
            return Err("Protocol error: first chunk must be metadata, got file chunk".into());
        }
        None => {
            return Err("Protocol error: first chunk is empty".into());
        }
    };

    // Collect file chunks
    let mut compressed_data = Vec::new();
    let mut expected_chunk_index = 0u32;
    let mut last_chunk_received = false;

    debug!("Starting to receive file chunks");
    while let Some(chunk) = stream.message().await? {
        match chunk.chunk {
            Some(proto::get_tokenizer_chunk::Chunk::FileChunk(file_chunk)) => {
                // Validate chunk ordering
                if file_chunk.chunk_index != expected_chunk_index {
                    return Err(format!(
                        "Protocol error: expected chunk index {}, got {}",
                        expected_chunk_index, file_chunk.chunk_index
                    )
                    .into());
                }

                debug!(
                    "Received file chunk {}: {} bytes, is_last={}",
                    file_chunk.chunk_index,
                    file_chunk.data.len(),
                    file_chunk.is_last_chunk
                );

                // Append data
                compressed_data.extend_from_slice(&file_chunk.data);

                // Check if this is the last chunk
                if file_chunk.is_last_chunk {
                    last_chunk_received = true;
                    debug!(
                        "Last chunk received, total data size: {} bytes",
                        compressed_data.len()
                    );
                    break;
                }

                expected_chunk_index += 1;
            }
            Some(proto::get_tokenizer_chunk::Chunk::Metadata(_)) => {
                return Err(format!(
                    "Protocol error: unexpected metadata chunk at position {}",
                    expected_chunk_index
                )
                .into());
            }
            None => {
                return Err(format!(
                    "Protocol error: empty chunk at position {}",
                    expected_chunk_index
                )
                .into());
            }
        }
    }

    // Validate that we received the last chunk marker
    if !last_chunk_received {
        return Err(format!(
            "Protocol error: stream ended without is_last_chunk flag (received {} chunks)",
            expected_chunk_index
        )
        .into());
    }

    debug!("Tokenizer bundle download complete");
    Ok(TokenizerBundle {
        metadata,
        compressed_data,
    })
}
```
This function is quite long and handles multiple concerns (receiving metadata, receiving file chunks, validation). For better readability and maintainability, consider refactoring it into smaller, more focused functions. For example, you could have separate functions for processing the metadata and for streaming the file chunks.
Additionally, the error handling could be improved by using a custom error enum instead of Box<dyn std::error::Error>. This provides more structured errors, making it easier for callers to handle specific failure modes programmatically.
Here's an example of how a custom error type could look using `thiserror`:

```rust
#[derive(Debug, thiserror::Error)]
pub enum GetTokenizerError {
    #[error("gRPC error: {0}")]
    Grpc(#[from] tonic::Status),
    #[error("Empty stream: expected metadata chunk")]
    EmptyStream,
    #[error("Protocol error: first chunk must be metadata, but got a file chunk")]
    MetadataNotFirst,
    #[error("Protocol error: first chunk is empty")]
    FirstChunkEmpty,
    #[error("Protocol error: unexpected metadata chunk at position {0}")]
    UnexpectedMetadata(u32),
    #[error("Protocol error: empty chunk at position {0}")]
    EmptyChunk(u32),
    #[error("Protocol error: expected chunk index {expected}, but got {got}")]
    ChunkOutOfOrder { expected: u32, got: u32 },
    #[error("Protocol error: stream ended without is_last_chunk flag (received {0} chunks)")]
    StreamEndedPrematurely(u32),
}
```

Using this would make the error handling logic within `get_tokenizer` cleaner and more explicit.
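For the structural split, a rough sketch might pull the metadata step into its own helper. The function names and the use of `GetTokenizerError` below are illustrative, not part of this PR:

```rust
// Illustrative helper; name and signature are hypothetical.
async fn receive_metadata(
    stream: &mut tonic::Streaming<proto::GetTokenizerChunk>,
) -> Result<proto::TokenizerMetadata, GetTokenizerError> {
    let first = stream
        .message()
        .await? // tonic::Status converts via #[from] in GetTokenizerError
        .ok_or(GetTokenizerError::EmptyStream)?;
    match first.chunk {
        Some(proto::get_tokenizer_chunk::Chunk::Metadata(meta)) => Ok(meta),
        Some(proto::get_tokenizer_chunk::Chunk::FileChunk(_)) => {
            Err(GetTokenizerError::MetadataNotFirst)
        }
        None => Err(GetTokenizerError::FirstChunkEmpty),
    }
}

// A sibling receive_file_chunks(stream) -> Result<Vec<u8>, GetTokenizerError>
// would own the chunk-ordering loop, and get_tokenizer would compose the two.
```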
💡 Codex Review
Here are some automated review suggestions for this pull request.
```rust
if file_chunk.chunk_index != expected_chunk_index {
    return Err(format!(
        "Protocol error: expected chunk index {}, got {}",
        expected_chunk_index, file_chunk.chunk_index
    )
    .into());
}
```
Return an Error type instead of String in protocol validation
The new `get_tokenizer` function returns `Result<TokenizerBundle, Box<dyn std::error::Error + Send + Sync>>`, but the error paths use `return Err(format!("…").into());`. `format!` produces a `String`, which does not implement `std::error::Error`; it only converts into the boxed error through a blanket `From<String>` impl, yielding unstructured, stringly-typed errors. Use an error type (e.g., `anyhow::Error`, `tonic::Status`, or a custom struct) rather than a bare `String`, and update the other `format!` branches in this function accordingly.
This is FALSE - the router already has `create_tokenizer_async()` that handles:

Adding this management in the gRPC server is even more complex.
Can you explain a bit more? I thought we would share the same PersistentVolume for workers and routers.
Not sure I understand this correctly. I thought, same as #12045, the root cause is that we need
Closing this PR as the changes are merged into #12407.
Summary
This PR adds a new `GetTokenizer` gRPC method to the SGLang scheduler service, enabling the router to dynamically fetch tokenizer files from the backend via streaming. This eliminates the need for static tokenizer configuration in the router and ensures the router always uses the same tokenizer as the backend.

Motivation
Currently, the SGLang router requires static tokenizer files to be configured locally. This creates several challenges:
By enabling dynamic tokenizer streaming, the router can automatically obtain the correct tokenizer from the backend, simplifying deployment and ensuring consistency.
Python server implementation of the `GetTokenizer` RPC in #12407

Implementation Details
Protocol Definition
Added new protobuf messages for tokenizer streaming:
- `GetTokenizerRequest`: Request message (empty for now)
- `GetTokenizerChunk`: Streamed response containing either metadata or file chunks
- `TokenizerMetadata`: Information about the tokenizer bundle (model identifier, fingerprint, file list, format)
- `TokenizerFileDescriptor`: Metadata for individual tokenizer files
- `TokenizerFileChunk`: Chunked file data with sequential indexing and compression support

The protocol follows a metadata-first streaming pattern:

1. The stream opens with a `TokenizerMetadata` chunk
2. File data then arrives as `TokenizerFileChunk` messages with sequential chunk indices
3. The final chunk is marked with the `is_last_chunk` flag
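For orientation, the prost-generated Rust shapes implied by these messages would look roughly as follows. Only fields actually referenced elsewhere in this PR are listed, and the field types are inferred, so treat this as an assumption rather than the actual generated code:

```rust
// Approximate shape of the generated types (field types inferred, not verified).
pub struct GetTokenizerRequest {}

pub struct TokenizerMetadata {
    pub model_identifier: String,
    pub fingerprint: String,
    pub files: Vec<TokenizerFileDescriptor>,
    pub bundle_format: String, // could equally be an enum/i32 in the real proto
}

pub struct TokenizerFileDescriptor {
    pub file_name: String, // other per-file fields omitted
}

pub struct TokenizerFileChunk {
    pub file_name: String,
    pub chunk_index: u32,
    pub data: Vec<u8>,
    pub is_last_chunk: bool,
}

pub struct GetTokenizerChunk {
    pub chunk: Option<get_tokenizer_chunk::Chunk>,
}

pub mod get_tokenizer_chunk {
    pub enum Chunk {
        Metadata(super::TokenizerMetadata),
        FileChunk(super::TokenizerFileChunk),
    }
}
```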
Added
Added a `get_tokenizer()` method to `SglangSchedulerClient` that:

- Sends a `GetTokenizerRequest` and reads the streamed response
- Expects a `TokenizerMetadata` chunk first, followed by ordered file chunks
- Returns a `TokenizerBundle` containing metadata and compressed data

The implementation includes robust error handling for protocol violations:

- Missing or misplaced metadata chunk
- Out-of-order or empty chunks
- Stream ending without the `is_last_chunk` marker
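As a rough usage sketch (assuming an already-connected `SglangSchedulerClient`; nothing beyond the fields shown in the diff above is guaranteed by this PR):

```rust
// Hedged usage sketch: fetch the tokenizer bundle from a connected client
// and log its metadata. Field names mirror the client code above.
async fn fetch_tokenizer(
    client: &SglangSchedulerClient,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let bundle = client.get_tokenizer().await?;
    println!(
        "model={}, fingerprint={}, files={}, compressed bytes={}",
        bundle.metadata.model_identifier,
        bundle.metadata.fingerprint,
        bundle.metadata.files.len(),
        bundle.compressed_data.len()
    );
    Ok(())
}
```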
Future Work
This PR lays the foundation for dynamic tokenizer streaming. Future enhancements will include:
- `TokenizerRegistry` for caching downloaded tokenizers

Testing
Manual testing was performed to verify:
Integration testing will be added once the Python server PR is merged.