[MP][Observability][1/3] EventBus core infrastructure + OpenTelemetry dependency #2792

Merged
royyhuang merged 2 commits into LMCache:dev from royyhuang:refact/mp_observability
Mar 18, 2026

Conversation

@royyhuang
Contributor

@royyhuang royyhuang commented Mar 16, 2026

Summary

  • Introduce unified EventBus pub/sub system (event.py, event_bus.py) for mp_observability that will replace both the Listener/Logger and Telemetry systems
  • Add OpenTelemetry SDK dependency (opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-prometheus) and OTel init helpers
  • Add ObservabilityConfig alongside existing PrometheusConfig
  • Add REFACT_DESIGN.md with full design doc for the observability refactor
  • 26 unit tests for EventBus (subscription, lifecycle, dispatch, ordering, exception isolation, backpressure, singleton)

This is PR 1 of 3 in the observability refactor. No existing behavior is changed — pure additions.

See REFACT_DESIGN.md for the full design, implementation plan, and design tradeoff discussion.

Test plan

  • 26 unit tests in test_event_bus.py all pass
  • All pre-commit hooks pass (SPDX, isort, ruff, ruff-format, codespell, mypy)
  • Verify no regressions in existing mp_observability tests

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request lays the foundational infrastructure for a significant refactor of the mp_observability module. It introduces a unified event-driven system based on an EventBus and integrates OpenTelemetry as the primary instrumentation API. This change aims to streamline how metrics, traces, and logs are collected and processed, providing a more consistent and vendor-neutral observability framework without altering existing runtime behavior in this initial phase.

Highlights

  • Unified EventBus Infrastructure: Introduced a new EventBus pub/sub system, EventType enum, and Event dataclass to unify observability event handling, replacing existing Listener/Logger and Telemetry systems in a phased migration.
  • OpenTelemetry Integration: Added core OpenTelemetry SDK dependencies (opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-prometheus) and helper functions for OTel initialization (init_otel_metrics, init_otel_tracing).
  • Observability Configuration: Added a new ObservabilityConfig dataclass to manage the unified EventBus and OpenTelemetry pipeline settings, including enabling/disabling metrics, logging, and tracing.
  • Design Documentation: Included a detailed design document (REFACT_DESIGN.md) outlining the motivation, proposed architecture, event model, implementation plan (broken into 4 PRs), and design tradeoffs for the observability refactor.
  • Comprehensive Unit Tests: Added 26 unit tests for the EventBus to ensure correct functionality for subscription, lifecycle management, event dispatch, ordering, exception isolation, backpressure, and singleton behavior.
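To make the pub/sub shape described in these highlights concrete, here is a minimal sketch of a bus with a background drain thread and per-callback exception isolation. It is not the code from event_bus.py: the class name MiniEventBus, the two EventType members, and the Event field names are assumptions chosen for illustration.

```python
import collections
import enum
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Callable

logger = logging.getLogger(__name__)


class EventType(enum.Enum):
    # Hypothetical members; the real enum in event.py is not shown in this PR page.
    STORE = "store"
    RETRIEVE = "retrieve"


@dataclass
class Event:
    event_type: EventType
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: float = 0.0  # filled in by publish()


class MiniEventBus:
    """Toy bus: producers enqueue, one background thread dispatches."""

    def __init__(self) -> None:
        self._subscribers: dict[EventType, list[Callable[[Event], None]]] = {}
        self._queue: collections.deque[Event] = collections.deque()
        self._wake = threading.Event()
        self._stopping = threading.Event()
        self._thread = threading.Thread(target=self._drain_loop, daemon=True)

    def subscribe(self, event_type: EventType, callback: Callable[[Event], None]) -> None:
        self._subscribers.setdefault(event_type, []).append(callback)

    def publish(self, event: Event) -> None:
        event.timestamp = time.time()
        self._queue.append(event)  # deque.append is atomic in CPython
        self._wake.set()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stopping.set()
        self._wake.set()
        self._thread.join()

    def _drain_loop(self) -> None:
        while True:
            self._wake.wait()
            self._wake.clear()
            self._drain()
            if self._stopping.is_set():
                self._drain()  # deliver anything published during shutdown
                return

    def _drain(self) -> None:
        while self._queue:
            event = self._queue.popleft()
            for callback in list(self._subscribers.get(event.event_type, ())):
                try:
                    callback(event)
                except Exception:
                    # One failing subscriber must not block the others.
                    logger.exception("subscriber raised; continuing")
```

Because a single thread drains the queue, callbacks for one subscriber see events in publish order, and a callback that raises only affects itself. This sketch has no queue bound, so backpressure is left out.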

Changelog
  • lmcache/v1/mp_observability/REFACT_DESIGN.md
    • Added a comprehensive design document detailing the new unified EventBus and OpenTelemetry-based observability architecture.
  • lmcache/v1/mp_observability/config.py
    • Updated the module's docstring to reflect the broader observability scope.
    • Added ObservabilityConfig dataclass to configure the new EventBus system, including flags for metrics, logging, tracing, and queue size.
  • lmcache/v1/mp_observability/event.py
    • Added EventType enum to define all observable event types within the MP system.
    • Added Event dataclass to serve as a unified payload for all observability events, including timestamp and metadata.
  • lmcache/v1/mp_observability/event_bus.py
    • Added EventBus class for asynchronous pub/sub event dispatch with queue management and background processing.
    • Added EventBusConfig for configuring the EventBus behavior.
    • Added EventSubscriber abstract base class for defining component-specific event handlers.
    • Implemented singleton management for the EventBus.
  • lmcache/v1/mp_observability/otel_init.py
    • Added init_otel_metrics function to set up the OpenTelemetry MeterProvider with a Prometheus exporter.
    • Added init_otel_tracing function as a placeholder for OpenTelemetry TracerProvider initialization.
  • requirements/common.txt
    • Added opentelemetry-api, opentelemetry-sdk, and opentelemetry-exporter-prometheus as new project dependencies.
  • tests/v1/mp_observability/test_event_bus.py
    • Added extensive unit tests for the EventBus and EventSubscriber classes, covering core functionality and edge cases.
Activity
  • The pull request introduces the core infrastructure for a new observability system, including an EventBus and OpenTelemetry dependencies.
  • The author has provided a detailed design document (REFACT_DESIGN.md) explaining the motivation, proposed architecture, and a 4-PR migration plan.
  • 26 unit tests for the new EventBus functionality have been added and are reported to pass.
  • All pre-commit hooks (SPDX, isort, ruff, ruff-format, codespell, mypy) have passed.

…dependency

Introduce the unified EventBus pub/sub system for mp_observability alongside
the OpenTelemetry SDK dependency. This is PR 1 of the observability refactor
that will unify the Listener/Logger and Telemetry systems into a single
event-driven architecture.

New files:
- event.py: EventType enum (16 event types) + Event dataclass
- event_bus.py: EventBus (async queue + drain thread), EventSubscriber ABC,
  EventBusConfig, global singleton management
- otel_init.py: OTel MeterProvider + TracerProvider init helpers
- config.py: ObservabilityConfig added alongside existing PrometheusConfig
- test_event_bus.py: 26 unit tests
- REFACT_DESIGN.md: Full design doc for the observability refactor

Dependencies added: opentelemetry-api, opentelemetry-sdk,
opentelemetry-exporter-prometheus

No existing behavior is changed.

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
@royyhuang royyhuang force-pushed the refact/mp_observability branch from 6d59f19 to f540814 Compare March 16, 2026 22:07
@royyhuang royyhuang changed the title from "[MP][Observability] EventBus core infrastructure + OpenTelemetry dependency" to "[MP][Observability][1/4] EventBus core infrastructure + OpenTelemetry dependency" Mar 16, 2026
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces the core infrastructure for a new unified observability system based on an EventBus and OpenTelemetry. The changes include a detailed design document, the EventBus implementation, new event and configuration models, OpenTelemetry initialization helpers, and comprehensive unit tests. The code is well-structured and the design is solid. I've identified a race condition in the EventBus.publish method that should be addressed. I also have a couple of suggestions for improving the design document's consistency and making the tests more robust.

Comment on lines +132 to +147
if len(self._queue) >= self._config.max_queue_size:
    self._discard_count += 1
    now = time.monotonic()
    if now - self._last_discard_warning >= 1.0:
        logger.warning(
            "EventBus queue full (max_queue_size=%d), "
            "%d event(s) discarded so far",
            self._config.max_queue_size,
            self._discard_count,
        )
        self._last_discard_warning = now
    return

event.timestamp = time.time()
self._queue.append(event)
self._wake.set()
Contributor

high

The publish method has a race condition when called from multiple producer threads. The check for len(self._queue) and the subsequent self._queue.append(event) are not atomic. This can lead to the queue size temporarily exceeding max_queue_size. Additionally, _discard_count and _last_discard_warning are also accessed without a lock, which is not thread-safe.

To ensure thread safety and strictly enforce the queue limit, this entire block should be protected by self._lock.

Suggested change
- if len(self._queue) >= self._config.max_queue_size:
-     self._discard_count += 1
-     now = time.monotonic()
-     if now - self._last_discard_warning >= 1.0:
-         logger.warning(
-             "EventBus queue full (max_queue_size=%d), "
-             "%d event(s) discarded so far",
-             self._config.max_queue_size,
-             self._discard_count,
-         )
-         self._last_discard_warning = now
-     return
- event.timestamp = time.time()
- self._queue.append(event)
- self._wake.set()
+ with self._lock:
+     if len(self._queue) >= self._config.max_queue_size:
+         self._discard_count += 1
+         now = time.monotonic()
+         if now - self._last_discard_warning >= 1.0:
+             logger.warning(
+                 "EventBus queue full (max_queue_size=%d), "
+                 "%d event(s) discarded so far",
+                 self._config.max_queue_size,
+                 self._discard_count,
+             )
+             self._last_discard_warning = now
+         return
+     event.timestamp = time.time()
+     self._queue.append(event)
+     self._wake.set()
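The check-then-append pattern discussed in this comment can be exercised in isolation. The sketch below is a standalone illustration under assumptions, not the EventBus code: BoundedTailDropQueue, offer, and run_producers are hypothetical names. It holds one lock across the size check, the discard counter, and the append, so the bound holds exactly even with many producer threads.

```python
import collections
import threading


class BoundedTailDropQueue:
    """Hypothetical helper: rejects new items once max_size is reached."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._items: collections.deque[int] = collections.deque()
        self._lock = threading.Lock()
        self.discarded = 0

    def offer(self, item: int) -> bool:
        # Check, counter update and append share one critical section, so no
        # other producer can append between the check and our own append.
        with self._lock:
            if len(self._items) >= self._max_size:
                self.discarded += 1
                return False
            self._items.append(item)
            return True

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


def run_producers(queue: BoundedTailDropQueue, n_threads: int, per_thread: int) -> None:
    """Start n_threads producers that each offer per_thread items, then join."""

    def produce() -> None:
        for i in range(per_thread):
            queue.offer(i)

    threads = [threading.Thread(target=produce) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

With the lock in place, accepted plus discarded items always equals the number of offers, and the queue never grows past max_size. The cost is one lock acquisition per publish on the producer path.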

class EventBus:
    def __init__(self, config: EventBusConfig):
        self._subscribers: dict[EventType, list[EventCallback]] = defaultdict(list)
        self._queue: collections.deque[Event] = collections.deque(maxlen=config.max_queue_size)
Contributor

medium

There's an inconsistency regarding the queue overflow policy. This example shows collections.deque(maxlen=config.max_queue_size), which implies a "head-drop" policy (discarding the oldest events). However, the implementation in event_bus.py and Open Question #4 in this document discuss a "tail-drop" policy (discarding the newest events), which is what's actually implemented. To avoid confusion, the example code should be consistent with the chosen implementation.

Suggested change
- self._queue: collections.deque[Event] = collections.deque(maxlen=config.max_queue_size)
+ self._queue: collections.deque[Event] = collections.deque()
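The difference between the two overflow policies is easy to see on a tiny queue. The following sketch uses only the standard library; MAX_QUEUE_SIZE and the integer payloads are placeholders for illustration.

```python
import collections

MAX_QUEUE_SIZE = 3

# Head-drop: a deque with maxlen silently evicts the oldest item on overflow.
head_drop = collections.deque(maxlen=MAX_QUEUE_SIZE)
for i in range(5):
    head_drop.append(i)

# Tail-drop: an unbounded deque plus an explicit size check rejects the newest item.
tail_drop = collections.deque()
discarded = 0
for i in range(5):
    if len(tail_drop) >= MAX_QUEUE_SIZE:
        discarded += 1
        continue
    tail_drop.append(i)

print(list(head_drop))  # [2, 3, 4]
print(list(tail_drop))  # [0, 1, 2]
print(discarded)        # 2
```

With maxlen set, an explicit discard branch would never fire because the deque evicts on its own, which is why the design-doc snippet and a tail-drop implementation cannot both be right.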

bus.start()

bus.publish(_make_event())
time.sleep(0.15)
Contributor

medium

Using time.sleep() in tests can lead to flakiness or unnecessary delays. A more robust approach for synchronizing with the background thread is to use a threading.Event.

For example, you could modify _RecordingSubscriber to signal an event when it receives a message, and have the test wait on that event:

# In _RecordingSubscriber
class _RecordingSubscriber(EventSubscriber):
    def __init__(self, ...):
        # ...
        self.processed_event = threading.Event()

    def _on_event(self, event: Event) -> None:
        self.events.append(event)
        self.processed_event.set()

# In test_event_reaches_subscriber
def test_event_reaches_subscriber(self, bus):
    # ...
    bus.publish(_make_event())
    assert sub.processed_event.wait(timeout=1), "Event not processed in time"
    bus.stop()
    # ...

This makes the test faster and more reliable.
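For readers who want to try the pattern outside the LMCache test suite, here is a standalone, runnable variant. RecordingHandler and deliver_later are hypothetical stand-ins for the subscriber and the bus drain thread; only the threading.Event handshake is the point.

```python
import threading
import time


class RecordingHandler:
    """Records events and signals once the first one has arrived."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.processed = threading.Event()

    def on_event(self, event: str) -> None:
        self.events.append(event)
        self.processed.set()


def deliver_later(handler: RecordingHandler, event: str, delay: float) -> threading.Thread:
    """Stand-in for a background drain thread that delivers after `delay` seconds."""

    def _run() -> None:
        time.sleep(delay)
        handler.on_event(event)

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    return thread


if __name__ == "__main__":
    handler = RecordingHandler()
    worker = deliver_later(handler, "evt", delay=0.01)
    # wait() returns as soon as the event is set, so the generous timeout
    # only matters when delivery is broken.
    assert handler.processed.wait(timeout=2.0), "Event not processed in time"
    worker.join()
    print(handler.events)
```

A single threading.Event only signals "at least one event arrived". A test that expects several events would need to call clear() between waits or count arrivals under a lock.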

@royyhuang royyhuang changed the title from "[MP][Observability][1/4] EventBus core infrastructure + OpenTelemetry dependency" to "[MP][Observability][1/3] EventBus core infrastructure + OpenTelemetry dependency" Mar 17, 2026
@royyhuang royyhuang enabled auto-merge (squash) March 18, 2026 01:11
@github-actions github-actions Bot added the full Run comprehensive tests on this PR label Mar 18, 2026
Comment thread lmcache/v1/mp_observability/REFACT_DESIGN.md
self._discard_count += 1
now = time.monotonic()
if now - self._last_discard_warning >= 1.0:
    logger.warning(
Contributor

@KuntaiDu KuntaiDu Mar 18, 2026

nit: ideally we should find a way to avoid printing this message repeatedly.
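One possible direction, offered only as a sketch and not something this PR adopts, is to warn once when an overflow episode begins and once more with a total when the queue accepts events again. OverflowReporter and its method names are hypothetical, and the class is not thread-safe on its own, so callers would need to invoke it under the same lock that guards the queue.

```python
import logging

logger = logging.getLogger("eventbus.sketch")


class OverflowReporter:
    """Hypothetical helper: one warning per overflow episode, plus a summary."""

    def __init__(self) -> None:
        self._in_overflow = False
        self._dropped = 0

    def on_discard(self) -> None:
        self._dropped += 1
        if not self._in_overflow:
            # First discard of this episode: warn once, then stay quiet.
            self._in_overflow = True
            logger.warning("queue full, discarding events")

    def on_accept(self) -> None:
        if self._in_overflow:
            # Queue has room again: report the total and reset the episode.
            logger.warning("queue recovered, %d event(s) discarded", self._dropped)
            self._in_overflow = False
            self._dropped = 0
```

Compared with a once-per-second rate limit, this emits two lines per episode regardless of how long the overflow lasts, at the price of no progress updates while the queue stays full.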

Contributor

@KuntaiDu KuntaiDu left a comment

Please move the design doc. Otherwise LGTM.

Contributor

@ApostaC ApostaC left a comment

LGTM!

@royyhuang royyhuang merged commit 47d567e into LMCache:dev Mar 18, 2026
26 of 28 checks passed
hyunyul-XCENA pushed a commit to xcena-dev/LMCache that referenced this pull request Mar 20, 2026
… dependency (LMCache#2792)

realAaronWu pushed a commit to realAaronWu/LMCache that referenced this pull request Mar 20, 2026
… dependency (LMCache#2792)

Signed-off-by: Aaron Wu <aaron.wu@dell.com>
deng451e pushed a commit to deng451e/LMCache that referenced this pull request Mar 21, 2026
… dependency (LMCache#2792)

deng451e pushed a commit to deng451e/LMCache that referenced this pull request Mar 25, 2026
… dependency (LMCache#2792)

deng451e pushed a commit to deng451e/LMCache that referenced this pull request Mar 27, 2026
… dependency (LMCache#2792)

jooho-XCENA pushed a commit to xcena-dev/LMCache that referenced this pull request Apr 2, 2026
… dependency (LMCache#2792)

jooho-XCENA pushed a commit to xcena-dev/LMCache that referenced this pull request Apr 2, 2026
… dependency (LMCache#2792)

Labels

full Run comprehensive tests on this PR

3 participants