Skip to content

refactor: Refactor kv event publishers#1287

Merged
jthomson04 merged 5 commits into
mainfrom
jthomson04/refactor-kv-event-publishers
May 30, 2025
Merged

refactor: Refactor kv event publishers#1287
jthomson04 merged 5 commits into
mainfrom
jthomson04/refactor-kv-event-publishers

Conversation

@jthomson04

@jthomson04 jthomson04 commented May 30, 2025

Copy link
Copy Markdown
Contributor

We currently have a KvEventPublisher and ZmqKvEventPublisher which have relatively similar functionalities. This MR unifies these two to create a single configurable KvEventPublisher.

Summary by CodeRabbit

  • New Features

    • Improved event publishing with unified configuration options for event sources.
    • Added support for configuring ZMQ as an event source.
  • Refactor

    • Streamlined event processing and publishing for better reliability and easier cancellation.
    • Consolidated background tasks into a single, more manageable workflow.
  • Bug Fixes

    • Enhanced shutdown and cleanup handling for event publishing processes.

@coderabbitai

coderabbitai Bot commented May 30, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

The changes refactor the key-value event publishing system to introduce a unified configuration and management approach for event sources. A new KvEventSourceConfig enum is added, and the KvEventPublisher struct is updated to support optional event sources, coordinated shutdown, and improved cancellation handling. Python and C bindings are updated accordingly to use the new API.

Changes

File(s) Change Summary
lib/llm/src/kv_router/publisher.rs Introduced KvEventSourceConfig and KvEventSource enums; refactored KvEventPublisher to support unified event source management, coordinated shutdown, and improved task handling; updated tests.
lib/bindings/python/rust/llm/kv.rs Updated to use new KvEventPublisher API with optional event source config; refactored construction and error handling.
lib/bindings/c/src/lib.rs Modified dynamo_create_kv_publisher to call KvEventPublisher::new with an explicit None for the event source config parameter.

Sequence Diagram(s)

sequenceDiagram
    participant Python
    participant ZmqKvEventPublisher
    participant KvEventPublisher
    participant KvEventSource
    participant ZMQ
    participant EventProcessor

    Python->>ZmqKvEventPublisher: new(config)
    ZmqKvEventPublisher->>KvEventPublisher: new(component, worker_id, kv_block_size, Some(KvEventSourceConfig::Zmq))
    KvEventPublisher->>KvEventSource: start(zmq_endpoint, zmq_topic, ...)
    KvEventSource->>ZMQ: subscribe and listen
    ZMQ-->>KvEventSource: multipart messages
    KvEventSource->>KvEventPublisher: send KvCacheEvent(s)
    KvEventPublisher->>EventProcessor: process and publish events
Loading

Poem

In the warren, code flows anew,
Event sources unified, as rabbits pursue.
ZMQ or not, the stream is clean,
With tokens to cancel and shutdowns serene.
From Rust to Python, the signals hop—
This refactor’s a leap, not a stop!
🐇✨


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (2)
lib/llm/src/kv_router/publisher.rs (1)

330-332: "ZMQ listener exiting" log never prints due to early break

Because every break exits the loop, the tracing::debug! inside the loop is skipped. Move the log outside the loop so it always executes:

-        tracing::debug!("ZMQ listener exiting");
-    }
+    }
+    tracing::debug!("ZMQ listener exiting");
lib/bindings/python/rust/llm/kv.rs (1)

168-186: Consider automatic cleanup for ZMQ publisher to prevent resource leaks

ZmqKvEventPublisher exposes an explicit shutdown, but if Python code forgets to call it, the underlying tasks will continue running. Implementing __del__ (or __exit__ via __enter__/__exit__) that calls self.shutdown() would make the binding safer and more Pythonic.

No code change required here, but adding:

fn __del__(&mut self) {
    self.shutdown();
}

would eliminate this class of leaks.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6336143 and ec1ec8e.

