[MP][Observability][1/3] EventBus core infrastructure + OpenTelemetry dependency #2792
Summary of Changes (Gemini Code Assist)

This pull request lays the foundational infrastructure for a significant refactor of the …
…dependency

Introduce the unified EventBus pub/sub system for mp_observability alongside the OpenTelemetry SDK dependency. This is PR 1 of the observability refactor that will unify the Listener/Logger and Telemetry systems into a single event-driven architecture.

New files:
- event.py: EventType enum (16 event types) + Event dataclass
- event_bus.py: EventBus (async queue + drain thread), EventSubscriber ABC, EventBusConfig, global singleton management
- otel_init.py: OTel MeterProvider + TracerProvider init helpers
- config.py: ObservabilityConfig added alongside existing PrometheusConfig
- test_event_bus.py: 26 unit tests
- REFACT_DESIGN.md: Full design doc for the observability refactor

Dependencies added: opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-prometheus

No existing behavior is changed.

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
Force-pushed from 6d59f19 to f540814.
Code Review
This pull request introduces the core infrastructure for a new unified observability system based on an EventBus and OpenTelemetry. The changes include a detailed design document, the EventBus implementation, new event and configuration models, OpenTelemetry initialization helpers, and comprehensive unit tests. The code is well-structured and the design is solid. I've identified a race condition in the EventBus.publish method that should be addressed. I also have a couple of suggestions for improving the design document's consistency and making the tests more robust.
    if len(self._queue) >= self._config.max_queue_size:
        self._discard_count += 1
        now = time.monotonic()
        if now - self._last_discard_warning >= 1.0:
            logger.warning(
                "EventBus queue full (max_queue_size=%d), "
                "%d event(s) discarded so far",
                self._config.max_queue_size,
                self._discard_count,
            )
            self._last_discard_warning = now
        return

    event.timestamp = time.time()
    self._queue.append(event)
    self._wake.set()
The publish method has a race condition when called from multiple producer threads. The check for len(self._queue) and the subsequent self._queue.append(event) are not atomic. This can lead to the queue size temporarily exceeding max_queue_size. Additionally, _discard_count and _last_discard_warning are also accessed without a lock, which is not thread-safe.
To ensure thread safety and strictly enforce the queue limit, this entire block should be protected by self._lock.
Suggested change:

    with self._lock:
        if len(self._queue) >= self._config.max_queue_size:
            self._discard_count += 1
            now = time.monotonic()
            if now - self._last_discard_warning >= 1.0:
                logger.warning(
                    "EventBus queue full (max_queue_size=%d), "
                    "%d event(s) discarded so far",
                    self._config.max_queue_size,
                    self._discard_count,
                )
                self._last_discard_warning = now
            return
        event.timestamp = time.time()
        self._queue.append(event)
        self._wake.set()
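The locking argument above can be checked in isolation. The following is a minimal sketch, not the EventBus implementation: `BoundedTailDropQueue` and `run_producers` are hypothetical names introduced here, and the sketch assumes only that the size check and the append must happen under the same lock.

```python
import collections
import threading


class BoundedTailDropQueue:
    """Hypothetical stand-in for the queue logic in EventBus.publish."""

    def __init__(self, max_size: int) -> None:
        self._max_size = max_size
        self._items: collections.deque = collections.deque()
        self._lock = threading.Lock()
        self.discard_count = 0

    def publish(self, item: object) -> bool:
        """Append item unless the queue is full; return True if accepted."""
        with self._lock:
            # The check and the append share one lock, so concurrent
            # producers cannot push the queue past max_size.
            if len(self._items) >= self._max_size:
                self.discard_count += 1
                return False
            self._items.append(item)
            return True

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


def run_producers(q: BoundedTailDropQueue, threads: int, per_thread: int) -> None:
    """Publish from several threads at once and wait for all of them."""

    def produce() -> None:
        for i in range(per_thread):
            q.publish(i)

    workers = [threading.Thread(target=produce) for _ in range(threads)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```

With nothing draining the queue, the accepted and discarded counts always add up to the number of publish calls, which is the invariant the unlocked version cannot guarantee.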
    class EventBus:
        def __init__(self, config: EventBusConfig):
            self._subscribers: dict[EventType, list[EventCallback]] = defaultdict(list)
            self._queue: collections.deque[Event] = collections.deque(maxlen=config.max_queue_size)
There's an inconsistency regarding the queue overflow policy. This example shows collections.deque(maxlen=config.max_queue_size), which implies a "head-drop" policy (discarding the oldest events). However, the implementation in event_bus.py and Open Question #4 in this document discuss a "tail-drop" policy (discarding the newest events), which is what's actually implemented. To avoid confusion, the example code should be consistent with the chosen implementation.
Suggested change:

    self._queue: collections.deque[Event] = collections.deque()
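The difference between the two overflow policies is easy to see on a small input. This is an illustrative sketch using only the standard library; the helper names `head_drop` and `tail_drop` are introduced here and are not part of the PR.

```python
import collections


def head_drop(events, max_size):
    """deque(maxlen=...) silently evicts the OLDEST item when it is full."""
    queue = collections.deque(maxlen=max_size)
    for event in events:
        queue.append(event)
    return list(queue)


def tail_drop(events, max_size):
    """An unbounded deque plus an explicit size check rejects the NEWEST item."""
    queue = collections.deque()
    for event in events:
        if len(queue) >= max_size:
            continue  # discard the incoming event, keep what is already queued
        queue.append(event)
    return list(queue)
```

For the events 0 through 4 and a limit of 3, `head_drop` keeps the last three events while `tail_drop` keeps the first three, which is why the `maxlen` argument in the design doc example does not match the implemented behavior.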
    bus.start()

    bus.publish(_make_event())
    time.sleep(0.15)
Using time.sleep() in tests can lead to flakiness or unnecessary delays. A more robust approach for synchronizing with the background thread is to use a threading.Event.
For example, you could modify _RecordingSubscriber to signal an event when it receives a message, and have the test wait on that event:
    # In _RecordingSubscriber
    class _RecordingSubscriber(EventSubscriber):
        def __init__(self, ...):
            # ...
            self.processed_event = threading.Event()

        def _on_event(self, event: Event) -> None:
            self.events.append(event)
            self.processed_event.set()

    # In test_event_reaches_subscriber
    def test_event_reaches_subscriber(self, bus):
        # ...
        bus.publish(_make_event())
        assert sub.processed_event.wait(timeout=1), "Event not processed in time"
        bus.stop()
        # ...

This makes the test faster and more reliable.
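For readers who want to run the pattern end to end, here is a self-contained sketch of the same idea. `TinyBus`, `RecordingSubscriber`, and `deliver_one` are hypothetical stand-ins written for this example; they do not reproduce the real EventBus or EventSubscriber interfaces.

```python
import queue
import threading


class RecordingSubscriber:
    """Hypothetical subscriber that signals once it has handled an event."""

    def __init__(self) -> None:
        self.events: list = []
        self.processed_event = threading.Event()

    def on_event(self, event: object) -> None:
        self.events.append(event)
        self.processed_event.set()


class TinyBus:
    """Hypothetical stand-in for EventBus: one queue and one drain thread."""

    _STOP = object()  # sentinel that tells the drain thread to exit

    def __init__(self, subscriber: RecordingSubscriber) -> None:
        self._subscriber = subscriber
        self._queue: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._drain, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def publish(self, event: object) -> None:
        self._queue.put(event)

    def stop(self) -> None:
        self._queue.put(self._STOP)
        self._thread.join()

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            self._subscriber.on_event(item)


def deliver_one(event: object, timeout: float = 1.0) -> tuple:
    """Publish one event and return (delivered_in_time, recorded_events)."""
    sub = RecordingSubscriber()
    bus = TinyBus(sub)
    bus.start()
    bus.publish(event)
    # wait() returns as soon as the subscriber sets the flag, so the test
    # only blocks for the full timeout when delivery actually fails.
    delivered = sub.processed_event.wait(timeout=timeout)
    bus.stop()
    return delivered, sub.events
```

The timeout acts as an upper bound rather than a fixed delay, which is the main advantage over a fixed `time.sleep()`.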
    self._discard_count += 1
    now = time.monotonic()
    if now - self._last_discard_warning >= 1.0:
        logger.warning(
nit: ideally we should find a way to avoid printing this message repeatedly.
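One possible way to reduce repeated warnings, offered only as a sketch and not as what the PR does, is to warn at exponentially spaced discard counts instead of on a fixed time interval. The helpers `should_warn` and `record_discard` and the logger name are assumptions made for this example.

```python
import logging

logger = logging.getLogger("event_bus_sketch")  # hypothetical logger name


def should_warn(discard_count: int) -> bool:
    """Return True only when discard_count is a power of two (1, 2, 4, 8, ...)."""
    return discard_count > 0 and (discard_count & (discard_count - 1)) == 0


def record_discard(discard_count: int, max_queue_size: int) -> int:
    """Increment the discard counter and warn at exponentially spaced counts."""
    discard_count += 1
    if should_warn(discard_count):
        logger.warning(
            "EventBus queue full (max_queue_size=%d), %d event(s) discarded so far",
            max_queue_size,
            discard_count,
        )
    return discard_count
```

Under sustained overload this emits a logarithmic number of messages, at the cost of longer gaps between updates as the count grows.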
KuntaiDu left a comment:
Please move the design doc. Otherwise LGTM.
… dependency (LMCache#2792)

[MP][Observability] Add EventBus core infrastructure + OpenTelemetry dependency

Introduce the unified EventBus pub/sub system for mp_observability alongside the OpenTelemetry SDK dependency. This is PR 1 of the observability refactor that will unify the Listener/Logger and Telemetry systems into a single event-driven architecture.

New files:
- event.py: EventType enum (16 event types) + Event dataclass
- event_bus.py: EventBus (async queue + drain thread), EventSubscriber ABC, EventBusConfig, global singleton management
- otel_init.py: OTel MeterProvider + TracerProvider init helpers
- config.py: ObservabilityConfig added alongside existing PrometheusConfig
- test_event_bus.py: 26 unit tests
- REFACT_DESIGN.md: Full design doc for the observability refactor

Dependencies added: opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-prometheus

No existing behavior is changed.

Signed-off-by: royyhuang <roy.y.huang@gmail.com>
Signed-off-by: Aaron Wu <aaron.wu@dell.com>
Summary
- EventBus core (event.py, event_bus.py) for mp_observability that will replace both the Listener/Logger and Telemetry systems
- OpenTelemetry dependencies (opentelemetry-api, opentelemetry-sdk, opentelemetry-exporter-prometheus) and OTel init helpers
- ObservabilityConfig alongside existing PrometheusConfig
- REFACT_DESIGN.md with full design doc for the observability refactor

This is PR 1 of 3 in the observability refactor. No existing behavior is changed; these are pure additions.
See REFACT_DESIGN.md for the full design, implementation plan, and design tradeoff discussion.

Test plan
- All unit tests in test_event_bus.py pass