Task 002: Introduce RoutingPolicy Trait #7535

@slin1237

Summary

Create a unified RoutingPolicy trait so that every routing algorithm (Random, RoundRobin, CacheAware, PowerOfTwo) has a single implementation that works in both regular and prefill-decode (PD) routing modes, eliminating the duplicated policy code in the two routers.

Problem Statement

The current routing implementation has several issues:

  • Routing policies are duplicated between regular and PD routers
  • The PowerOfTwo policy exists only in PD mode, although regular routing would also benefit from it
  • CacheAware logic is copy-pasted between the two routers with slight variations
  • Adding new routing policies requires modifying router internals
  • No clear interface or contract for routing algorithms

Proposed Solution

1. RoutingPolicy Trait

Define a trait that all routing policies must implement:

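// src/routing/policies/mod.rs
use std::sync::Arc;

use async_trait::async_trait;

// `Worker` comes from Task 001; `RoutingError` is one of the shared error
// types defined in Step 1.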
#[async_trait]
pub trait RoutingPolicy: Send + Sync {
    /// Select a single worker for regular routing
    async fn select_single(
        &self,
        workers: &[Arc<dyn Worker>],
        request: &serde_json::Value,
    ) -> Result<Arc<dyn Worker>, RoutingError>;
    
    /// Select prefill and decode workers for PD routing
    async fn select_pair(
        &self,
        prefill_workers: &[Arc<dyn Worker>],
        decode_workers: &[Arc<dyn Worker>],
        request: &serde_json::Value,
    ) -> Result<(Arc<dyn Worker>, Arc<dyn Worker>), RoutingError>;
    
    /// Notify policy of request completion (for stateful policies)
    fn on_request_complete(&self, worker_url: &str, success: bool);
    
    /// Get policy name for metrics and debugging
    fn name(&self) -> &'static str;
}

2. Common Policy Behaviors

Extract common functionality into helper traits:

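use std::sync::atomic::Ordering;
use std::sync::Arc;

// Assumes `Worker::load()` returns `&AtomicUsize` (Task 001).
pub trait LoadBalancing {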
    fn select_least_loaded(&self, workers: &[Arc<dyn Worker>]) -> Option<Arc<dyn Worker>> {
        workers.iter()
            .filter(|w| w.is_healthy())
            .min_by_key(|w| w.load().load(Ordering::Relaxed))
            .cloned()
    }
    
    fn get_healthy_workers(&self, workers: &[Arc<dyn Worker>]) -> Vec<Arc<dyn Worker>> {
        workers.iter()
            .filter(|w| w.is_healthy())
            .cloned()
            .collect()
    }
}
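A quick way to pin down the helper's semantics is a std-only sketch with a mock worker. The three-method `Worker` trait, `MockWorker`, and `ExamplePolicy` below are hypothetical stand-ins for the Task 001 abstraction; the default method mirrors `select_least_loaded` above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical minimal stand-in for the Task 001 `Worker` trait.
pub trait Worker: Send + Sync {
    fn url(&self) -> &str;
    fn is_healthy(&self) -> bool;
    fn load(&self) -> &AtomicUsize;
}

pub struct MockWorker {
    url: String,
    healthy: bool,
    load: AtomicUsize,
}

impl MockWorker {
    pub fn new(url: &str, healthy: bool, load: usize) -> Arc<dyn Worker> {
        Arc::new(Self { url: url.to_string(), healthy, load: AtomicUsize::new(load) })
    }
}

impl Worker for MockWorker {
    fn url(&self) -> &str { &self.url }
    fn is_healthy(&self) -> bool { self.healthy }
    fn load(&self) -> &AtomicUsize { &self.load }
}

pub trait LoadBalancing {
    /// Healthy worker with the lowest in-flight load (first one wins ties).
    fn select_least_loaded(&self, workers: &[Arc<dyn Worker>]) -> Option<Arc<dyn Worker>> {
        workers
            .iter()
            .filter(|w| w.is_healthy())
            .min_by_key(|w| w.load().load(Ordering::Relaxed))
            .cloned()
    }
}

/// Hypothetical policy: opts in with an empty impl and inherits the default.
pub struct ExamplePolicy;
impl LoadBalancing for ExamplePolicy {}
```

Because the helpers are default methods, each policy opts in with an empty `impl` block and no selection code is duplicated.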

3. Policy Factory

Create policies based on configuration:

pub struct PolicyFactory;

impl PolicyFactory {
    pub fn create(config: &PolicyConfig) -> Result<Arc<dyn RoutingPolicy>, PolicyError> {
        match config {
            PolicyConfig::Random => Ok(Arc::new(RandomPolicy::new())),
            PolicyConfig::RoundRobin => Ok(Arc::new(RoundRobinPolicy::new())),
            PolicyConfig::CacheAware { .. } => Ok(Arc::new(CacheAwarePolicy::new(config)?)),
            PolicyConfig::PowerOfTwo { .. } => Ok(Arc::new(PowerOfTwoPolicy::new(config)?)),
        }
    }
}

Implementation Plan

Step 1: Create Policy Module Structure

  • Create src/routing/policies/mod.rs with trait definition
  • Create individual policy files for each implementation
  • Define common error types and helper functions
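The shared error type could look like the sketch below. Only `NoHealthyWorkers` appears elsewhere in this proposal; `EmptyPool` and the `healthy_indices` helper are hypothetical, and worker health is reduced to one `bool` per worker to keep the example self-contained.

```rust
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RoutingError {
    /// Every worker in the pool failed its health check.
    NoHealthyWorkers,
    /// The router passed an empty worker list (hypothetical variant).
    EmptyPool,
}

impl fmt::Display for RoutingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RoutingError::NoHealthyWorkers => write!(f, "no healthy workers available"),
            RoutingError::EmptyPool => write!(f, "worker pool is empty"),
        }
    }
}

impl std::error::Error for RoutingError {}

/// Hypothetical shared helper: indices of healthy workers, or an error that
/// distinguishes "no workers at all" from "none of them healthy".
pub fn healthy_indices(health: &[bool]) -> Result<Vec<usize>, RoutingError> {
    if health.is_empty() {
        return Err(RoutingError::EmptyPool);
    }
    let idx: Vec<usize> = health
        .iter()
        .enumerate()
        .filter(|(_, h)| **h)
        .map(|(i, _)| i)
        .collect();
    if idx.is_empty() {
        Err(RoutingError::NoHealthyWorkers)
    } else {
        Ok(idx)
    }
}
```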

Step 2: Implement Random Policy

  • Simplest policy to validate the trait design
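  • Use thread-safe random number generation (e.g. fetch the thread-local RNG per call; `rand::rngs::ThreadRng` is `!Send` and cannot be stored in a `Send + Sync` policy)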
  • Implement both single and pair selection

Step 3: Implement RoundRobin Policy

  • Maintain separate counters for each worker pool
  • Ensure atomic counter updates
  • Handle wraparound correctly
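A minimal sketch of the RoundRobin bookkeeping, assuming one `AtomicUsize` per pool. `RoundRobinCounters`, `Pool`, and `next_index` are hypothetical names; a real policy would index into the healthy-worker list with the returned value.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// One counter per pool, so PD routing advances the prefill and decode
/// rotations independently of each other.
#[derive(Default)]
pub struct RoundRobinCounters {
    regular: AtomicUsize,
    prefill: AtomicUsize,
    decode: AtomicUsize,
}

/// Which pool a selection is for (hypothetical helper enum).
#[derive(Clone, Copy)]
pub enum Pool {
    Regular,
    Prefill,
    Decode,
}

impl RoundRobinCounters {
    /// Next index in `0..pool_len`, or `None` for an empty pool.
    pub fn next_index(&self, pool: Pool, pool_len: usize) -> Option<usize> {
        if pool_len == 0 {
            return None;
        }
        let counter = match pool {
            Pool::Regular => &self.regular,
            Pool::Prefill => &self.prefill,
            Pool::Decode => &self.decode,
        };
        // fetch_add is one atomic read-modify-write, so concurrent callers
        // never receive the same ticket; on overflow it wraps to 0 rather
        // than panicking.
        let ticket = counter.fetch_add(1, Ordering::Relaxed);
        Some(ticket % pool_len)
    }
}
```

Because the counter wraps at `usize::MAX`, the rotation can skip once at the wrap point when the pool size does not divide `usize::MAX + 1`; at any realistic request rate that point is never reached.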

Step 4: Implement CacheAware Policy

  • Port existing cache-aware logic
  • Ensure tree management is thread-safe
  • Implement load balancing fallback
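One plausible shape for the load-balancing fallback, written as a pure function so it can be unit-tested on its own. The threshold names and the imbalance rule (absolute gap and ratio between the busiest and idlest worker) are assumptions for illustration, not the existing CacheAware parameters; `cache_match` stands in for the result of the prefix-tree lookup.

```rust
/// Hypothetical thresholds; the real CacheAware configuration may differ.
pub struct BalanceThresholds {
    /// Minimum gap (max load - min load) before the pool counts as imbalanced.
    pub abs: usize,
    /// Minimum ratio max/min before the pool counts as imbalanced.
    pub rel: f64,
}

/// Returns the index of the worker to use. `loads[i]` is worker i's
/// in-flight count; `cache_match` is the worker whose cached prefix best
/// matches the request, if any.
pub fn choose_worker(
    loads: &[usize],
    cache_match: Option<usize>,
    t: &BalanceThresholds,
) -> Option<usize> {
    let (min_idx, &min_load) = loads.iter().enumerate().min_by_key(|(_, l)| **l)?;
    let max_load = *loads.iter().max()?;
    let imbalanced =
        max_load - min_load > t.abs && (max_load as f64) > t.rel * (min_load as f64);
    match cache_match {
        // Honor cache affinity only while load is roughly even.
        Some(i) if !imbalanced && i < loads.len() => Some(i),
        // Fallback: no usable cache hit, or the pool is imbalanced.
        _ => Some(min_idx),
    }
}
```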

Step 5: Implement PowerOfTwo Policy

  • Make it work for regular routing (not just PD)
  • Sample two workers and pick the least loaded
  • Add proper metrics tracking
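The power-of-two-choices selection can be sketched as below. To stay dependency-free, the function takes two caller-supplied random numbers (`r1`, `r2`, hypothetical parameters) instead of calling an RNG. Drawing the second index from the remaining `n - 1` slots and skipping over the first guarantees two distinct candidates.

```rust
/// Power-of-two-choices over a slice of worker loads. `r1` and `r2` are
/// random numbers supplied by the caller (e.g. from the `rand` crate in real
/// code), which keeps this sketch deterministic and testable.
pub fn power_of_two(loads: &[usize], r1: usize, r2: usize) -> Option<usize> {
    let n = loads.len();
    match n {
        0 => None,
        1 => Some(0),
        _ => {
            // First candidate: any of the n workers.
            let i = r1 % n;
            // Second candidate: one of the other n - 1 workers, skipping i.
            let mut j = r2 % (n - 1);
            if j >= i {
                j += 1;
            }
            // Keep the less loaded candidate (ties go to i).
            Some(if loads[j] < loads[i] { j } else { i })
        }
    }
}
```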

Step 6: Update Routers

  • Modify routers to use RoutingPolicy trait
  • Remove duplicated policy logic
  • Ensure backward compatibility
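The router side of Step 6 can be illustrated with one `Arc<dyn RoutingPolicy>` serving both a regular router and a PD router, neither of which inspects which policy it holds. To stay std-only, this sketch uses a synchronous mirror of the trait with workers reduced to URL strings; `PickFirst`, `Mode`, `Router`, `route`, and `complete` are hypothetical.

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
pub enum RoutingError {
    NoHealthyWorkers,
}

/// Synchronous mirror of the proposed trait (the real one is async and
/// operates on `Arc<dyn Worker>`).
pub trait RoutingPolicy: Send + Sync {
    fn select_single(&self, workers: &[String]) -> Result<String, RoutingError>;
    fn select_pair(
        &self,
        prefill: &[String],
        decode: &[String],
    ) -> Result<(String, String), RoutingError>;
    fn on_request_complete(&self, worker_url: &str, success: bool);
    fn name(&self) -> &'static str;
}

/// Toy policy (hypothetical): always picks the first worker of each pool.
pub struct PickFirst;

impl RoutingPolicy for PickFirst {
    fn select_single(&self, workers: &[String]) -> Result<String, RoutingError> {
        workers.first().cloned().ok_or(RoutingError::NoHealthyWorkers)
    }
    fn select_pair(
        &self,
        prefill: &[String],
        decode: &[String],
    ) -> Result<(String, String), RoutingError> {
        Ok((self.select_single(prefill)?, self.select_single(decode)?))
    }
    fn on_request_complete(&self, _worker_url: &str, _success: bool) {}
    fn name(&self) -> &'static str {
        "pick_first"
    }
}

pub enum Mode {
    Regular { workers: Vec<String> },
    PrefillDecode { prefill: Vec<String>, decode: Vec<String> },
}

/// Holds only `dyn RoutingPolicy`, so adding a policy never touches this code.
pub struct Router {
    pub policy: Arc<dyn RoutingPolicy>,
    pub mode: Mode,
}

impl Router {
    /// Worker URL(s) for one request: one in regular mode, a
    /// (prefill, decode) pair in PD mode.
    pub fn route(&self) -> Result<Vec<String>, RoutingError> {
        match &self.mode {
            Mode::Regular { workers } => Ok(vec![self.policy.select_single(workers)?]),
            Mode::PrefillDecode { prefill, decode } => {
                let (p, d) = self.policy.select_pair(prefill, decode)?;
                Ok(vec![p, d])
            }
        }
    }

    /// Forwards completion so stateful policies can update their state.
    pub fn complete(&self, worker_url: &str, success: bool) {
        self.policy.on_request_complete(worker_url, success);
    }
}
```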

Benefits

  1. Code Reuse: All policies work in both routing modes
  2. Extensibility: New policies can be added without touching routers
  3. Consistency: Single implementation for each algorithm
  4. Testing: Policies can be tested in isolation
  5. Feature Parity: PowerOfTwo becomes available for regular routing

Example Implementation

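// Random Policy
use std::sync::Arc;

use async_trait::async_trait;
use rand::Rng;

/// Picks a uniformly random healthy worker.
///
/// `rand::rngs::ThreadRng` is `!Send`, so a `Mutex<ThreadRng>` field would make
/// this struct neither `Send` nor `Sync`, which `RoutingPolicy` requires.
/// The thread-local RNG is fetched on each call instead.
#[derive(Debug, Default)]
pub struct RandomPolicy;

impl RandomPolicy {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl RoutingPolicy for RandomPolicy {
    async fn select_single(
        &self,
        workers: &[Arc<dyn Worker>],
        _request: &serde_json::Value,
    ) -> Result<Arc<dyn Worker>, RoutingError> {
        let healthy: Vec<&Arc<dyn Worker>> = workers.iter()
            .filter(|w| w.is_healthy())
            .collect();

        if healthy.is_empty() {
            return Err(RoutingError::NoHealthyWorkers);
        }

        // The ThreadRng handle is a temporary dropped at the end of this
        // statement, so it is never held across an .await point.
        let idx = rand::thread_rng().gen_range(0..healthy.len());
        Ok(Arc::clone(healthy[idx]))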
    }
    
    async fn select_pair(
        &self,
        prefill: &[Arc<dyn Worker>],
        decode: &[Arc<dyn Worker>],
        request: &serde_json::Value,
    ) -> Result<(Arc<dyn Worker>, Arc<dyn Worker>), RoutingError> {
        let p = self.select_single(prefill, request).await?;
        let d = self.select_single(decode, request).await?;
        Ok((p, d))
    }
    
    fn on_request_complete(&self, _: &str, _: bool) {}
    
    fn name(&self) -> &'static str { "random" }
}

Testing Plan

  1. Unit Tests:

    • Test each policy in isolation
    • Verify distribution properties
    • Test error handling
  2. Integration Tests:

    • Test policy switching
    • Verify metrics are recorded
    • Test with both routing modes
  3. Load Tests:

    • Ensure policies perform under load
    • Verify fair distribution
    • Check for memory leaks
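For the distribution checks, a small harness that tallies selections could be shared across policy unit tests. `histogram` and `is_fair` are hypothetical helpers; `select` stands in for a policy's single-worker selection mapped to a worker index.

```rust
use std::collections::HashMap;

/// Runs `select` `requests` times and counts how often each index is chosen.
pub fn histogram<F: FnMut() -> usize>(requests: usize, mut select: F) -> HashMap<usize, usize> {
    let mut counts = HashMap::new();
    for _ in 0..requests {
        *counts.entry(select()).or_insert(0) += 1;
    }
    counts
}

/// Fairness check: every index in `0..workers` was chosen at least once, and
/// the spread between the busiest and idlest worker is within `tolerance`.
pub fn is_fair(counts: &HashMap<usize, usize>, workers: usize, tolerance: usize) -> bool {
    if (0..workers).any(|i| !counts.contains_key(&i)) {
        return false;
    }
    let max = counts.values().max().copied().unwrap_or(0);
    let min = counts.values().min().copied().unwrap_or(0);
    max - min <= tolerance
}
```

Deterministic policies such as RoundRobin can use a tolerance of 0 or 1; for Random and PowerOfTwo the tolerance has to be statistical (a few standard deviations of the expected per-worker count) to keep the tests from being flaky.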

Acceptance Criteria

  • RoutingPolicy trait defined and documented
  • All four policies implemented using the trait
  • Policies work correctly in both routing modes
  • No code duplication between policies
  • Router code simplified by removing embedded policy logic
  • All existing tests pass
  • Performance metrics show no regression
  • New policy can be added without modifying routers

Estimated Effort

  • Design and trait definition: 1 day
  • Policy implementations: 3 days
  • Router integration: 2 days
  • Testing: 1 day
  • Total: 7 days

Dependencies

  • Task 001: Worker Abstraction (policies operate on Workers)

Risks and Mitigations

  1. Risk: Policy behavior changes during refactoring

    • Mitigation: Comprehensive tests before refactoring
    • Mitigation: A/B testing in staging environment
  2. Risk: Performance overhead from trait dispatch

    • Mitigation: Benchmark each policy implementation
    • Mitigation: Use static dispatch where possible
  3. Risk: Thread safety issues

    • Mitigation: Careful use of synchronization primitives
    • Mitigation: Stress test under high concurrency
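The trade-off behind "use static dispatch where possible" can be seen in a std-only sketch with a hypothetical one-method `Policy` trait. Note that `#[async_trait]` methods already return a boxed future, so each async call through `RoutingPolicy` costs a heap allocation regardless of dispatch style; the benchmarks should cover that as well.

```rust
/// Hypothetical one-method policy trait, used only for this comparison.
pub trait Policy {
    fn pick(&self, n: usize) -> usize;
}

pub struct AlwaysZero;

impl Policy for AlwaysZero {
    fn pick(&self, _n: usize) -> usize {
        0
    }
}

/// Static dispatch: monomorphized per policy type, so `pick` can be
/// inlined, but the policy is fixed at compile time.
pub struct StaticRouter<P: Policy> {
    pub policy: P,
}

impl<P: Policy> StaticRouter<P> {
    pub fn route(&self, n: usize) -> usize {
        self.policy.pick(n)
    }
}

/// Dynamic dispatch: one vtable call per request, but the policy can be
/// chosen from configuration at runtime, as `PolicyFactory` requires.
pub struct DynRouter {
    pub policy: Box<dyn Policy>,
}

impl DynRouter {
    pub fn route(&self, n: usize) -> usize {
        self.policy.pick(n)
    }
}
```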
