Skip to content

Task 004: Port Observability Module #7537

@slin1237

Description

@slin1237

Task 004: Port Observability Module

Summary

Port the centralized observability module from the existing implementation to consolidate metrics collection and improve monitoring across the router.

Motivation

Currently:

  • Metrics are scattered throughout the codebase
  • No consistent metric naming convention
  • Difficult to add new metrics
  • No centralized configuration for observability

Implementation Plan

1. Create Observability Module Structure

// src/infrastructure/observability/mod.rs
pub mod metrics;
pub mod logging;

use metrics_exporter_prometheus::PrometheusBuilder;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Top-level observability settings consumed by [`init_observability`].
pub struct ObservabilityConfig {
    /// Metrics settings; applied only when `metrics.enabled` is `true`.
    pub metrics: MetricsConfig,
    /// Logging settings; logging is always initialized, regardless of `metrics.enabled`.
    pub logging: LoggingConfig,
}

/// Settings for metrics collection and export.
///
/// Derives `Debug`/`Clone`/`PartialEq` so the effective configuration can be
/// logged at startup, shared, and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsConfig {
    /// Whether metrics are initialized at all; when `false`,
    /// `init_observability` skips metrics setup entirely.
    pub enabled: bool,
    /// Presumably the port the metrics (e.g. Prometheus scrape) endpoint
    /// listens on — TODO confirm against `init_metrics`.
    pub port: u16,
    /// Presumably the host/interface the metrics endpoint binds to — TODO
    /// confirm against `init_metrics`.
    pub host: String,
    /// Presumably histogram bucket boundaries (seconds, to match
    /// `*_duration_seconds`) — TODO confirm against `init_metrics`.
    pub buckets: Vec<f64>,
}

/// Sets up logging and, when enabled, metrics from `config`.
///
/// Logging is always initialized first. Metrics are initialized only when
/// `config.metrics.enabled` is `true`.
///
/// # Errors
/// Returns the first error from `init_logging` or `init_metrics`. If logging
/// fails, metrics initialization is not attempted.
pub fn init_observability(config: ObservabilityConfig) -> Result<(), ObservabilityError> {
    init_logging(&config.logging)?;

    // Metrics are opt-in: nothing more to do when they are disabled.
    if !config.metrics.enabled {
        return Ok(());
    }

    init_metrics(&config.metrics)?;
    Ok(())
}

2. Centralize Metric Definitions

// src/infrastructure/observability/metrics.rs
use metrics::{counter, gauge, histogram, Unit};
use std::time::Duration;

pub struct RouterMetrics;

impl RouterMetrics {
    // Request metrics
    pub fn record_request(route: &str, method: &str) {
        counter!("sgl_router_requests_total", 
            "route" => route.to_string(), 
            "method" => method.to_string()
        ).increment(1);
    }
    
    pub fn record_request_duration(route: &str, duration: Duration) {
        histogram!("sgl_router_request_duration_seconds",
            "route" => route.to_string()
        ).record(duration.as_secs_f64());
    }
    
    pub fn record_request_error(route: &str, error_type: &str) {
        counter!("sgl_router_errors_total",
            "route" => route.to_string(),
            "error_type" => error_type.to_string()
        ).increment(1);
    }
    
    // Worker metrics
    pub fn set_worker_health(worker_url: &str, healthy: bool) {
        gauge!("sgl_router_worker_healthy",
            "worker" => worker_url.to_string()
        ).set(if healthy { 1.0 } else { 0.0 });
    }
    
    pub fn set_worker_load(worker_url: &str, load: usize) {
        gauge!("sgl_router_worker_load",
            "worker" => worker_url.to_string()
        ).set(load as f64);
    }
    
    // Policy metrics
    pub fn record_policy_decision(policy: &str, worker: &str) {
        counter!("sgl_router_policy_decisions_total",
            "policy" => policy.to_string(),
            "worker" => worker.to_string()
        ).increment(1);
    }
    
    // Cache metrics
    pub fn record_cache_hit(worker: &str) {
        counter!("sgl_router_cache_hits_total",
            "worker" => worker.to_string()
        ).increment(1);
    }
    
    pub fn record_cache_miss(worker: &str) {
        counter!("sgl_router_cache_misses_total",
            "worker" => worker.to_string()
        ).increment(1);
    }
    
    pub fn set_tree_size(worker: &str, size: usize) {
        gauge!("sgl_router_tree_size",
            "worker" => worker.to_string()
        ).set(size as f64);
    }
    
    // Service discovery metrics
    pub fn record_discovery_update(added: usize, removed: usize) {
        counter!("sgl_router_discovery_updates_total").increment(1);
        gauge!("sgl_router_discovery_workers_added").set(added as f64);
        gauge!("sgl_router_discovery_workers_removed").set(removed as f64);
    }
}

