Create a protocol-based abstraction for observability:
# plugins/framework/observability.py
from contextvars import ContextVar
from typing import Optional, Protocol

# Trace ID for the current execution context; None when no trace is active.
current_trace_id: ContextVar[Optional[str]] = ContextVar("current_trace_id", default=None)


class ObservabilityProvider(Protocol):
    """Interface for observability - host application implements this."""

    def start_trace(self, name: str, **kwargs) -> Optional[str]: ...

    def end_trace(self, trace_id: str, **kwargs) -> None: ...


class NullObservability:
    """Default no-op implementation for standalone operation."""

    # Return type matches the protocol; the no-op never produces a trace ID.
    def start_trace(self, name: str, **kwargs) -> Optional[str]:
        return None

    def end_trace(self, trace_id: str, **kwargs) -> None:
        pass
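
Because the interface is a `Protocol`, a host application can satisfy it structurally, without inheriting from it. The following is a minimal sketch of that idea, not the host's actual implementation: `InMemoryObservability`, `run_traced`, and the UUID-based trace IDs are hypothetical names and choices made for illustration.

```python
import uuid
from contextvars import ContextVar
from typing import Dict, Optional, Protocol

current_trace_id: ContextVar[Optional[str]] = ContextVar("current_trace_id", default=None)


class ObservabilityProvider(Protocol):
    def start_trace(self, name: str, **kwargs) -> Optional[str]: ...

    def end_trace(self, trace_id: str, **kwargs) -> None: ...


class InMemoryObservability:
    """Hypothetical host-side provider that records traces in a dict."""

    def __init__(self) -> None:
        self.traces: Dict[str, dict] = {}

    def start_trace(self, name: str, **kwargs) -> Optional[str]:
        trace_id = uuid.uuid4().hex  # assumption: any unique string would do
        self.traces[trace_id] = {"name": name, "ended": False}
        return trace_id

    def end_trace(self, trace_id: str, **kwargs) -> None:
        self.traces[trace_id]["ended"] = True


def run_traced(provider: ObservabilityProvider, name: str) -> Optional[str]:
    """Start a trace, expose its ID through the context variable, then end it."""
    trace_id = provider.start_trace(name)
    token = current_trace_id.set(trace_id)
    try:
        # Code running here can read the active ID without it being passed in.
        return current_trace_id.get()
    finally:
        current_trace_id.reset(token)  # restore the previous value
        if trace_id is not None:  # NullObservability-style providers return None
            provider.end_trace(trace_id)
```

Resetting the context variable with the token restores whatever value was active before, so nested or concurrent calls do not leak trace IDs into each other.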
Tasks:
- plugins/framework/observability.py with ObservabilityProvider protocol
- NullObservability as default
- PluginManager to accept optional observability parameter
- mcpgateway.db.SessionLocal import
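
One way the optional `observability` parameter could be wired in is sketched below. The `PluginManager` here is a stand-in that shows only the constructor wiring; the real class's signature is not given in this plan, and `invoke_hook` is a hypothetical method added for illustration.

```python
from typing import Optional, Protocol


class ObservabilityProvider(Protocol):
    def start_trace(self, name: str, **kwargs) -> Optional[str]: ...

    def end_trace(self, trace_id: str, **kwargs) -> None: ...


class NullObservability:
    """No-op default so the framework runs without a host application."""

    def start_trace(self, name: str, **kwargs) -> Optional[str]:
        return None

    def end_trace(self, trace_id: str, **kwargs) -> None:
        pass


class PluginManager:
    """Stand-in for the real manager; only the observability wiring is shown."""

    def __init__(self, observability: Optional[ObservabilityProvider] = None) -> None:
        # Fall back to the no-op provider when the host supplies nothing.
        if observability is None:
            observability = NullObservability()
        self.observability: ObservabilityProvider = observability

    def invoke_hook(self, hook_name: str) -> str:  # hypothetical method
        trace_id = self.observability.start_trace(hook_name)
        try:
            return f"ran {hook_name}"
        finally:
            if trace_id is not None:
                self.observability.end_trace(trace_id)


# Standalone use: no provider passed, so tracing is silently skipped.
manager = PluginManager()
result = manager.invoke_hook("pre_call")
```

With this shape the framework depends only on the protocol, so the manager itself no longer needs to import anything from the host application to record traces.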