-
Notifications
You must be signed in to change notification settings - Fork 868
fix(anthropic): preserve streaming helper methods in instrumentation #3377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Fixes issue where Anthropic streaming helper methods like get_final_message(), text_stream, and until_done() were broken by instrumentation. Replaced build_from_streaming_response functions with AnthropicStream/AnthropicAsyncStream wrapper classes that preserve all original API methods while maintaining proper OpenTelemetry instrumentation. Fixes #3371 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. WalkthroughSwitches Anthropic streaming instrumentation from function-based builders to class-based wrappers (AnthropicStream, AnthropicAsyncStream), passing an explicit start_time. Updates entrypoints to return these proxies. Adds tests and VCR cassettes validating legacy streaming helpers (get_final_message, text_stream, until_done) for both async and sync paths. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App
participant SDK as Anthropic SDK
participant W as AnthropicStream/AnthropicAsyncStream
participant OTEL as Instrumentation (Span/Metrics)
App->>SDK: messages.stream(..., stream=true)
SDK->>OTEL: create span, start_time
SDK->>W: return wrapper(proxy around stream, with start_time)
rect rgba(230,245,255,0.6)
note right of W: Streaming loop
App->>W: iterate/await next()
W->>W: _process_response_item(event)
W->>OTEL: update counters/histograms (tokens/choices)
alt error during streaming
W-->>OTEL: record exception, set span ERROR
W-->>App: raise
end
end
alt App calls get_final_message/text_stream/until_done
App->>W: helper method
W->>W: consume stream to completion
end
W->>OTEL: finalize usage + duration
OTEL-->>OTEL: end span (OK if no error)
W-->>App: final message/text or completed iteration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
✨ Finishing touches
🧪 Generate unit tests
Comment Pre-merge checks❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Caution
Changes requested ❌
Reviewed everything up to 83550fa in 1 minute and 53 seconds. Click for details.
- Reviewed
1063lines of code in6files - Skipped
0files when reviewing. - Skipped posting
8draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:180
- Draft comment:
The getattr override cleanly intercepts helper methods (e.g. get_final_message, text_stream). Consider adding a clarifying comment explaining that this override preserves original API methods for instrumentation purposes. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The code already has clear documentation about the purpose of getattr and the class's role in preserving helper methods while adding instrumentation. The existing docstrings are concise and clear. Adding more documentation would be redundant. Perhaps the existing docstring could be more explicit about which specific methods are being preserved and why instrumentation is needed. The current docstrings already strike a good balance - the class docstring explains the high-level purpose while the method docstring explains the specific mechanism. The preserved methods are clearly listed in the code itself. The comment should be deleted since the code already has appropriate documentation explaining the purpose of getattr and the class's role in preserving helper methods.
2. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:191
- Draft comment:
In _instrumented_get_final_message, the stream is fully consumed (using 'for _ in self: pass') before calling the original helper method. Please document this side-effect so that maintainers understand that get_final_message() will exhaust the iterator. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The side effect of exhausting the iterator is actually an intentional part of the instrumentation wrapper's design, not an accidental side effect. The class is explicitly designed to ensure all stream items go through the instrumentation proxy. The docstring already indicates this is an "instrumented version" and the class name and overall context make it clear this is a wrapper for instrumentation purposes. The comment raises a valid point about making behavior explicit. Even if intentional, documenting side effects is generally good practice. However, in this case, the iterator exhaustion is not really a "side effect" - it's the core purpose of this instrumentation wrapper. The class name, docstrings, and context already make this clear. The comment should be deleted. The iterator exhaustion is an intentional core behavior that's already clear from the context, not a side effect that needs additional documentation.
3. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:219
- Draft comment:
In the next method, error handling ends the span on exceptions. Verify that span.end() is called only once and that error states are handled idempotently. - Reason this comment was not posted:
Comment looked like it was already resolved.
4. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:85
- Draft comment:
Several tests use try/except blocks around calls with 'unknown_parameter'. Add a brief comment to clarify that these exceptions are expected as a part of testing invalid parameter handling. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
5. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:120
- Draft comment:
There is noticeable duplication in assertions and log/spans checks across tests. Consider refactoring common assertions into helper functions to improve maintainability. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
6. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2318
- Draft comment:
In test_with_asyncio_run_legacy, asyncio.run is used inside a test marked with @pytest.mark.asyncio. Consider using the async test pattern consistently (without asyncio.run) to avoid potential event loop conflicts. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
7. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:108
- Draft comment:
Several tests assert hard-coded numeric values for token usage (e.g., prompt tokens == 17). Ensure that these expected values remain valid as the underlying token counting logic evolves. - Reason this comment was not posted:
Comment was not on a location in the diff, so it can't be submitted as a review comment.
8. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:24
- Draft comment:
There is a stray parenthesis "")" at the beginning of the file (line 24) that doesn't appear to serve any purpose. It might be a typographical error and should be reviewed. - Reason this comment was not posted:
Comment was on unchanged code.
Workflow ID: wflow_IMyQNzzFU04WhHrO
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py
Outdated
Show resolved
Hide resolved
…ssages.py Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
- Remove trailing whitespace in test_messages.py - Clean up blank lines containing whitespace in streaming helper method tests 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important
Looks good to me! 👍
Reviewed cf8cfdf in 46 seconds. Click for details.
- Reviewed
68lines of code in1files - Skipped
0files when reviewing. - Skipped posting
3draft comments. View those below. - Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2845
- Draft comment:
The async streaming helper method test properly verifies that get_final_message() returns a valid message and that the helper methods (text_stream, until_done) remain available after instrumentation. This effectively confirms the fix for issue #3371. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
2. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2898
- Draft comment:
Consider removing the debug print statement (printing the number of spans) from the text_stream helper test to keep test output clean. - Reason this comment was not posted:
Confidence changes required:33%<= threshold50%None
3. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2903
- Draft comment:
The sync streaming helper methods test correctly collects events from the stream and asserts non-empty results, ensuring that helper methods are intact post-instrumentation. - Reason this comment was not posted:
Confidence changes required:0%<= threshold50%None
Workflow ID: wflow_ctqhHsiYzx59YGdm
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
♻️ Duplicate comments (1)
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (1)
2898-2898: Remove debug print from test output.Leftover print clutters CI logs.
- print(f"Number of spans created: {len(spans)}")
🧹 Nitpick comments (12)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (6)
44-48: Fix confusing empty-string literal.
""""""works via literal concatenation but is non-idiomatic. Use"".- complete_response["events"][index]["input"] = """""" + complete_response["events"][index]["input"] = ""
109-116: Use isinstance() for numeric checks.
type(x) is intexcludes subclasses and is brittle.- if token_histogram and type(input_tokens) is int and input_tokens >= 0: + if token_histogram and isinstance(input_tokens, int) and input_tokens >= 0: @@ - if token_histogram and type(completion_tokens) is int and completion_tokens >= 0: + if token_histogram and isinstance(completion_tokens, int) and completion_tokens >= 0:Also applies to: 118-125
180-190: Consider preserving additional helpers (get_final_text).You restored
get_final_message,text_stream,until_done. Addget_final_textto cover another commonly used helper.- elif name == 'until_done': + elif name == 'until_done': return self._instrumented_until_done + elif name == 'get_final_text': + return self._instrumented_get_final_textAnd implement:
+ def _instrumented_get_final_text(self): + for _ in self: + pass + return self.__wrapped__.get_final_text()
191-197: Avoid getattr with constant attribute.Direct attribute access is clearer and satisfies linters.
- original_get_final_message = getattr(self.__wrapped__, 'get_final_message') - return original_get_final_message() + return self.__wrapped__.get_final_message()
331-341: Also preserve get_final_text in async wrapper.Parity with sync path.
- elif name == 'until_done': + elif name == 'until_done': return self._instrumented_until_done + elif name == 'get_final_text': + return self._instrumented_get_final_textAnd implement:
+ async def _instrumented_get_final_text(self): + async for _ in self: + pass + return await self.__wrapped__.get_final_text()
342-351: Avoid getattr with constant attribute (async).- original_get_final_message = getattr(self.__wrapped__, 'get_final_message') - return await original_get_final_message() + return await self.__wrapped__.get_final_message()packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (4)
2845-2851: Silence Ruff ARG001 on unused fixtures.Fixtures are intentionally unused for side-effects; add
# noqa: ARG001to the function def line to keep linters happy.-async def test_anthropic_streaming_helper_methods_legacy( +async def test_anthropic_streaming_helper_methods_legacy( # noqa: ARG001 instrument_legacy, async_anthropic_client, span_exporter, log_exporter, reader ):
2876-2882: Silence Ruff ARG001 on unused fixtures.-async def test_anthropic_text_stream_helper_method_legacy( +async def test_anthropic_text_stream_helper_method_legacy( # noqa: ARG001 instrument_legacy, async_anthropic_client, span_exporter ):
2903-2909: Silence Ruff ARG001 on unused fixtures.-def test_anthropic_sync_streaming_helper_methods_legacy( +def test_anthropic_sync_streaming_helper_methods_legacy( # noqa: ARG001 instrument_legacy, anthropic_client, span_exporter ):
2909-2924: Actually exercise a sync helper to validate preservation.The test name implies helper coverage, but it only iterates events. Consider asserting
get_final_message()or usingtext_stream.- # Collect all events - events = [] - for event in stream: - events.append(event) - assert len(events) > 0 + # Validate helper works + text = "".join(stream.text_stream) + assert len(text) > 0packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (2)
560-572: Avoid potential mis-wrap: check for Stream explicitly in sync path.Guard against an unexpected AsyncStream being wrapped by the sync wrapper by narrowing the condition.
Apply:
- if is_streaming_response(response): + if isinstance(response, Stream): return AnthropicStream( span, response, instance._client, start_time, token_histogram, choice_counter, duration_histogram, exception_counter, event_logger, kwargs, )Notes:
- Please confirm all wrapped sync methods here can only return anthropic._streaming.Stream. If any can return AsyncStream, this change is required.
- Minor nit:
end_time = time.time()on Line 558 is now unused for streaming paths; consider removing to satisfy flake8 F841.
682-693: Mirror the precise type check in async path.Use AsyncStream to prevent accidental sync-wrapper usage.
Apply:
- if is_streaming_response(response): + if isinstance(response, AsyncStream): return AnthropicAsyncStream( span, response, instance._client, start_time, token_histogram, choice_counter, duration_histogram, exception_counter, event_logger, kwargs, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py(3 hunks)packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py(4 hunks)packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml(1 hunks)packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml(1 hunks)packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml(1 hunks)packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/cassettes/**/*.{yaml,yml,json}
📄 CodeRabbit inference engine (CLAUDE.md)
Never commit secrets or PII in VCR cassettes; scrub sensitive data
Files:
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yamlpackages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yamlpackages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.pypackages/opentelemetry-instrumentation-anthropic/tests/test_messages.pypackages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py
🧬 Code graph analysis (3)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py (4)
error_metrics_attributes(215-219)shared_metrics_attributes(178-211)set_span_attribute(20-24)count_prompt_tokens_from_request(223-241)packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
_set_token_usage(293-403)
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (3)
packages/opentelemetry-instrumentation-anthropic/tests/conftest.py (3)
instrument_legacy(83-98)async_anthropic_client(78-79)anthropic_client(73-74)packages/opentelemetry-instrumentation-milvus/tests/conftest.py (1)
reader(37-41)packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1)
get_finished_spans(40-43)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (2)
AnthropicAsyncStream(300-451)AnthropicStream(149-297)
🪛 Ruff (0.12.2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
163-163: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
195-195: Do not call getattr with a constant attribute value. It is not any safer than normal property access.
Replace getattr with attribute access
(B009)
288-288: Do not catch blind exception: Exception
(BLE001)
314-314: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
349-349: Do not call getattr with a constant attribute value. It is not any safer than normal property access.
Replace getattr with attribute access
(B009)
442-442: Do not catch blind exception: Exception
(BLE001)
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py
2848-2848: Unused function argument: instrument_legacy
(ARG001)
2848-2848: Unused function argument: log_exporter
(ARG001)
2848-2848: Unused function argument: reader
(ARG001)
2879-2879: Unused function argument: instrument_legacy
(ARG001)
2905-2905: Unused function argument: instrument_legacy
(ARG001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test Packages (3.11)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Build Packages (3.11)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Lint
🔇 Additional comments (2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (1)
481-497: Overall approach LGTM.Switching to ObjectProxy-based wrappers to preserve helper methods while instrumenting is a solid fix for #3371. Tests cover the key helpers.
Also applies to: 529-545
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
20-21: Imports updated to class-based stream wrappers — aligns with goal to preserve helper methods.Using AnthropicStream/AnthropicAsyncStream from streaming.py matches the PR intent and restores get_final_message/text_stream/until_done without sacrificing instrumentation.
| def __init__( | ||
| self, | ||
| span, | ||
| response, | ||
| instance, | ||
| start_time, | ||
| token_histogram: Histogram = None, | ||
| choice_counter: Counter = None, | ||
| duration_histogram: Histogram = None, | ||
| exception_counter: Counter = None, | ||
| event_logger: Optional[EventLogger] = None, | ||
| kwargs: dict = {}, | ||
| ): | ||
| super().__init__(response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Avoid mutable default for kwargs and copy on assignment.
Defaulting kwargs to {} is unsafe; also store a defensive copy.
- def __init__(
+ def __init__(
self,
span,
response,
instance,
start_time,
token_histogram: Histogram = None,
choice_counter: Counter = None,
duration_histogram: Histogram = None,
exception_counter: Counter = None,
event_logger: Optional[EventLogger] = None,
- kwargs: dict = {},
+ kwargs: Optional[dict] = None,
):
@@
- self._kwargs = kwargs
+ self._kwargs = dict(kwargs or {})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def __init__( | |
| self, | |
| span, | |
| response, | |
| instance, | |
| start_time, | |
| token_histogram: Histogram = None, | |
| choice_counter: Counter = None, | |
| duration_histogram: Histogram = None, | |
| exception_counter: Counter = None, | |
| event_logger: Optional[EventLogger] = None, | |
| kwargs: dict = {}, | |
| ): | |
| super().__init__(response) | |
| def __init__( | |
| self, | |
| span, | |
| response, | |
| instance, | |
| start_time, | |
| token_histogram: Histogram = None, | |
| choice_counter: Counter = None, | |
| duration_histogram: Histogram = None, | |
| exception_counter: Counter = None, | |
| event_logger: Optional[EventLogger] = None, | |
| kwargs: Optional[dict] = None, | |
| ): | |
| super().__init__(response) | |
| self._kwargs = dict(kwargs or {}) |
🧰 Tools
🪛 Ruff (0.12.2)
163-163: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 152 to 165, the __init__ uses a mutable default kwargs = {} and
assigns it directly; change the signature to default kwargs to None and inside
the constructor set self.kwargs = dict(kwargs) if kwargs is not None else {}, so
you avoid shared mutable state and store a defensive copy of the passed mapping.
| # Handle errors during streaming | ||
| if not self._instrumentation_completed: | ||
| attributes = error_metrics_attributes(e) | ||
| if self._exception_counter: | ||
| self._exception_counter.add(1, attributes=attributes) | ||
| if self._span and self._span.is_recording(): | ||
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| self._span.end() | ||
| self._instrumentation_completed = True | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Guard span end on error.
If self._span is None, calling end() will raise.
- self._span.end()
+ if self._span:
+ self._span.end()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # Handle errors during streaming | |
| if not self._instrumentation_completed: | |
| attributes = error_metrics_attributes(e) | |
| if self._exception_counter: | |
| self._exception_counter.add(1, attributes=attributes) | |
| if self._span and self._span.is_recording(): | |
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | |
| self._span.end() | |
| self._instrumentation_completed = True | |
| raise | |
| # Handle errors during streaming | |
| if not self._instrumentation_completed: | |
| attributes = error_metrics_attributes(e) | |
| if self._exception_counter: | |
| self._exception_counter.add(1, attributes=attributes) | |
| if self._span and self._span.is_recording(): | |
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | |
| if self._span: | |
| self._span.end() | |
| self._instrumentation_completed = True | |
| raise |
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 227 to 236, the code calls self._span.end() without ensuring
self._span is not None which can raise; add a guard so you only call set_status
and end if self._span exists (and optionally is_recording for set_status) — e.g.
check if self._span before calling self._span.set_status(...) and
self._span.end() to avoid AttributeError, then proceed to set
self._instrumentation_completed = True and re-raise.
| if self._duration_histogram: | ||
| duration = time.time() - self._start_time | ||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes=metric_attributes, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle missing start_time when recording duration.
Avoid TypeError if start_time is None.
- if self._duration_histogram:
- duration = time.time() - self._start_time
+ if self._duration_histogram and self._start_time is not None:
+ duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if self._duration_histogram: | |
| duration = time.time() - self._start_time | |
| self._duration_histogram.record( | |
| duration, | |
| attributes=metric_attributes, | |
| ) | |
| if self._duration_histogram and self._start_time is not None: | |
| duration = time.time() - self._start_time | |
| self._duration_histogram.record( | |
| duration, | |
| attributes=metric_attributes, | |
| ) |
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 251 to 256, the code assumes self._start_time exists and will raise
a TypeError if it's None; guard the duration recording by checking that
self._start_time is not None before computing time.time() - self._start_time and
calling self._duration_histogram.record, and if start_time is missing simply
skip recording (or set a sensible default/start_time earlier) to avoid the
TypeError.
|
|
||
| metric_attributes = shared_metrics_attributes(complete_response) | ||
| class AnthropicAsyncStream(ObjectProxy): | ||
| """Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods""" | ||
|
|
||
| if duration_histogram: | ||
| duration = time.time() - start_time | ||
| duration_histogram.record( | ||
| duration, | ||
| attributes=metric_attributes, | ||
| ) | ||
| def __init__( | ||
| self, | ||
| span, | ||
| response, | ||
| instance, | ||
| start_time, | ||
| token_histogram: Histogram = None, | ||
| choice_counter: Counter = None, | ||
| duration_histogram: Histogram = None, | ||
| exception_counter: Counter = None, | ||
| event_logger: Optional[EventLogger] = None, | ||
| kwargs: dict = {}, | ||
| ): | ||
| super().__init__(response) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Mirror kwargs default/copy fix in async wrapper.
Apply the same mutable-default and copy fix to AnthropicAsyncStream.
- kwargs: dict = {},
+ kwargs: Optional[dict] = None,
@@
- self._kwargs = kwargs
+ self._kwargs = dict(kwargs or {})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| metric_attributes = shared_metrics_attributes(complete_response) | |
| class AnthropicAsyncStream(ObjectProxy): | |
| """Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods""" | |
| if duration_histogram: | |
| duration = time.time() - start_time | |
| duration_histogram.record( | |
| duration, | |
| attributes=metric_attributes, | |
| ) | |
| def __init__( | |
| self, | |
| span, | |
| response, | |
| instance, | |
| start_time, | |
| token_histogram: Histogram = None, | |
| choice_counter: Counter = None, | |
| duration_histogram: Histogram = None, | |
| exception_counter: Counter = None, | |
| event_logger: Optional[EventLogger] = None, | |
| kwargs: dict = {}, | |
| ): | |
| super().__init__(response) | |
| class AnthropicAsyncStream(ObjectProxy): | |
| """Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods""" | |
| def __init__( | |
| self, | |
| span, | |
| response, | |
| instance, | |
| start_time, | |
| token_histogram: Histogram = None, | |
| choice_counter: Counter = None, | |
| duration_histogram: Histogram = None, | |
| exception_counter: Counter = None, | |
| event_logger: Optional[EventLogger] = None, | |
| kwargs: Optional[dict] = None, | |
| ): | |
| super().__init__(response) | |
| self._kwargs = dict(kwargs or {}) |
🧰 Tools
🪛 Ruff (0.12.2)
314-314: Do not use mutable data structures for argument defaults
Replace with None; initialize within function
(B006)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 299 to 316, the AnthropicAsyncStream constructor uses a mutable
default for kwargs (kwargs: dict = {}), which can lead to shared state across
instances; change the default to None (kwargs: dict | None = None) and inside
__init__ set self._kwargs = dict(kwargs) if kwargs is not None else {} (or
kwargs.copy()) to ensure each instance gets its own copy and avoid mutating a
shared dict.
| except Exception as e: | ||
| # Handle errors during streaming | ||
| if not self._instrumentation_completed: | ||
| attributes = error_metrics_attributes(e) | ||
| if self._exception_counter: | ||
| self._exception_counter.add(1, attributes=attributes) | ||
| if self._span and self._span.is_recording(): | ||
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | ||
| self._span.end() | ||
| self._instrumentation_completed = True | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Guard span end on error (async).
- self._span.end()
+ if self._span:
+ self._span.end()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except Exception as e: | |
| # Handle errors during streaming | |
| if not self._instrumentation_completed: | |
| attributes = error_metrics_attributes(e) | |
| if self._exception_counter: | |
| self._exception_counter.add(1, attributes=attributes) | |
| if self._span and self._span.is_recording(): | |
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | |
| self._span.end() | |
| self._instrumentation_completed = True | |
| raise | |
| except Exception as e: | |
| # Handle errors during streaming | |
| if not self._instrumentation_completed: | |
| attributes = error_metrics_attributes(e) | |
| if self._exception_counter: | |
| self._exception_counter.add(1, attributes=attributes) | |
| if self._span and self._span.is_recording(): | |
| self._span.set_status(Status(StatusCode.ERROR, str(e))) | |
| if self._span: | |
| self._span.end() | |
| self._instrumentation_completed = True | |
| raise |
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 380-390, the exception handler calls self._span.end() without
guarding that self._span exists or whether it is already ended (race in async
code). Change the logic to first check self._span is not None, then if
self._span.is_recording() set the error status, and only then call
self._span.end(); also ensure you respect self._instrumentation_completed to
avoid double-ending (set it before or immediately after ending as appropriate).
| if self._duration_histogram: | ||
| duration = time.time() - self._start_time | ||
| self._duration_histogram.record( | ||
| duration, | ||
| attributes=metric_attributes, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle missing start_time when recording duration (async).
- if self._duration_histogram:
- duration = time.time() - self._start_time
+ if self._duration_histogram and self._start_time is not None:
+ duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if self._duration_histogram: | |
| duration = time.time() - self._start_time | |
| self._duration_histogram.record( | |
| duration, | |
| attributes=metric_attributes, | |
| ) | |
| if self._duration_histogram and self._start_time is not None: | |
| duration = time.time() - self._start_time | |
| self._duration_histogram.record( | |
| duration, | |
| attributes=metric_attributes, | |
| ) |
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 405 to 410, the code records a duration using self._start_time but
does not guard against self._start_time being missing or None (possible in async
flows); update the block to check that hasattr(self, "_start_time") and
self._start_time is not None before computing duration and calling
self._duration_histogram.record, and if start_time is missing simply skip
recording (or record a sentinel if desired) to avoid raising an exception;
optionally add a short debug log when skipping so the behavior is visible in
traces.
| - 617d109c-a187-4902-889d-689223d134aa | ||
| anthropic-ratelimit-input-tokens-limit: | ||
| - '400000' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Redact organization and request IDs in cassette.
Same concern as other cassettes; please mask anthropic-organization-id and request-id.
- anthropic-organization-id:
- - 617d109c-a187-4902-889d-689223d134aa
+ anthropic-organization-id:
+ - "<REDACTED>"
@@
- request-id:
- - req_011CT6JRYAtJjioFin4pYtvp
+ request-id:
+ - "<REDACTED>"Also applies to: 142-143
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml
around lines 114-116 (and also apply same change at 142-143), redact the real
anthropic-organization-id and request-id values by replacing them with a
placeholder (e.g. "REDACTED_ANON_ORG_ID" and "REDACTED_REQUEST_ID") so sensitive
IDs are not stored in the cassette; update both occurrences consistently and
ensure formatting stays valid YAML.
| anthropic-organization-id: | ||
| - 617d109c-a187-4902-889d-689223d134aa | ||
| anthropic-ratelimit-input-tokens-limit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Scrub org/request identifiers from cassette headers.
Redact anthropic-organization-id and request-id to avoid leaking potentially sensitive metadata in VCR fixtures.
Apply:
- anthropic-organization-id:
- - 617d109c-a187-4902-889d-689223d134aa
+ anthropic-organization-id:
+ - "<REDACTED>"
@@
- request-id:
- - req_011CT6JRcKB2ZPLaPAXKZ4Wa
+ request-id:
+ - "<REDACTED>"Optionally enforce via VCR config (outside this file):
# tests/conftest.py
import vcr
vcr = vcr.VCR(
filter_headers=["authorization", "x-api-key", "anthropic-organization-id", "request-id"],
)Also applies to: 130-132
🤖 Prompt for AI Agents
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml
lines ~102-104 and ~130-132: scrub the cassette headers by replacing the actual
anthropic-organization-id and request-id values with a redaction placeholder
(e.g. "REDACTED" or "<REDACTED>") so sensitive identifiers are not stored in the
fixture; update both header occurrences (lines indicated) accordingly and ensure
the format matches existing cassette header structure; optionally add guidance
to tests/conftest.py to filter these headers via VCR config as shown in the
review comment.
| - 617d109c-a187-4902-889d-689223d134aa | ||
| anthropic-ratelimit-input-tokens-limit: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Redact organization and request IDs.
Please mask anthropic-organization-id and request-id.
- anthropic-organization-id:
- - 617d109c-a187-4902-889d-689223d134aa
+ anthropic-organization-id:
+ - "<REDACTED>"
@@
- request-id:
- - req_011CT6JsAYbhj2squ2Q8fn85
+ request-id:
+ - "<REDACTED>"Also applies to: 130-131
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml
around lines 103-104 (and also apply same change at lines 130-131), the cassette
currently contains plaintext sensitive IDs (anthropic-organization-id and
request-id); redact them by replacing the actual ID values with masked
placeholders (e.g., "<REDACTED_ORG_ID>" and "<REDACTED_REQUEST_ID>") or a fixed
token so tests remain deterministic, ensuring formatting and YAML structure are
preserved.
Fixes #3371
Summary
get_final_message(),text_stream, anduntil_done()were broken by instrumentationbuild_from_streaming_response()andabuild_from_streaming_response()functions withAnthropicStreamandAnthropicAsyncStreamwrapper classesTest plan
test_anthropic_streaming_helper_methods_legacy,test_anthropic_text_stream_helper_method_legacy,test_anthropic_sync_streaming_helper_methods_legacy)🤖 Generated with Claude Code
Important
Fixes Anthropic streaming helper methods by introducing
AnthropicStreamandAnthropicAsyncStreamto preserve functionality with OpenTelemetry instrumentation.build_from_streaming_response()andabuild_from_streaming_response()withAnthropicStreamandAnthropicAsyncStreaminstreaming.pyto preserve helper methods likeget_final_message(),text_stream, anduntil_done().test_messages.pyto verify functionality with instrumentation.test_anthropic_streaming_helper_methods_legacy,test_anthropic_text_stream_helper_method_legacy, andtest_anthropic_sync_streaming_helper_methods_legacy.__init__.pyto use new wrapper classes for streaming responses.This description was created by
for cf8cfdf. You can customize this summary. It will automatically update as commits are pushed.
Summary by CodeRabbit
New Features
Improvements
Tests