[MP][Observability][2/3] Migrate L1 + SM to EventBus + OTel, remove old Prometheus pipeline #2794
Conversation
Summary of Changes: Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly refactors the observability infrastructure for LMCache's multiprocess (MP) mode. It transitions the L1Manager and StorageManager components to a modern, event-driven system that uses an EventBus for pub/sub dispatch and OpenTelemetry for metrics instrumentation. This change streamlines how operational data is collected and exported, providing greater flexibility and adherence to industry standards while removing a legacy Prometheus-specific pipeline.
Activity
Code Review
This pull request is a significant and well-executed refactoring of the observability system, migrating from a custom Prometheus pipeline to a more flexible EventBus and OpenTelemetry-based architecture. The changes are clean, well-documented with a new README.md and a thorough design document, and correctly remove the old, now-redundant components. My feedback focuses on a few areas to further improve maintainability and testing robustness: strengthening the new metrics tests to assert on output, consolidating duplicated initialization logic, and clarifying the use of an internal OpenTelemetry API in the logging subscribers. Overall, this is a high-quality change that modernizes the observability stack.
```python
def test_read_finished_increments_counter(self, bus, subscriber):
    bus.start()
    bus.publish(_make_event(EventType.L1_READ_FINISHED, _make_keys(5)))
    time.sleep(0.15)
    bus.stop()
    # Verify the counter was called — OTel counters are real objects,
    # we check via the internal measurement
    # (in a real integration test we'd scrape /metrics)
```
This test verifies that publishing an event doesn't cause a crash, but as the comment notes, it doesn't assert that the OTel counter is actually incremented. To make these tests more robust and provide stronger guarantees that metrics are being correctly generated, you could use an InMemoryMetricReader from the OTel SDK. This would allow you to collect the emitted metrics in memory and assert on their values.
```python
def test_read_prefetched_increments_counters(self, bus, subscriber):
    bus.start()
    bus.publish(
        _make_sm_event(
            EventType.SM_READ_PREFETCHED,
            succeeded=_make_keys(3),
            failed=_make_keys(2),
        )
    )
    time.sleep(0.15)
    bus.stop()
```
This test verifies that publishing an event doesn't cause a crash, but it doesn't assert that the OpenTelemetry counters are actually incremented. To make this test more robust, you could use an InMemoryMetricReader from the OTel SDK to collect the emitted metrics and then assert on their values (e.g., that the request counter was incremented by 1, succeeded keys by 3, and failed keys by 2). This would provide stronger guarantees that the metrics are being correctly generated.
```python
# Initialize EventBus and register observability subscribers
# First Party
from lmcache.v1.mp_observability.event_bus import (
    EventBusConfig,
    init_event_bus,
)
from lmcache.v1.mp_observability.subscribers.metrics.l1 import (
    L1MetricsSubscriber,
)
from lmcache.v1.mp_observability.subscribers.metrics.sm import (
    SMMetricsSubscriber,
)

# Set up OTel MeterProvider BEFORE creating subscribers so that
# module-level get_meter() calls bind to the real provider
if prometheus_config.enabled:
    prometheus_client.start_http_server(prometheus_config.port)
    logger.info(
        "Prometheus metrics available at http://0.0.0.0:%d/metrics",
        prometheus_config.port,
    )
    # First Party
    from lmcache.v1.mp_observability.otel_init import init_otel_metrics

    init_otel_metrics(prometheus_port=prometheus_config.port)

bus = init_event_bus(EventBusConfig(enabled=prometheus_config.enabled))
bus.register_subscriber(L1MetricsSubscriber())
bus.register_subscriber(SMMetricsSubscriber())
bus.start()
```
This block of code for initializing the event bus, OTel, and subscribers is duplicated across server.py, blend_server.py, and blend_server_v2.py. To improve maintainability and reduce code duplication, consider refactoring this logic into a dedicated helper function within the lmcache.v1.mp_observability module. This function could encapsulate the setup process, making the server startup files cleaner and easier to manage.
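One possible shape for such a helper, sketched with dependency injection so the servers can pass the real `init_event_bus` and subscriber instances. All names here (`init_mp_observability`, the stub bus) are assumptions for illustration, not the PR's API.

```python
import logging

logger = logging.getLogger("mp_observability")

def init_mp_observability(enabled, port, bus_factory, subscribers,
                          start_metrics_server=None):
    """Hypothetical helper consolidating the duplicated startup logic.

    Returns the started bus, or None when observability is disabled.
    bus_factory and subscribers are injected so server.py can pass the
    real init_event_bus / L1MetricsSubscriber / SMMetricsSubscriber.
    """
    if not enabled:
        return None
    if start_metrics_server is not None:
        start_metrics_server(port)  # e.g. prometheus_client.start_http_server
        logger.info("Metrics available at http://0.0.0.0:%d/metrics", port)
    bus = bus_factory()
    for sub in subscribers:
        bus.register_subscriber(sub)
    bus.start()
    return bus

# Minimal stub demonstrating the call shape the servers would use
class StubBus:
    def __init__(self):
        self.subs, self.started = [], False
    def register_subscriber(self, sub):
        self.subs.append(sub)
    def start(self):
        self.started = True

bus = init_mp_observability(True, 9090, StubBus, ["l1", "sm"])
assert bus.started and bus.subs == ["l1", "sm"]
assert init_mp_observability(False, 9090, StubBus, []) is None
```

Each server file would then reduce to a single call, with the OTel provider setup either folded into the helper or invoked just before it.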
```python
try:
    # Third Party
    from opentelemetry.sdk._logs import LoggingHandler

    _otel_handler = LoggingHandler(level=logging.DEBUG)
    logger.addHandler(_otel_handler)
except ImportError:
    pass
```
This code imports LoggingHandler from opentelemetry.sdk._logs, which is an internal module. Relying on internal modules can be risky as they are not subject to semantic versioning and can change without notice. Since opentelemetry-sdk is now a direct dependency, the try...except ImportError is likely no longer necessary.
If using LoggingHandler is intentional for a lightweight setup, it would be beneficial to remove the try...except and add a comment explaining the rationale for using an internal API.
```python
# Third Party
# NOTE: Using LoggingHandler from an internal OTel module for a lightweight
# bridge to OTel Logs, which is less overhead than the full LoggingInstrumentor.
from opentelemetry.sdk._logs import LoggingHandler

_otel_handler = LoggingHandler(level=logging.DEBUG)
logger.addHandler(_otel_handler)
```

[MP][Observability] Migrate L1 + SM metrics to EventBus + OTel, remove old Prometheus pipeline

Replace the old Listener/Stats/PrometheusController observability system with the EventBus + OpenTelemetry pipeline for MP mode.

Key changes:
- L1Manager publishes events directly to EventBus alongside listener iteration (listeners stay for business-logic consumers: StoreListener, EvictionPolicy)
- StorageManager fully migrated: listener iteration replaced with bus.publish()
- New OTel metrics subscribers (L1MetricsSubscriber, SMMetricsSubscriber)
- New logging subscribers with OTel LoggingHandler bridge
- Dual metrics export: OTLP push (OTEL_EXPORTER_OTLP_ENDPOINT) or Prometheus pull fallback (default, no collector needed)
- Removed: PrometheusController, stats dataclasses, old stats loggers, old tests
- Updated: METRICS.md, README.md (replaces LOGGER_GUIDE.md), REFACT_DESIGN.md

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
…metry subdirs

Reorganize subscribers for cleaner project layout:
- subscribers/metrics/l1.py, sm.py
- subscribers/logging/l1.py, sm.py
- subscribers/telemetry/ (future PR 3)

Update all imports in server, blend_server, blend_server_v2, and tests.

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
Force-pushed from 36da9c7 to f3232a8
Signed-off-by: royyhuang <roy.y.huang@gmail.com>
```python
# First Party
from lmcache.v1.mp_observability.otel_init import init_otel_metrics
```
nit: we can do the import at the top of the file
```python
# First Party
from lmcache.v1.mp_observability.otel_init import init_otel_metrics
```
Same as above: we can import at the top
```python
# First Party
from lmcache.v1.mp_observability.otel_init import init_otel_metrics
```
nit: Same import issue here
We need `__init__.py` in the subfolders
```python
self._event_bus.publish(
    Event(
        event_type=EventType.L1_KEYS_EVICTED,
```
Can we have L1_KEYS_DELETED instead of L1_KEYS_EVICTED?
The main reason is that not all deletions are evictions.
We probably need to rethink what's the best definition of "failed keys". Let's take this offline (the current PR is okay)
```python
self._event_bus.publish(
    Event(
        event_type=EventType.SM_READ_PREFETCHED_FINISHED,
        metadata={
            "succeeded_keys": good_keys,
            "failed_keys": bad_keys,
        },
    )
)
```
The semantics are a bit weird here. We emit the same SM_READ_PREFETCHED_FINISHED event in the finish_read_prefetched function below. Ideally here we need a different event
We probably need to discuss how to define the "failures" and how to correctly capture them
Signed-off-by: royyhuang <roy.y.huang@gmail.com>
…lity_pr2' into refact/mp_observability_pr2
…ld Prometheus pipeline (LMCache#2794)

* [MP][Observability] Migrate L1 + SM metrics to EventBus + OTel, remove old Prometheus pipeline

Replace the old Listener/Stats/PrometheusController observability system with the EventBus + OpenTelemetry pipeline for MP mode.

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
Summary
- L1Manager publishes events directly to the EventBus; StorageManager's listener iteration is replaced with `bus.publish()`
- New OTel metrics subscribers (`L1MetricsSubscriber`, `SMMetricsSubscriber`) and logging subscribers with an OTel `LoggingHandler` bridge
- Dual metrics export: OTLP push (`OTEL_EXPORTER_OTLP_ENDPOINT`) or Prometheus pull fallback (default)
- Removed: `PrometheusController`, stats dataclasses, stats loggers
- Subscribers reorganized into `metrics/`, `logging/`, `telemetry/` subdirs
- Updated: `README.md` (replaces `LOGGER_GUIDE.md`), `METRICS.md`, `REFACT_DESIGN.md`

Depends on #2792. This is PR 2 of 3.
Test plan
- Unit tests: `uv run python -m pytest tests/v1/mp_observability/ --noconftest`
- Metrics endpoint: `curl localhost:9090/metrics`