fix(openai): propagate span IDs properly to events #3243
Conversation
Walkthrough

Adds explicit OpenTelemetry span context management across OpenAI wrappers, moves streaming handling inside active span contexts, hardens the ChatStream lifecycle with context/iterator hooks and thread-safe cleanup, and adds tests verifying that emitted events carry span/trace IDs.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App
    participant Wrapper as OpenAI Wrapper
    participant OTel as OTel Tracer
    participant OpenAI as OpenAI SDK
    App->>Wrapper: invoke()
    Wrapper->>OTel: start_span()
    Wrapper->>OTel: use_span(span, end_on_exit=false)
    Wrapper->>OpenAI: request (within active span)
    OpenAI-->>Wrapper: response / streaming handle
    Wrapper->>OTel: record attrs/events/errors
    Wrapper-->>App: response or ChatStream
    Note right of ChatStream: ChatStream operations keep span active until cleanup/end
```
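Before the streaming variant below, here is a minimal sketch of the non-streaming pattern this diagram describes. It is not the wrapper code from this PR; the helper names and attribute keys are illustrative assumptions.

```python
# Minimal sketch, assuming only the OpenTelemetry API; helper names are placeholders.
from opentelemetry import trace
from opentelemetry.trace import SpanKind, Status, StatusCode

tracer = trace.get_tracer(__name__)


def _handle_request(span, kwargs):
    # Placeholder: the real wrapper records request attributes and emits prompt events.
    span.set_attribute("gen_ai.request.model", kwargs.get("model", ""))


def _handle_response(span, response):
    # Placeholder: the real wrapper records response attributes and emits choice events.
    pass


def chat_wrapper_sketch(wrapped, args, kwargs):
    span = tracer.start_span("openai.chat", kind=SpanKind.CLIENT)
    # end_on_exit=False: the span must be able to outlive this block when a stream is returned
    with trace.use_span(span, end_on_exit=False):
        _handle_request(span, kwargs)  # events emitted here now carry the span's IDs
        try:
            response = wrapped(*args, **kwargs)
        except Exception as e:
            span.set_attribute("error.type", e.__class__.__name__)
            span.record_exception(e)
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.end()
            raise
        _handle_response(span, response)
        span.set_status(Status(StatusCode.OK))
        span.end()
        return response
```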
```mermaid
sequenceDiagram
    participant App
    participant Wrapper
    participant ChatStream
    participant OTel
    participant OpenAI
    App->>Wrapper: invoke(stream=True)
    Wrapper->>OTel: start_span + use_span
    Wrapper->>OpenAI: start streaming
    Wrapper-->>App: return ChatStream (span-linked)
    loop consumer reads
        App->>ChatStream: next()/anext()
        ChatStream->>OpenAI: read chunk
        OpenAI-->>ChatStream: chunk
        ChatStream->>OTel: record partial metrics/events
    end
    ChatStream->>OTel: finalize metrics, set status, end span
```
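A rough sketch of the streaming side follows: a stream wrapper that keeps the span until the iterator is exhausted or fails, with a lock guarding against double finalization. Class and attribute names here are assumptions for illustration, not the ChatStream implementation from this PR.

```python
# Illustrative stream wrapper; the real ChatStream has more hooks (context managers,
# async iteration, metrics) and different internals.
import threading

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode


class StreamWrapperSketch:
    def __init__(self, span, stream):
        self._span = span
        self._stream = stream
        self._lock = threading.Lock()
        self._cleanup_completed = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self._stream)
        except StopIteration:
            self._ensure_cleanup()
            raise
        except Exception as e:
            self._ensure_cleanup(error=e)
            raise
        # Partial metrics/events would be recorded here, with the span made current.
        return chunk

    def _ensure_cleanup(self, error=None):
        with self._lock:  # thread-safe: finalize exactly once
            if self._cleanup_completed:
                return
            with trace.use_span(self._span, end_on_exit=False):
                if error is not None:
                    self._span.record_exception(error)
                    self._span.set_status(Status(StatusCode.ERROR, str(error)))
                else:
                    self._span.set_status(Status(StatusCode.OK))
            self._span.end()
            self._cleanup_completed = True
```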
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Important
Looks good to me! 👍
Reviewed everything up to 814e6f1 in 1 minute and 26 seconds.
- Reviewed 739 lines of code in 3 files
- Skipped 0 files when reviewing
- Skipped posting 3 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py:92
   - Draft comment: Good use of 'with trace.use_span(span, end_on_exit=False)' to ensure the span is set in the current context so events pick up the proper span IDs. The error and streaming handling appear consistent.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
2. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py:59
   - Draft comment: The completion wrappers now wrap the processing inside a 'with trace.use_span(span, end_on_exit=False)' block. This ensures that events emitted via _handle_request/_handle_response have access to the current span context. Looks good.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
3. packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py:312
   - Draft comment: In runs_create_and_stream_wrapper, the span is only ended in the exception branch. Please verify that for successful streaming responses the span is ended appropriately (perhaps via the event handler) to avoid leaving spans open.
   - Reason this comment was not posted: Confidence changes required: 33% <= threshold 50%
Workflow ID: wflow_KcNOadfbkwjyKpvt
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 5
🔭 Outside diff range comments (5)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (1)
264-273: Defensive iteration over choices to avoid NoneType errors

If an SDK chunk lacks 'choices', iterating over None raises. Guard with a default empty list.

```diff
-        for choice in item.get("choices"):
+        for choice in (item.get("choices") or []):
```

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (4)
642-645: Async context manager does not trigger cleanup; spans may leak when using async with

__aexit__ does not call _ensure_cleanup(); spans may remain open and metrics unfinalized for async context usage.

```diff
-    async def __aexit__(self, exc_type, exc_val, exc_tb):
-        await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        cleanup_exception = None
+        try:
+            self._ensure_cleanup()
+        except Exception as e:
+            cleanup_exception = e
+            logger.debug("Error during ChatStream cleanup in __aexit__: %s", cleanup_exception)
+        await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)
```
707-752: Keep span current during streaming finalization to ensure events carry span_id

emit_event in _process_complete_response runs after the outer with trace.use_span has exited. Wrap the body in a use_span context:

```python
def _process_complete_response(self):
    with trace.use_span(self._span, end_on_exit=False):
        _set_streaming_token_metrics(
            self._request_kwargs,
            self._complete_response,
            self._span,
            self._token_counter,
            self._shared_attributes(),
        )

        # choice metrics
        if self._choice_counter and self._complete_response.get("choices"):
            _set_choice_counter_metrics(
                self._choice_counter,
                self._complete_response.get("choices"),
                self._shared_attributes(),
            )

        # duration metrics
        if self._start_time and isinstance(self._start_time, (float, int)):
            duration = time.time() - self._start_time
        else:
            duration = None
        if duration and isinstance(duration, (float, int)) and self._duration_histogram:
            self._duration_histogram.record(
                duration, attributes=self._shared_attributes()
            )
        if self._streaming_time_to_generate and self._time_of_first_token:
            self._streaming_time_to_generate.record(
                time.time() - self._time_of_first_token,
                attributes=self._shared_attributes(),
            )

        _set_response_attributes(self._span, self._complete_response)

        if should_emit_events():
            for choice in self._complete_response.get("choices", []):
                emit_event(_parse_choice_event(choice))
        else:
            if should_send_prompts():
                _set_completions(self._span, self._complete_response.get("choices"))

        self._span.set_status(Status(StatusCode.OK))
        self._span.end()
        self._cleanup_completed = True
```
828-896: Wrap streaming builders in trace.use_span

The streaming builder functions in chat_wrappers.py emit events and record metrics outside of the span's active context. Enclose their entire processing, from initializing complete_response through span.end(), in a `with trace.use_span(span, end_on_exit=False):` block so that all child events and metrics are correctly attributed.

• File: packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
  - Lines 828–896: _build_from_streaming_response
  - Lines 899–967: _abuild_from_streaming_response

Example diff for _build_from_streaming_response:

```diff
 def _build_from_streaming_response(
     span,
     response,
     instance=None,
     token_counter=None,
     choice_counter=None,
     duration_histogram=None,
     streaming_time_to_first_token=None,
     streaming_time_to_generate=None,
     start_time=None,
     request_kwargs=None,
 ):
-    complete_response = {"choices": [], "model": "", "id": ""}
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
         first_token = True
         time_of_first_token = start_time  # will be updated when first token is received
         for item in response:
@@
-    _set_response_attributes(span, complete_response)
+        _set_response_attributes(span, complete_response)
@@
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+        span.set_status(Status(StatusCode.OK))
+        span.end()
```

Apply the same pattern to _abuild_from_streaming_response. Ensure you have `from opentelemetry import trace` imported at the top of the file.
651-663: Enhance exception recording in chat stream wrappers

Ensure that on streaming errors (non-StopIteration/StopAsyncIteration), the span records both the exception and its type before setting the error status.

Locations to update: packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py, in both __next__ (lines ~651–663) and __anext__ (lines ~667–681).

Suggested diff:

```diff
--- a/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
+++ b/packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
@@ def __next__(self):
-            self._ensure_cleanup()
-            if self._span and self._span.is_recording():
-                self._span.set_status(Status(StatusCode.ERROR, str(e)))
+            self._ensure_cleanup()
+            if self._span and self._span.is_recording():
+                # Record exception type and details for better diagnostics
+                self._span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+                self._span.record_exception(e)
+                self._span.set_status(Status(StatusCode.ERROR, str(e)))
@@ async def __anext__(self):
-            self._ensure_cleanup()
-            if self._span and self._span.is_recording():
-                self._span.set_status(Status(StatusCode.ERROR, str(e)))
+            self._ensure_cleanup()
+            if self._span and self._span.is_recording():
+                # Record exception type and details for better diagnostics
+                self._span.set_attribute(ERROR_TYPE, e.__class__.__name__)
+                self._span.record_exception(e)
+                self._span.set_status(Status(StatusCode.ERROR, str(e)))
```

These changes align with other OpenAI wrappers (e.g., completion_wrappers.py) and leverage the existing ERROR_TYPE import.
🧹 Nitpick comments (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (2)
76-79: Consider setting OK status for non-streaming path before ending

For consistency with the streaming path, you may want to set span.set_status(Status(StatusCode.OK)) before span.end() when not streaming.

```diff
-        _handle_response(response, span, instance)
-        span.end()
+        _handle_response(response, span, instance)
+        span.set_status(Status(StatusCode.OK))
+        span.end()
```

Also applies to: 112-116

132-139: Optional: include role in prompt events for consistency

Completion prompts are emitted without a role. If helpful for consumers, emit with a "user" role for prompt strings.

```diff
-            emit_event(MessageEvent(content=p))
+            emit_event(MessageEvent(content=p, role="user"))
 ...
-        emit_event(MessageEvent(content=prompt))
+        emit_event(MessageEvent(content=prompt, role="user"))
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (3 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (2)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3)
  - _handle_request (267-288)
  - _build_from_streaming_response (828-895)
  - _abuild_from_streaming_response (899-966)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (1)
  - is_streaming_response (331-339)

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (2)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (2)
  - run_async (154-165)
  - is_openai_v1 (20-21)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (1)
  - is_streaming_response (331-339)
🔇 Additional comments (4)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (1)
59-71: Good: keep span current + robust error handling

Using trace.use_span with end_on_exit=False correctly makes the span current during request/response and streaming setup; the exception path records ERROR_TYPE, sets the status, and ends the span. This aligns with the PR objective.
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (2)
132-140: Good: span made current around message processing

Wrapping the messages_list_wrapper logic in trace.use_span ensures all emit_event calls and span attribute mutations have the correct current span.

317-326: Good: error path ends span and re-raises

Exception logging and span finalization are correct here; the span is ended and the error is propagated.
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (1)
119-163: Good: use_span around request/response, metrics, and error handling

Both sync and async wrappers correctly keep the span current and finalize it on success/error. This addresses the core PR issue for non-streaming flows.
Also applies to: 190-218
```python
# Use the span as current context to ensure events get proper trace context
with trace.use_span(span, end_on_exit=False):
    run_async(_handle_request(span, kwargs, instance))
    try:
        start_time = time.time()
        response = wrapped(*args, **kwargs)
```
🛠️ Refactor suggestion
Context not propagated into new thread used by run_async; emit_event may still miss span_id
run_async executes the coroutine in a new thread when a loop is running. The current runtime context (and thus the current span set by trace.use_span) is not automatically propagated to that thread. emit_event inside _handle_request will then lack a current span.
Fix either by propagating the context into the thread here, or by updating utils.run_async to attach a captured context.
Minimal in-place fix:
```diff
-        run_async(_handle_request(span, kwargs, instance))
+        # Ensure current runtime context (with the span) flows into the helper thread
+        current_ctx = context_api.get_current()
+        def _runner():
+            token = context_api.attach(current_ctx)
+            try:
+                import asyncio
+                asyncio.run(_handle_request(span, kwargs, instance))
+            finally:
+                context_api.detach(token)
+        import threading
+        t = threading.Thread(target=_runner)
+        t.start()
+        t.join()
```

Alternatively, enhance utils.run_async to accept and attach a context (preferred, single place to maintain).
Committable suggestion skipped: line range outside the PR's diff.
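As a sketch of that preferred route, a context-carrying run_async could look roughly like this. It assumes run_async takes a single coroutine and returns its result; the actual signature and behavior in utils.py may differ.

```python
# Hypothetical context-propagating variant of utils.run_async (illustration only).
import asyncio
import threading

from opentelemetry import context as context_api


def run_async(coro):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running: execute inline; the current context is already correct.
        return asyncio.run(coro)

    # A loop is already running: run the coroutine in a helper thread, re-attaching
    # the caller's context so the current span is visible to emit_event().
    captured_ctx = context_api.get_current()
    result = {}

    def _runner():
        token = context_api.attach(captured_ctx)
        try:
            result["value"] = asyncio.run(coro)
        finally:
            context_api.detach(token)

    t = threading.Thread(target=_runner)
    t.start()
    t.join()
    return result.get("value")
```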
```python
if is_streaming_response(response):
    # span will be closed after the generator is done
    return _build_from_streaming_response(span, kwargs, response)
else:
    _handle_response(response, span, instance)
```
💡 Verification agent
🧩 Analysis chain
Streaming path loses current-context for downstream events; wrap generator bodies with trace.use_span
When you return the streaming generator, the with trace.use_span context is exited. emit_event calls inside _build_from_streaming_response/_abuild_from_streaming_response will then run without a current span, reintroducing the missing span_id problem for streaming events.
Wrap the generator bodies with trace.use_span(span, end_on_exit=False) to keep the span current throughout iteration and finalization.
Apply:
```diff
 def _build_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span as current during the whole streaming lifecycle
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        else:
+            if should_send_prompts():
+                _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()

 async def _abuild_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    async for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span as current during the whole streaming lifecycle
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        async for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        else:
+            if should_send_prompts():
+                _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()
```

Run this quick grep to spot any remaining emit_event sites in streaming code that run outside a use_span context:
Also applies to: 108-115, 183-201, 204-222
🏁 Script executed:
```bash
#!/bin/bash
rg -n "emit_event\(" -A 4 -B 6 packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
```

Length of output: 1359
I’d like to verify whether any trace.use_span wrappers already surround non-streaming paths or other emit sites. Let’s search for existing use_span usages in this file:
🏁 Script executed:
```bash
rg -n "use_span" -A 3 -B 3 packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
```

Length of output: 582
Wrap streaming generators in trace.use_span to preserve current span context
Returning the raw generator exits the outer use_span (though the span remains open), so any emit_event calls inside the streaming builders run without a current span. Enclose the bodies of both streaming helpers in a nested with trace.use_span(span, end_on_exit=False): block so that events, status updates, and span.end() execute under the correct context.
Affected locations in packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py:
- _build_from_streaming_response
- _abuild_from_streaming_response
Suggested patch:
```diff
 def _build_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span current for events and finalization
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        elif should_send_prompts():
+            _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()

 async def _abuild_from_streaming_response(span, request_kwargs, response):
-    complete_response = {"choices": [], "model": "", "id": ""}
-    async for item in response:
-        yield item
-        _accumulate_streaming_response(complete_response, item)
-    _set_response_attributes(span, complete_response)
-    _set_token_usage(span, request_kwargs, complete_response)
-    if should_emit_events():
-        _emit_streaming_response_events(complete_response)
-    else:
-        if should_send_prompts():
-            _set_completions(span, complete_response.get("choices"))
-    span.set_status(Status(StatusCode.OK))
-    span.end()
+    # Keep span current for events and finalization
+    with trace.use_span(span, end_on_exit=False):
+        complete_response = {"choices": [], "model": "", "id": ""}
+        async for item in response:
+            yield item
+            _accumulate_streaming_response(complete_response, item)
+        _set_response_attributes(span, complete_response)
+        _set_token_usage(span, request_kwargs, complete_response)
+        if should_emit_events():
+            _emit_streaming_response_events(complete_response)
+        elif should_send_prompts():
+            _set_completions(span, complete_response.get("choices"))
+        span.set_status(Status(StatusCode.OK))
+        span.end()
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if is_streaming_response(response):
    # span will be closed after the generator is done
    return _build_from_streaming_response(span, kwargs, response)
else:
    _handle_response(response, span, instance)


def _build_from_streaming_response(span, request_kwargs, response):
    # Keep span current for events and finalization
    with trace.use_span(span, end_on_exit=False):
        complete_response = {"choices": [], "model": "", "id": ""}
        for item in response:
            yield item
            _accumulate_streaming_response(complete_response, item)
        _set_response_attributes(span, complete_response)
        _set_token_usage(span, request_kwargs, complete_response)
        if should_emit_events():
            _emit_streaming_response_events(complete_response)
        elif should_send_prompts():
            _set_completions(span, complete_response.get("choices"))
        span.set_status(Status(StatusCode.OK))
        span.end()


async def _abuild_from_streaming_response(span, request_kwargs, response):
    # Keep span current for events and finalization
    with trace.use_span(span, end_on_exit=False):
        complete_response = {"choices": [], "model": "", "id": ""}
        async for item in response:
            yield item
            _accumulate_streaming_response(complete_response, item)
        _set_response_attributes(span, complete_response)
        _set_token_usage(span, request_kwargs, complete_response)
        if should_emit_events():
            _emit_streaming_response_events(complete_response)
        elif should_send_prompts():
            _set_completions(span, complete_response.get("choices"))
        span.set_status(Status(StatusCode.OK))
        span.end()
```
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
around lines 72 to 77, the streaming generator helpers
(_build_from_streaming_response and _abuild_from_streaming_response) return raw
generators which causes code executed inside them (emit_event, set_status,
span.end()) to run without the current span context; wrap the entire body of
each streaming helper in a nested with trace.use_span(span, end_on_exit=False):
block so the yielded generator executes with the span active, ensuring
emit_event/status/span.end() run under the correct context (apply the same
change to both sync and async variants).
```python
message_content = content[0].get("text").get("value")
message_role = msg.get("role")
if message_role in ["user", "system"]:
    if should_emit_events():
        emit_event(MessageEvent(content=message_content, role=message_role))
    else:
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role",
            message_role,
        )
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content",
            message_content,
        )
```
🛠️ Refactor suggestion
Brittle access to message content; handle non-text and empty content safely
content is assumed to be a list where content[0]["text"]["value"] exists. This can raise a KeyError when the first item is not of type "text" or when content is empty.
```diff
-    message_content = content[0].get("text").get("value")
+    message_content = None
+    if isinstance(content, list) and content:
+        first = content[0]
+        if isinstance(first, dict) and isinstance(first.get("text"), dict):
+            message_content = first["text"].get("value")
+    if message_content is None:
+        # Fallback: best-effort stringification
+        message_content = str(content)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
message_content = None
if isinstance(content, list) and content:
    first = content[0]
    if isinstance(first, dict) and isinstance(first.get("text"), dict):
        message_content = first["text"].get("value")
if message_content is None:
    # Fallback: best-effort stringification
    message_content = str(content)
message_role = msg.get("role")
if message_role in ["user", "system"]:
    if should_emit_events():
        emit_event(MessageEvent(content=message_content, role=message_role))
    else:
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role",
            message_role,
        )
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content",
            message_content,
        )
```
```python
kwargs["event_handler"] = EventHandleWrapper(
    original_handler=kwargs["event_handler"],
    span=span,
)
```
🛠️ Refactor suggestion
Guard event_handler wrapping to avoid KeyError
kwargs["event_handler"] may be absent. Wrap only if provided.
```diff
-    kwargs["event_handler"] = EventHandleWrapper(
-        original_handler=kwargs["event_handler"],
-        span=span,
-    )
+    if kwargs.get("event_handler"):
+        kwargs["event_handler"] = EventHandleWrapper(
+            original_handler=kwargs["event_handler"],
+            span=span,
+        )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if kwargs.get("event_handler"):
    kwargs["event_handler"] = EventHandleWrapper(
        original_handler=kwargs["event_handler"],
        span=span,
    )
```
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py
around lines 312 to 315, the code unconditionally assigns
kwargs["event_handler"] = EventHandleWrapper(...), which raises a KeyError when
"event_handler" is not present; change this to only wrap when the key exists
(e.g., if "event_handler" in kwargs and kwargs["event_handler"] is not None) or
use kwargs.get and conditionally set the wrapped handler, leaving kwargs
unchanged when absent.
nirga left a comment
Looks good overall! 2 requests:
- fix the one comment from coderabbit
- add a test :)
Actionable comments posted: 0
♻️ Duplicate comments (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (2)
195-196: Brittle content access needs error handling

The code assumes the content[0].get("text").get("value") structure exists, which can cause a KeyError if the first item isn't of "text" type or if content is empty. The previous review comment about brittle access to message content still applies. Consider adding a safe access pattern:

```diff
-    message_content = content[0].get("text").get("value")
+    message_content = None
+    if isinstance(content, list) and content:
+        first = content[0]
+        if isinstance(first, dict) and isinstance(first.get("text"), dict):
+            message_content = first["text"].get("value")
+    if message_content is None:
+        message_content = str(content)
```
- message_content = content[0].get("text").get("value") + message_content = None + if isinstance(content, list) and content: + first = content[0] + if isinstance(first, dict) and isinstance(first.get("text"), dict): + message_content = first["text"].get("value") + if message_content is None: + message_content = str(content)
313-316: EventHandleWrapper assignment needs safety check

The code unconditionally assigns kwargs["event_handler"], which will fail if the key doesn't exist. The previous review comment about guarding the event_handler wrapping still applies:

```diff
-    kwargs["event_handler"] = EventHandleWrapper(
-        original_handler=kwargs["event_handler"],
-        span=span,
-    )
+    if kwargs.get("event_handler"):
+        kwargs["event_handler"] = EventHandleWrapper(
+            original_handler=kwargs["event_handler"],
+            span=span,
+        )
```
🧹 Nitpick comments (2)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (2)
4-4: Remove unused import

The MagicMock import is not used anywhere in the test file.

```diff
-from unittest.mock import MagicMock
```

35-41: Use contextlib.suppress for cleaner exception handling

The try-except-pass block can be simplified using contextlib.suppress for better readability.

```diff
+import contextlib
+
-    try:
-        mock_openai_client.chat.completions.create(
-            model="gpt-3.5-turbo",
-            messages=[{"role": "user", "content": "Test span context"}],
-        )
-    except Exception:
-        pass
+    with contextlib.suppress(Exception):
+        mock_openai_client.chat.completions.create(
+            model="gpt-3.5-turbo",
+            messages=[{"role": "user", "content": "Test span context"}],
+        )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (3)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
  - EventAttributes (267-286)
- packages/opentelemetry-instrumentation-openai/tests/conftest.py (1)
  - mock_openai_client (47-51)
- packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1)
  - get_finished_spans (40-43)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (5)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py (1)
  - Config (6-15)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (2)
  - model_as_dict (342-352)
  - _set_span_attribute (30-37)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
  - SpanAttributes (64-257)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (1)
  - should_emit_events (174-181)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/event_handler_wrapper.py (1)
  - EventHandleWrapper (13-132)
🪛 Ruff (0.12.2)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py
4-4: unittest.mock.MagicMock imported but unused
Remove unused import: unittest.mock.MagicMock
(F401)
35-41: Use contextlib.suppress(Exception) instead of try-except-pass
Replace with contextlib.suppress(Exception)
(SIM105)
🪛 Flake8 (7.2.0)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py
[error] 4-4: 'unittest.mock.MagicMock' imported but unused
(F401)
🔇 Additional comments (7)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (2)
14-28: Well-structured helper function for event validation

The helper function correctly validates both the trace/span ID propagation and ensures the event belongs to the OpenAI system. The assertions provide clear error messages for debugging.

30-62: Test effectively validates span context propagation

The test correctly verifies the core PR objective: ensuring events contain proper trace and span IDs that match their parent spans. The conditional checks handle test environment variations gracefully with clear skip messages.
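For orientation, the assertion this helper performs can be sketched roughly as below. Names and attribute access follow the OpenTelemetry SDK, but the real helper in test_span_context_propagation.py may differ in details.

```python
# Sketch of a span-context assertion over a finished span and an emitted event
# (an SDK LogRecord); illustrative, not the exact test code.
def assert_event_has_span_context(log_record, finished_span):
    span_ctx = finished_span.context
    # The event must carry the IDs of the span that was current when it was emitted.
    assert log_record.trace_id == span_ctx.trace_id, "event trace_id does not match its span"
    assert log_record.span_id == span_ctx.span_id, "event span_id does not match its span"
    # And it should be attributed to the OpenAI system.
    assert log_record.attributes.get("gen_ai.system") == "openai"
```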
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (5)
5-5: Good addition of trace import for context management

This import enables the use of trace.use_span() to set the current context, which is essential for the span ID propagation fix.

132-139: Proper trace context management for event propagation

The use of trace.use_span(span, end_on_exit=False) correctly sets the current context so that events emitted within this scope will have the proper trace and span IDs. The exception handling within the context ensures spans are properly ended even on errors.

186-188: Conditional event emission properly implemented

The code correctly gates event emission based on should_emit_events(). This addresses the previous review feedback about unconditional event emission.

260-327: Comprehensive trace context implementation for streaming

The implementation properly wraps the entire streaming operation within trace.use_span(), ensuring events from the EventHandleWrapper have correct trace context. The exception handling correctly ends the span and re-raises errors.

141-188: No span indexing inconsistencies detected

The prompt_index in assistant_wrappers.py starts at 0, is incremented once for the assistant's instructions and once for the run's instructions, matching the pattern in responses_wrappers.py (which also initializes at 0 and increments per prompt). Tests in test_azure.py expect the first prompt's index to be 0, aligning with this implementation; no changes required.
Force-pushed from 5d4a66e to 4738519
Actionable comments posted: 0
♻️ Duplicate comments (3)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (3)
195-210: Brittle content access pattern needs defensive handling

The code assumes content[0].get("text").get("value") exists without validation. This can raise KeyError or AttributeError when the content structure differs from expectations. This issue was identified in previous reviews and needs to be addressed with defensive handling for non-text content or empty arrays.

313-316: Guard against missing event_handler to prevent KeyError

The code unconditionally wraps kwargs["event_handler"] without checking if the key exists, which will raise a KeyError when no event handler is provided. This issue was identified in previous reviews and needs conditional wrapping only when the event_handler is present.

185-188: Ensure mutual exclusivity between event emission and span attributes for run instructions

The current logic always calls _set_span_attribute(...content, run["instructions"]) and then conditionally emits an event. It should mirror the pattern used elsewhere: emit the event, or set both the role and content attributes, not both.

Locations to update:
- File: packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py
- Around lines 181–188

Suggested diff:

```diff
-        _set_span_attribute(
-            span,
-            f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content",
-            run["instructions"],
-        )
-        if should_emit_events():
-            emit_event(MessageEvent(content=run["instructions"], role="system"))
-        prompt_index += 1
+        if should_emit_events():
+            emit_event(MessageEvent(content=run["instructions"], role="system"))
+        else:
+            _set_span_attribute(
+                span,
+                f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role",
+                "system"
+            )
+            _set_span_attribute(
+                span,
+                f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content",
+                run["instructions"],
+            )
+        prompt_index += 1
```
🧹 Nitpick comments (1)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (1)
34-40: Consider using contextlib.suppress for cleaner exception handling

The static analysis tool correctly identifies that the try-except-pass pattern can be simplified using contextlib.suppress(Exception) for better readability. Apply this diff to simplify the exception handling:

```diff
+from contextlib import suppress
+
     # The mock_openai_client fixture should trigger instrumentation but not make real calls
-    try:
+    with suppress(Exception):
         mock_openai_client.chat.completions.create(
             model="gpt-3.5-turbo",
             messages=[{"role": "user", "content": "Test span context"}],
         )
-    except Exception:
-        pass
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (3 hunks)
- packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/completion_wrappers.py
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/chat_wrappers.py
🧰 Additional context used
🧬 Code Graph Analysis (2)
packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (5)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/config.py (1)
  - Config (6-15)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/shared/__init__.py (1)
  - model_as_dict (342-352)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
  - SpanAttributes (64-257)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/utils.py (1)
  - should_emit_events (174-181)
- packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/event_handler_wrapper.py (1)
  - EventHandleWrapper (13-132)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (3)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
  - EventAttributes (267-286)
- packages/opentelemetry-instrumentation-openai/tests/conftest.py (1)
  - mock_openai_client (47-51)
- packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1)
  - get_finished_spans (40-43)
🪛 Ruff (0.12.2)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py
34-40: Use contextlib.suppress(Exception) instead of try-except-pass
Replace with contextlib.suppress(Exception)
(SIM105)
🔇 Additional comments (4)
packages/opentelemetry-instrumentation-openai/tests/traces/test_span_context_propagation.py (2)
1-62: LGTM! Well-structured test for span context propagation.

The test implementation correctly validates that events contain proper trace and span IDs, which addresses the core issue described in the PR objectives. The use of skips for missing spans or logs provides graceful handling of test environment variations.

13-27: LGTM! Comprehensive event validation logic.

The assert_event_has_span_context function properly validates both trace context propagation and OpenAI event categorization. The error messages provide clear diagnostics for debugging test failures.

packages/opentelemetry-instrumentation-openai/opentelemetry/instrumentation/openai/v1/assistant_wrappers.py (2)

132-139: LGTM! Proper span context management with error handling.

The implementation correctly uses trace.use_span() to set the current context as required for event propagation, and properly handles exceptions by recording them and setting error status before ending the span.

260-327: LGTM! Consistent trace context management in streaming wrapper.

The implementation properly establishes span context for event propagation and handles the EventHandleWrapper integration correctly. The exception handling follows the same pattern as the non-streaming wrapper.
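To illustrate the idea behind that integration, a wrapper around a caller-supplied assistant event handler might look roughly like this; the real EventHandleWrapper covers many more callbacks and details.

```python
# Rough sketch, assuming the wrapped handler exposes assistant-streaming callbacks
# such as on_text_delta/on_end; not the actual EventHandleWrapper implementation.
from opentelemetry import trace


class EventHandlerWrapperSketch:
    def __init__(self, original_handler, span):
        self._original = original_handler
        self._span = span

    def on_text_delta(self, delta, snapshot):
        # Keep the span current so any event emitted here carries its span/trace IDs.
        with trace.use_span(self._span, end_on_exit=False):
            self._original.on_text_delta(delta, snapshot)

    def on_end(self):
        with trace.use_span(self._span, end_on_exit=False):
            self._original.on_end()
        # The streaming run is finished; close the span on the success path.
        self._span.end()
```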
feat(instrumentation): ... or fix(instrumentation): ...

Fixes #3234
Testing:
Important
Fixes span ID propagation in OpenAI instrumentation wrappers by setting the current context with trace.use_span() and ensuring proper span closure and error handling.
- trace.use_span() sets the current context in chat_wrapper() and achat_wrapper() in chat_wrappers.py, completion_wrapper() and acompletion_wrapper() in completion_wrappers.py, and messages_list_wrapper() and runs_create_and_stream_wrapper() in assistant_wrappers.py.
- Proper span closure and error handling in chat_wrappers.py, completion_wrappers.py, and assistant_wrappers.py.
- Streaming handled in _build_from_streaming_response() and _abuild_from_streaming_response() in completion_wrappers.py and chat_wrappers.py.
- EventHandleWrapper used for event handling in runs_create_and_stream_wrapper() in assistant_wrappers.py.

This description was created by
for 814e6f1. You can customize this summary. It will automatically update as commits are pushed.
Summary by CodeRabbit
New Features
Bug Fixes
Tests