📒 Files selected for processing (2)
  • lib/bindings/python/rust/llm/kv.rs (3 hunks)
  • lib/llm/src/kv_router/publisher.rs (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build and Test - vllm
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (.)

Comment thread lib/llm/src/kv_router/publisher.rs
Comment thread lib/llm/src/kv_router/publisher.rs Outdated
Comment thread lib/llm/src/kv_router/publisher.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
lib/llm/src/kv_router/publisher.rs (1)

221-332: Busy-loop issue fixed, but consider improving remaining unwrap() usage.

The previous busy-loop issue has been resolved - when tx.send fails, the function now returns instead of continuing in a loop. However, there are still some unwrap() calls that could be made more robust:

Consider replacing the remaining unwrap() calls with proper error handling:

-                let payload = frames.pop().unwrap();
-                let seq_bytes = frames.pop().unwrap();
+                let Some(payload) = frames.pop() else {
+                    tracing::warn!("Missing payload frame");
+                    continue;
+                };
+                let Some(seq_bytes) = frames.pop() else {
+                    tracing::warn!("Missing sequence bytes frame");
+                    continue;
+                };
-                let seq = u64::from_be_bytes(seq_bytes.try_into().unwrap());
+                let Ok(seq_array) = seq_bytes.try_into() else {
+                    tracing::warn!("Failed to convert sequence bytes to array");
+                    continue;
+                };
+                let seq = u64::from_be_bytes(seq_array);
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8934bc8 and cd8feae.

📒 Files selected for processing (1)
  • lib/llm/src/kv_router/publisher.rs (12 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build and Test - vllm
  • GitHub Check: pre-merge-rust (lib/bindings/python)
  • GitHub Check: pre-merge-rust (lib/runtime/examples)
  • GitHub Check: pre-merge-rust (.)
🔇 Additional comments (7)
lib/llm/src/kv_router/publisher.rs (7)

48-52: Well-designed configuration enum for event sources.

The KvEventSourceConfig enum provides a clean, type-safe way to configure different event sources. The design is extensible and the ZMQ variant captures the necessary configuration parameters.


54-96: Good encapsulation and error handling in KvEventSource.

The KvEventSource enum provides clean abstraction over different event source types. The start method properly returns Result for error propagation, and the shutdown method ensures proper cleanup of background tasks.


98-109: Struct changes support unified event source architecture.

The addition of optional source and cancellation_token fields enables coordinated shutdown and flexible configuration while maintaining backward compatibility.


112-151: Excellent fix for the unwrap issue from previous reviews.

The constructor now properly propagates errors using the ? operator instead of unwrap() when starting the event source. This prevents potential panics and allows proper error handling up the call stack.


162-177: Good resource management with shutdown and Drop implementation.

The shutdown method properly cancels the token and cleans up the event source. The Drop implementation ensures cleanup happens automatically, following good resource management practices.


179-205: Excellent error handling improvement from previous reviews.

The function now properly handles publish errors by logging them instead of using unwrap(). This prevents task crashes and allows the event processor to continue running. The refactored design with direct KvCacheEvent processing is also cleaner.


706-812: Test updates correctly reflect the new API and architecture.

The test functions have been properly updated to work with the new unified event source design. The tests cover the key functionality and verify both the event processing pipeline and ZMQ message handling.

@jthomson04 jthomson04 merged commit 9210a26 into main May 30, 2025
13 checks passed
@jthomson04 jthomson04 deleted the jthomson04/refactor-kv-event-publishers branch May 30, 2025 16:04
biswapanda added a commit that referenced this pull request May 5, 2026
…clusion

Three improvements borrowed from PR #9141 (Ameen Patel) on top of the
existing PASSTHROUGH_EXTRA_FIELDS expansion:

1. get_stop_token_ids returns Result<Option<Vec<TokenIdType>>>, not
   Option<Vec<TokenIdType>>. Malformed payloads (e.g.
   stop_token_ids: "not-an-array") now surface as a typed 400 with the
   diagnostic 'stop_token_ids must be an array of unsigned token IDs:
   {err}'. extract_stop_conditions propagates the Result via ?.

   Replaces the prior silent-fallback Option<> variant which dropped
   malformed inputs without telling the caller. Silent drops on RL
   correctness primitives (stop conditions affect what tokens
   the engine emits) is the bug class CR-9 was about — same principle
   applies here.

2. Mutual-exclusion between messages and pre-tokenized input is now
   scoped to the canonical TOP-LEVEL prompt_token_ids extension only.
   The legacy nvext.token_data channel — which the verifiers
   dynamo_chat_nvext renderer transport (#1287) uses with placeholder
   messages 'role: user, content: (token-in mode)' — is allowed to
   coexist with non-empty messages. validate_messages still gates the
   empty-messages-with-no-tokens case.

   Without this relaxation, the renderer transport's placeholder
   pattern would 400 on every request.

3. Two new tests in test_common_ext.rs:
   - test_chat_completions_stop_token_ids_extraction: positive case
     with nvext.token_data + top-level stop_token_ids (lifted from
     PR #9141 verbatim).
   - test_chat_completions_stop_token_ids_malformed_returns_400:
     verifies the typed-error path on bad input.

Pre-existing test struct-init sites in test_common_ext.rs were missing
required fields (return_token_ids, tokens) added to the
NvCreateChatCompletionRequest struct since the tests were written. Three
sites updated to construct cleanly. cargo test test_common_ext: 15
tests, 15 passes.
yao531441 pushed a commit to yao531441/dynamo that referenced this pull request May 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants