Task 004: Port Observability Module
Summary
Port the centralized observability module from the existing implementation to consolidate metrics collection and improve monitoring capabilities across the router.
Motivation
Currently:
- Metrics are scattered throughout the codebase
- No consistent metric naming convention
- Difficult to add new metrics
- No centralized configuration for observability
Implementation Plan
1. Create Observability Module Structure
// src/infrastructure/observability/mod.rs
pub mod metrics;
pub mod logging;
use metrics_exporter_prometheus::PrometheusBuilder;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Top-level observability settings, split into a metrics section and a logging section.
pub struct ObservabilityConfig {
    /// Metrics settings; see [`MetricsConfig`].
    pub metrics: MetricsConfig,
    /// Logging settings.
    /// NOTE(review): `LoggingConfig` is not defined anywhere in this plan — it still needs to
    /// be specified (presumably alongside `init_logging` in the `logging` submodule).
    pub logging: LoggingConfig,
}
/// Settings for metrics collection and export.
///
/// Derives `Debug` so the effective configuration can be logged at startup. It also derives
/// `Clone`/`PartialEq` so it can be copied into components and compared in tests. (`Eq` is
/// impossible because of the `f64` buckets.)
#[derive(Debug, Clone, PartialEq)]
pub struct MetricsConfig {
    /// When `false`, `init_observability` skips metrics initialization entirely.
    pub enabled: bool,
    /// Port for the metrics endpoint.
    /// NOTE(review): presumably the Prometheus exporter's listen port — confirm in `init_metrics`.
    pub port: u16,
    /// Host/interface for the metrics endpoint.
    /// NOTE(review): presumably the exporter's bind address — confirm in `init_metrics`.
    pub host: String,
    /// Histogram bucket boundaries.
    /// NOTE(review): presumably upper bounds in seconds for the duration histograms (durations
    /// are recorded via `as_secs_f64`) — confirm how `init_metrics` applies them.
    pub buckets: Vec<f64>,
}
pub fn init_observability(config: ObservabilityConfig) -> Result<(), ObservabilityError> {
// Initialize logging
init_logging(&config.logging)?;
// Initialize metrics
if config.metrics.enabled {
init_metrics(&config.metrics)?;
}
Ok(())
}
2. Centralize Metrics Definition
// src/infrastructure/observability/metrics.rs
use metrics::{counter, gauge, histogram, Unit};
use std::time::Duration;
/// Central definition point for every router metric.
///
/// A stateless unit struct: all methods are associated functions that forward to the
/// `metrics` crate macros. Keeping metric names and label keys here means call sites never
/// spell them out, so naming stays consistent.
pub struct RouterMetrics;

impl RouterMetrics {
    // ---- Request metrics ----

    /// Increments `sgl_router_requests_total{route, method}`.
    pub fn record_request(route: &str, method: &str) {
        counter!(
            "sgl_router_requests_total",
            "route" => route.to_string(),
            "method" => method.to_string()
        )
        .increment(1);
    }

    /// Records `duration` (as fractional seconds) in `sgl_router_request_duration_seconds{route}`.
    pub fn record_request_duration(route: &str, duration: Duration) {
        histogram!(
            "sgl_router_request_duration_seconds",
            "route" => route.to_string()
        )
        .record(duration.as_secs_f64());
    }

    /// Increments `sgl_router_errors_total{route, error_type}`.
    pub fn record_request_error(route: &str, error_type: &str) {
        counter!(
            "sgl_router_errors_total",
            "route" => route.to_string(),
            "error_type" => error_type.to_string()
        )
        .increment(1);
    }

    // ---- Worker metrics ----

    /// Sets `sgl_router_worker_healthy{worker}` to `1.0` when healthy, `0.0` otherwise.
    pub fn set_worker_health(worker_url: &str, healthy: bool) {
        gauge!(
            "sgl_router_worker_healthy",
            "worker" => worker_url.to_string()
        )
        .set(if healthy { 1.0 } else { 0.0 });
    }