3. Create Metrics Middleware

// src/server/middleware.rs
use actix_web::dev::{Service, ServiceResponse};
use actix_web::{dev::ServiceRequest, Error};
use std::time::Instant;

use crate::infrastructure::observability::metrics::RouterMetrics;

pub async fn metrics_middleware(
    req: ServiceRequest,
    srv: &mut dyn actix_web::dev::Service<ServiceRequest>,
) -> Result<actix_web::dev::ServiceResponse, Error> {
    let start = Instant::now();
    let route = req.path().to_string();
    let method = req.method().to_string();
    
    RouterMetrics::record_request(&route, &method);
    
    let res = srv.call(req).await;
    
    let duration = start.elapsed();
    RouterMetrics::record_request_duration(&route, duration);
    
    if let Err(ref e) = res {
        RouterMetrics::record_request_error(&route, "internal_error");
    }
    
    res
}

4. Update Existing Code to Use Centralized Metrics

// In routing policies
impl RoutingPolicy for CacheAwarePolicy {
    /// Selects one worker via `internal_select`, then records a policy-decision
    /// metric and a cache hit/miss metric for the chosen worker.
    ///
    /// # Errors
    /// Propagates any `RoutingError` from `internal_select`; no metrics are
    /// recorded in that case because the `?` returns before recording.
    async fn select_single(&self, workers: &[Arc<dyn Worker>], request: &serde_json::Value) 
        -> Result<Arc<dyn Worker>, RoutingError> {
        let result = self.internal_select(workers, request).await?;
        
        // Record metrics
        // NOTE(review): `was_cache_hit` is read from `self` after the await. This
        // method takes `&self`, so if the policy is shared by concurrent requests,
        // another request's selection may have changed the flag by now, and this
        // hit/miss would be attributed wrongly. Consider having `internal_select`
        // return the hit/miss outcome together with the worker. TODO: confirm how
        // and where `was_cache_hit` is set.
        RouterMetrics::record_policy_decision(self.name(), result.url());
        if self.was_cache_hit {
            RouterMetrics::record_cache_hit(result.url());
        } else {
            RouterMetrics::record_cache_miss(result.url());
        }
        
        Ok(result)
    }
}

// In worker health checking
impl Worker for WorkerImpl {
    /// Runs the worker's health probe and stores the outcome in `self.healthy`.
    /// It also publishes the outcome via `RouterMetrics::set_worker_health`,
    /// then returns the probe result unchanged.
    async fn check_health(&self) -> Result<(), WorkerError> {
        let probe = self.internal_health_check().await;
        let is_up = probe.is_ok();

        // Keep the cached flag and the exported gauge in step with the latest probe.
        self.healthy.store(is_up, Ordering::Relaxed);
        RouterMetrics::set_worker_health(self.url(), is_up);

        probe
    }
}

5. Add Grafana Dashboard Definition

// dashboards/sgl-router.json
{
  "dashboard": {
    "title": "SGLang Router Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "targets": [{
          "expr": "rate(sgl_router_requests_total[5m])"
        }]
      },
      {
        "title": "Request Duration P99",
        "targets": [{
          "expr": "histogram_quantile(0.99, rate(sgl_router_request_duration_seconds_bucket[5m]))"
        }]
      },
      {
        "title": "Worker Health",
        "targets": [{
          "expr": "sgl_router_worker_healthy"
        }]
      },
      {
        "title": "Cache Hit Rate",
        "targets": [{
          "expr": "rate(sgl_router_cache_hits_total[5m]) / (rate(sgl_router_cache_hits_total[5m]) + rate(sgl_router_cache_misses_total[5m]))"
        }]
      }
    ]
  }
}

Acceptance Criteria

  1. Module Structure

    • Observability module created
    • Metrics and logging submodules
    • Configuration types defined
  2. Metrics Centralization

    • All metrics defined in one place
    • Consistent naming convention
    • Proper labels and units
  3. Integration

    • Existing metrics migrated
    • Middleware integrated
    • No regressions in existing metrics
  4. Documentation

    • Metrics documented
    • Grafana dashboard provided
    • Usage examples
  5. Testing

    • Metrics endpoint tested
    • Correct metric values verified
    • Performance impact measured

Dependencies

  • Task 001: Worker Abstraction (for worker metrics)
  • Task 002: RoutingPolicy Trait (for policy metrics)

Estimated Effort

  • Implementation: 2 days
  • Migration: 1 day
  • Testing: 1 day
  • Total: 4 days

Risks

  • Risk: Missing metrics during migration
    • Mitigation: Audit all current metrics before migration
  • Risk: Performance impact
    • Mitigation: Use atomic operations and benchmark the impact

Metadata

Metadata

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions