Task 002: Introduce RoutingPolicy Trait
Summary
Create a unified RoutingPolicy trait that enables all routing algorithms (Random, RoundRobin, CacheAware, PowerOfTwo) to work seamlessly in both regular and PD routing modes, eliminating code duplication.
Problem Statement
The current routing implementation has several issues:
- Routing policies are duplicated between regular and PD routers
- PowerOfTwo policy only exists in PD mode, but could benefit regular routing
- CacheAware logic is copy-pasted with slight variations
- Adding new routing policies requires modifying router internals
- No clear interface or contract for routing algorithms
Proposed Solution
1. RoutingPolicy Trait
Define a trait that all routing policies must implement:
// src/routing/policies/mod.rs
#[async_trait]
pub trait RoutingPolicy: Send + Sync {
/// Select a single worker for regular routing
async fn select_single(
&self,
workers: &[Arc<dyn Worker>],
request: &serde_json::Value,
) -> Result<Arc<dyn Worker>, RoutingError>;
/// Select prefill and decode workers for PD routing
async fn select_pair(
&self,
prefill_workers: &[Arc<dyn Worker>],
decode_workers: &[Arc<dyn Worker>],
request: &serde_json::Value,
) -> Result<(Arc<dyn Worker>, Arc<dyn Worker>), RoutingError>;
/// Notify policy of request completion (for stateful policies)
fn on_request_complete(&self, worker_url: &str, success: bool);
/// Get policy name for metrics and debugging
fn name(&self) -> &'static str;
}
2. Common Policy Behaviors
Extract common functionality into helper traits:
pub trait LoadBalancing {
fn select_least_loaded(&self, workers: &[Arc<dyn Worker>]) -> Option<Arc<dyn Worker>> {
workers.iter()
.filter(|w| w.is_healthy())
.min_by_key(|w| w.load().load(Ordering::Relaxed))
.cloned()
}
fn get_healthy_workers(&self, workers: &[Arc<dyn Worker>]) -> Vec<Arc<dyn Worker>> {
workers.iter()
.filter(|w| w.is_healthy())
.cloned()
.collect()
}
}
3. Policy Factory
Create policies based on configuration:
pub struct PolicyFactory;
impl PolicyFactory {
pub fn create(config: &PolicyConfig) -> Result<Arc<dyn RoutingPolicy>, PolicyError> {
match config {
PolicyConfig::Random => Ok(Arc::new(RandomPolicy::new())),
PolicyConfig::RoundRobin => Ok(Arc::new(RoundRobinPolicy::new())),
PolicyConfig::CacheAware { .. } => Ok(Arc::new(CacheAwarePolicy::new(config)?)),
PolicyConfig::PowerOfTwo { .. } => Ok(Arc::new(PowerOfTwoPolicy::new(config)?)),
}
}
}
Implementation Plan
Step 1: Create Policy Module Structure
- Create
src/rout/policies/mod.rs with trait definition
- Create individual policy files for each implementation
- Define common error types and helper functions
Step 2: Implement Random Policy
- Simplest policy to validate the trait design
- Use thread-safe random number generation
- Implement both single and pair selection
Step 3: Implement RoundRobin Policy
- Maintain separate counters for each worker pool
- Ensure atomic counter updates
- Handle wraparound correctly
Step 4: Implement CacheAware Policy
- Port existing cache-aware logic
- Ensure tree management is thread-safe
- Implement load balancing fallback
Step 5: Implement PowerOfTwo Policy
- Make it work for regular routing (not just PD)
- Sample two workers and pick the least loaded
- Add proper metrics tracking
Step 6: Update Routers
- Modify routers to use RoutingPolicy trait
- Remove duplicated policy logic
- Ensure backward compatibility
Benefits
- Code Reuse: All policies work in both routing modes
- Extensibility: New policies can be added without touching routers
- Consistency: Single implementation for each algorithm
- Testing: Policies can be tested in isolation
- Feature Parity: PowerOfTwo becomes available for regular routing
Example Implementation
// Random Policy
pub struct RandomPolicy {
rng: Mutex<rand::rngs::ThreadRng>,
}
#[async_trait]
impl RoutingPolicy for RandomPolicy {
async fn select_single(
&self,
workers: &[Arc<dyn Worker>],
_request: &serde_json::Value,
) -> Result<Arc<dyn Worker>, RoutingError> {
let healthy: Vec<_> = workers.iter()
.filter(|w| w.is_healthy())
.collect();
if healthy.is_empty() {
return Err(RoutingError::NoHealthyWorkers);
}
let idx = self.rng.lock().unwrap().gen_range(0..healthy.len());
Ok(healthy[idx].clone())
}
async fn select_pair(
&self,
prefill: &[Arc<dyn Worker>],
decode: &[Arc<dyn Worker>],
request: &serde_json::Value,
) -> Result<(Arc<dyn Worker>, Arc<dyn Worker>), RoutingError> {
let p = self.select_single(prefill, request).await?;
let d = self.select_single(decode, request).await?;
Ok((p, d))
}
fn on_request_complete(&self, _: &str, _: bool) {}
fn name(&self) -> &'static str { "random" }
}
Testing Plan
-
Unit Tests:
- Test each policy in isolation
- Verify distribution properties
- Test error handling
-
Integration Tests:
- Test policy switching
- Verify metrics are recorded
- Test with both routing modes
-
Load Tests:
- Ensure policies perform under load
- Verify fair distribution
- Check for memory leaks
Acceptance Criteria
Estimated Effort
- Design and trait definition: 1 day
- Policy implementations: 3 days
- Router integration: 2 days
- Testing: 1 day
- Total: 7 days
Dependencies
- Task 001: Worker Abstraction (policies operate on Workers)
Risks and Mitigations
-
Risk: Policy behavior changes during refactoring
- Mitigation: Comprehensive tests before refactoring
- Mitigation: A/B testing in staging environment
-
Risk: Performance overhead from trait dispatch
- Mitigation: Benchmark each policy implementation
- Mitigation: Use static dispatch where possible
-
Risk: Thread safety issues
- Mitigation: Careful use of synchronization primitives
- Mitigation: Stress test under high concurrency
Task 002: Introduce RoutingPolicy Trait
Summary
Create a unified RoutingPolicy trait that enables all routing algorithms (Random, RoundRobin, CacheAware, PowerOfTwo) to work seamlessly in both regular and PD routing modes, eliminating code duplication.
Problem Statement
The current routing implementation has several issues:
Proposed Solution
1. RoutingPolicy Trait
Define a trait that all routing policies must implement:
2. Common Policy Behaviors
Extract common functionality into helper traits:
3. Policy Factory
Create policies based on configuration:
Implementation Plan
Step 1: Create Policy Module Structure
src/rout/policies/mod.rswith trait definitionStep 2: Implement Random Policy
Step 3: Implement RoundRobin Policy
Step 4: Implement CacheAware Policy
Step 5: Implement PowerOfTwo Policy
Step 6: Update Routers
Benefits
Example Implementation
Testing Plan
Unit Tests:
Integration Tests:
Load Tests:
Acceptance Criteria
Estimated Effort
Dependencies
Risks and Mitigations
Risk: Policy behavior changes during refactoring
Risk: Performance overhead from trait dispatch
Risk: Thread safety issues