    /// Sets `sgl_router_worker_load{worker}` to the given load value.
    pub fn set_worker_load(worker_url: &str, load: usize) {
        gauge!(
            "sgl_router_worker_load",
            "worker" => worker_url.to_string()
        )
        .set(load as f64);
    }

    // ---- Policy metrics ----

    /// Increments `sgl_router_policy_decisions_total{policy, worker}` for one routing decision.
    pub fn record_policy_decision(policy: &str, worker: &str) {
        counter!(
            "sgl_router_policy_decisions_total",
            "policy" => policy.to_string(),
            "worker" => worker.to_string()
        )
        .increment(1);
    }

    // ---- Cache metrics ----

    /// Increments `sgl_router_cache_hits_total{worker}`.
    pub fn record_cache_hit(worker: &str) {
        counter!(
            "sgl_router_cache_hits_total",
            "worker" => worker.to_string()
        )
        .increment(1);
    }

    /// Increments `sgl_router_cache_misses_total{worker}`.
    pub fn record_cache_miss(worker: &str) {
        counter!(
            "sgl_router_cache_misses_total",
            "worker" => worker.to_string()
        )
        .increment(1);
    }

    /// Sets `sgl_router_tree_size{worker}`.
    /// NOTE(review): presumably the size of the cache-aware policy's per-worker tree — confirm
    /// the unit (nodes vs. bytes) at the call site.
    pub fn set_tree_size(worker: &str, size: usize) {
        gauge!(
            "sgl_router_tree_size",
            "worker" => worker.to_string()
        )
        .set(size as f64);
    }

    // ---- Service discovery metrics ----

    /// Records one service-discovery update plus how many workers it added and removed.
    ///
    /// `added`/`removed` are accumulated in monotonic counters (`*_total`) rather than written
    /// to gauges. A gauge written with `.set()` keeps only the most recent update's value, so
    /// every update that happens between two scrapes would be silently lost. With counters,
    /// `increase(sgl_router_discovery_workers_added_total[1h])` is exact.
    ///
    /// Migration note: this replaces the gauges `sgl_router_discovery_workers_added` and
    /// `sgl_router_discovery_workers_removed`.
    pub fn record_discovery_update(added: usize, removed: usize) {
        counter!("sgl_router_discovery_updates_total").increment(1);
        // usize -> u64 is lossless on all targets with pointers of at most 64 bits.
        counter!("sgl_router_discovery_workers_added_total").increment(added as u64);
        counter!("sgl_router_discovery_workers_removed_total").increment(removed as u64);
    }
}
3. Create Metric Middleware
// src/server/middleware.rs
use actix_web::{dev::ServiceRequest, Error};
use std::time::Instant;
pub async fn metrics_middleware(
req: ServiceRequest,
srv: &mut dyn actix_web::dev::Service<ServiceRequest>,
) -> Result<actix_web::dev::ServiceResponse, Error> {
let start = Instant::now();
let route = req.path().to_string();
let method = req.method().to_string();
RouterMetrics::record_request(&route, &method);
let res = srv.call(req).await;
let duration = start.elapsed();
RouterMetrics::record_request_duration(&route, duration);
if let Err(ref e) = res {
RouterMetrics::record_request_error(&route, "internal_error");
}
res
}
4. Update Existing Code to Use Centralized Metrics
// In routing policies
impl RoutingPolicy for CacheAwarePolicy {
    /// Picks a single worker with the cache-aware strategy. Records the routing decision plus
    /// a cache hit or miss for the chosen worker.
    ///
    /// # Errors
    /// Propagates any `RoutingError` returned by `internal_select`; no metrics are recorded in
    /// that case.
    async fn select_single(
        &self,
        workers: &[Arc<dyn Worker>],
        request: &serde_json::Value,
    ) -> Result<Arc<dyn Worker>, RoutingError> {
        // The hit/miss outcome must come back together with the selection; do not read it
        // from a field on `self`.
        // - A plain `bool` field (what `if` requires) cannot be updated through `&self`, so
        //   every selection would be recorded as the same outcome.
        // - An interior-mutable flag would race between concurrent requests.
        // NOTE(review): requires `internal_select` to return `(Arc<dyn Worker>, bool)`.
        let (worker, cache_hit) = self.internal_select(workers, request).await?;

        RouterMetrics::record_policy_decision(self.name(), worker.url());
        if cache_hit {
            RouterMetrics::record_cache_hit(worker.url());
        } else {
            RouterMetrics::record_cache_miss(worker.url());
        }

        Ok(worker)
    }
}
// In worker health checking
impl Worker for WorkerImpl {
    /// Runs the worker's health probe, stores the outcome in `self.healthy`, and publishes it
    /// through `RouterMetrics::set_worker_health`. The probe's result is returned unchanged.
    async fn check_health(&self) -> Result<(), WorkerError> {
        let probe = self.internal_health_check().await;
        let is_up = probe.is_ok();

        // NOTE(review): `Relaxed` is adequate only if no other data is published through this
        // flag — confirm against the readers of `healthy`.
        self.healthy.store(is_up, Ordering::Relaxed);
        RouterMetrics::set_worker_health(self.url(), is_up);

        probe
    }
}
5. Add Grafana Dashboard Definition
// dashboards/sgl-router.json
{
  "dashboard": {
    "title": "SGLang Router Metrics",
    "panels": [
      {
        "title": "Request Rate",
        "type": "timeseries",
        "targets": [{
          "expr": "sum by (route, method) (rate(sgl_router_requests_total[5m]))",
          "legendFormat": "{{method}} {{route}}"
        }]
      },
      {
        "title": "Request Duration P99",
        "type": "timeseries",
        "targets": [{
          "expr": "histogram_quantile(0.99, sum by (le, route) (rate(sgl_router_request_duration_seconds_bucket[5m])))",
          "legendFormat": "{{route}}"
        }]
      },
      {
        "title": "Worker Health",
        "type": "timeseries",
        "targets": [{
          "expr": "sgl_router_worker_healthy",
          "legendFormat": "{{worker}}"
        }]
      },
      {
        "title": "Cache Hit Rate",
        "type": "timeseries",
        "targets": [{
          "expr": "sum by (worker) (rate(sgl_router_cache_hits_total[5m])) / (sum by (worker) (rate(sgl_router_cache_hits_total[5m])) + sum by (worker) (rate(sgl_router_cache_misses_total[5m])))",
          "legendFormat": "{{worker}}"
        }]
      }
    ]
  }
}
Acceptance Criteria
- [ ] Module Structure
- [ ] Metrics Centralization
- [ ] Integration
- [ ] Documentation
- [ ] Testing
Dependencies
- Task 001: Worker Abstraction (for worker metrics)
- Task 002: RoutingPolicy Trait (for policy metrics)
Estimated Effort
- Implementation: 2 days
- Migration: 1 day
- Testing: 1 day
- Total: 4 days
Risks
- Risk: Missing metrics during migration
- Mitigation: Audit all current metrics before migration
- Risk: Performance impact
- Mitigation: Use atomic operations and benchmark the per-request overhead before rollout
Task 004: Port Observability Module
Summary
Port the centralized observability module from the existing implementation to consolidate metrics collection and improve monitoring capabilities across the router.
Motivation
Currently:
Implementation Plan
1. Create Observability Module Structure
2. Centralize Metrics Definition
3. Create Metric Middleware
4. Update Existing Code to Use Centralized Metrics
5. Add Grafana Dashboard Definition
Acceptance Criteria
Module Structure
Metrics Centralization
Integration
Documentation
Testing
Dependencies
Estimated Effort
Risks