@nirga
Member

@nirga nirga commented Sep 13, 2025

Fixes #3371

Summary

  • Fixed issue where Anthropic streaming helper methods like get_final_message(), text_stream, and until_done() were broken by instrumentation
  • Replaced build_from_streaming_response() and abuild_from_streaming_response() functions with AnthropicStream and AnthropicAsyncStream wrapper classes
  • New wrapper classes preserve all original API methods while maintaining proper OpenTelemetry instrumentation

Test plan

  • Added comprehensive tests for all helper methods (test_anthropic_streaming_helper_methods_legacy, test_anthropic_text_stream_helper_method_legacy, test_anthropic_sync_streaming_helper_methods_legacy)
  • All existing streaming tests pass (11/11 streaming tests)
  • All existing message tests pass (40/40 message tests)
  • Flake8 linting passes with no violations
  • No regressions in instrumentation functionality

🤖 Generated with Claude Code


Important

Fixes Anthropic streaming helper methods by introducing AnthropicStream and AnthropicAsyncStream to preserve functionality with OpenTelemetry instrumentation.

  • Behavior:
    • Replaces build_from_streaming_response() and abuild_from_streaming_response() with AnthropicStream and AnthropicAsyncStream in streaming.py to preserve helper methods like get_final_message(), text_stream, and until_done().
    • Ensures proper OpenTelemetry instrumentation for streaming responses.
  • Tests:
    • Adds tests for helper methods in test_messages.py to verify functionality with instrumentation.
    • Includes test_anthropic_streaming_helper_methods_legacy, test_anthropic_text_stream_helper_method_legacy, and test_anthropic_sync_streaming_helper_methods_legacy.
  • Misc:
    • Updates __init__.py to use new wrapper classes for streaming responses.

This description was created by Ellipsis for cf8cfdf. You can customize this summary. It will automatically update as commits are pushed.

Summary by CodeRabbit

  • New Features

    • Stream objects now provide helper methods: get_final_message, text_stream, and until_done.
    • Consistent behavior across synchronous and asynchronous streaming.
  • Improvements

    • More accurate timing and token usage metrics for streaming interactions.
    • More robust error handling and finalization of streaming sessions.
    • Non‑streaming behavior remains unchanged.
  • Tests

    • Added comprehensive async and sync streaming test coverage, including text streaming and final message helpers, backed by new cassettes.

Fixes issue where Anthropic streaming helper methods like get_final_message(),
text_stream, and until_done() were broken by instrumentation. Replaced
build_from_streaming_response functions with AnthropicStream/AnthropicAsyncStream
wrapper classes that preserve all original API methods while maintaining
proper OpenTelemetry instrumentation.

Fixes #3371

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
@coderabbitai
coderabbitai bot commented Sep 13, 2025

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Switches Anthropic streaming instrumentation from function-based builders to class-based wrappers (AnthropicStream, AnthropicAsyncStream), passing an explicit start_time. Updates entrypoints to return these proxies. Adds tests and VCR cassettes validating legacy streaming helpers (get_final_message, text_stream, until_done) for both async and sync paths.
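
The wrapper idea described in the walkthrough can be sketched in a few lines. This is only an illustration under stated assumptions: the PR is described as using ObjectProxy-based classes, whereas the sketch below uses plain `__getattr__` delegation so that it needs no third-party package. `FakeStream`, `InstrumentedStream`, and the `on_finish` callback are hypothetical names, not part of the Anthropic SDK or of this PR.

```python
class FakeStream:
    """Hypothetical stand-in for an SDK stream; not the real Anthropic class."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._pos = 0
        self._seen = []

    def __iter__(self):
        return self

    def __next__(self):
        if self._pos >= len(self._chunks):
            raise StopIteration
        chunk = self._chunks[self._pos]
        self._pos += 1
        self._seen.append(chunk)
        return chunk

    def get_final_message(self):
        # Drain whatever is left, then return the accumulated text.
        for _ in self:
            pass
        return "".join(self._seen)


class InstrumentedStream:
    """Delegating wrapper: observes iteration, forwards everything else."""

    def __init__(self, wrapped, on_finish):
        self._wrapped = wrapped
        self._on_finish = on_finish  # hypothetical hook, e.g. "end the span"
        self._event_count = 0
        self._completed = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = next(self._wrapped)
        except StopIteration:
            self._finish()
            raise
        self._event_count += 1
        return item

    def _finish(self):
        # Finalize at most once, however many times the stream is drained.
        if not self._completed:
            self._completed = True
            self._on_finish(self._event_count)

    def get_final_message(self):
        # Consume through the wrapper first so every event is observed.
        for _ in self:
            pass
        return self._wrapped.get_final_message()

    def __getattr__(self, name):
        # Called only when normal lookup fails: forward to the wrapped stream.
        return getattr(self._wrapped, name)


finished = []
stream = InstrumentedStream(FakeStream(["Hel", "lo"]), finished.append)
message = stream.get_final_message()
```

Because the helper drains the stream through the wrapper before delegating, the finalization hook fires exactly once, and attributes the wrapper does not define still resolve on the underlying stream.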

Changes

| Cohort / File(s) | Summary |
|---|---|
| Anthropic instrumentation entrypoint: packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py | Replace streaming builder functions with class wrappers AnthropicStream/AnthropicAsyncStream; pass start_time; return wrappers in sync/async streaming paths. |
| Streaming wrappers implementation: packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py | Add ObjectProxy-based classes AnthropicStream and AnthropicAsyncStream; remove build_from_streaming_response/abuild_from_streaming_response; intercept iteration, finalize metrics/span on completion/errors; expose helper methods (get_final_message, text_stream, until_done); update stream manager enter methods to return wrappers. |
| VCR cassettes for legacy streaming helpers: packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml, .../test_anthropic_sync_streaming_helper_methods_legacy.yaml, .../test_anthropic_text_stream_helper_method_legacy.yaml | Add three SSE fixtures covering legacy helper flows for async, sync, and text_stream streaming. |
| Tests for legacy streaming helpers: packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py | Add async/sync tests validating get_final_message, text_stream, and stream consumption; assert one finished span named anthropic.chat. |

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor App
  participant SDK as Anthropic SDK
  participant W as AnthropicStream/AnthropicAsyncStream
  participant OTEL as Instrumentation (Span/Metrics)

  App->>SDK: messages.stream(..., stream=true)
  SDK->>OTEL: create span, start_time
  SDK->>W: return wrapper(proxy around stream, with start_time)

  rect rgba(230,245,255,0.6)
    note right of W: Streaming loop
    App->>W: iterate/await next()
    W->>W: _process_response_item(event)
    W->>OTEL: update counters/histograms (tokens/choices)
    alt error during streaming
      W-->>OTEL: record exception, set span ERROR
      W-->>App: raise
    end
  end

  alt App calls get_final_message/text_stream/until_done
    App->>W: helper method
    W->>W: consume stream to completion
  end

  W->>OTEL: finalize usage + duration
  OTEL-->>OTEL: end span (OK if no error)
  W-->>App: final message/text or completed iteration

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • doronkopit5

Poem

A bunny taps the tracing drum, hop-hop—
Streams now wear classy coats that never stop.
Text trickles, tokens tally, spans say “OK!”
Helpers return, whiskers turn—hip-hip hooray!
From start_time’s tick to final drop,
We nibble events till the carrots pop. 🥕

Pre-merge checks

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 55.56%, which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title Check | ✅ Passed | The title "fix(anthropic): preserve streaming helper methods in instrumentation" is concise and accurately summarizes the primary change: restoring streaming helper methods in the Anthropic instrumentation. It names the affected component and the intended fix and aligns with the PR description and changed files. |
| Linked Issues Check | ✅ Passed | The PR implements AnthropicStream/AnthropicAsyncStream wrapper classes, updates __init__.py to return those wrappers, and adds tests that exercise get_final_message, text_stream, and until_done, which directly addresses the primary coding complaint in linked issue [#3371] about missing get_final_message under instrumentation. The provided summaries show the wrappers preserve helper methods and the tests cover both async and sync streaming flows, so the coding objectives from the issue are met. |
| Out of Scope Changes Check | ✅ Passed | All changed files in the summary are focused on Anthropic streaming instrumentation and accompanying test cassettes; the replacement of builder functions with wrapper classes and the added tests are directly related to restoring helper methods, and I see no unrelated or out-of-scope modifications in the provided diffs. Public API changes are intentional for the fix rather than extraneous edits. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

Contributor

@ellipsis-dev ellipsis-dev bot left a comment

Caution

Changes requested ❌

Reviewed everything up to 83550fa in 1 minute and 53 seconds. Click for details.
  • Reviewed 1063 lines of code in 6 files
  • Skipped 0 files when reviewing.
  • Skipped posting 8 draft comments. View those below.
1. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:180
  • Draft comment:
    The __getattr__ override cleanly intercepts helper methods (e.g. get_final_message, text_stream). Consider adding a clarifying comment explaining that this override preserves original API methods for instrumentation purposes.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The code already has clear documentation about the purpose of __getattr__ and the class's role in preserving helper methods while adding instrumentation. The existing docstrings are concise and clear. Adding more documentation would be redundant. Perhaps the existing docstring could be more explicit about which specific methods are being preserved and why instrumentation is needed. The current docstrings already strike a good balance - the class docstring explains the high-level purpose while the method docstring explains the specific mechanism. The preserved methods are clearly listed in the code itself. The comment should be deleted since the code already has appropriate documentation explaining the purpose of __getattr__ and the class's role in preserving helper methods.
2. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:191
  • Draft comment:
    In _instrumented_get_final_message, the stream is fully consumed (using 'for _ in self: pass') before calling the original helper method. Please document this side-effect so that maintainers understand that get_final_message() will exhaust the iterator.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50% The side effect of exhausting the iterator is actually an intentional part of the instrumentation wrapper's design, not an accidental side effect. The class is explicitly designed to ensure all stream items go through the instrumentation proxy. The docstring already indicates this is an "instrumented version" and the class name and overall context make it clear this is a wrapper for instrumentation purposes. The comment raises a valid point about making behavior explicit. Even if intentional, documenting side effects is generally good practice. However, in this case, the iterator exhaustion is not really a "side effect" - it's the core purpose of this instrumentation wrapper. The class name, docstrings, and context already make this clear. The comment should be deleted. The iterator exhaustion is an intentional core behavior that's already clear from the context, not a side effect that needs additional documentation.
3. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:219
  • Draft comment:
    In the __next__ method, error handling ends the span on exceptions. Verify that span.end() is called only once and that error states are handled idempotently.
  • Reason this comment was not posted:
    Comment looked like it was already resolved.
4. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:85
  • Draft comment:
    Several tests use try/except blocks around calls with 'unknown_parameter'. Add a brief comment to clarify that these exceptions are expected as a part of testing invalid parameter handling.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
5. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:120
  • Draft comment:
    There is noticeable duplication in assertions and log/spans checks across tests. Consider refactoring common assertions into helper functions to improve maintainability.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
6. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2318
  • Draft comment:
    In test_with_asyncio_run_legacy, asyncio.run is used inside a test marked with @pytest.mark.asyncio. Consider using the async test pattern consistently (without asyncio.run) to avoid potential event loop conflicts.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
7. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:108
  • Draft comment:
    Several tests assert hard-coded numeric values for token usage (e.g., prompt tokens == 17). Ensure that these expected values remain valid as the underlying token counting logic evolves.
  • Reason this comment was not posted:
    Comment was not on a location in the diff, so it can't be submitted as a review comment.
8. packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py:24
  • Draft comment:
    There is a stray parenthesis "")" at the beginning of the file (line 24) that doesn't appear to serve any purpose. It might be a typographical error and should be reviewed.
  • Reason this comment was not posted:
    Comment was on unchanged code.

Workflow ID: wflow_IMyQNzzFU04WhHrO

nirga and others added 2 commits September 13, 2025 14:46
…ssages.py

Co-authored-by: ellipsis-dev[bot] <65095814+ellipsis-dev[bot]@users.noreply.github.com>
- Remove trailing whitespace in test_messages.py
- Clean up blank lines containing whitespace in streaming helper method tests

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@ellipsis-dev ellipsis-dev bot left a comment

Important

Looks good to me! 👍

Reviewed cf8cfdf in 46 seconds. Click for details.
  • Reviewed 68 lines of code in 1 files
  • Skipped 0 files when reviewing.
  • Skipped posting 3 draft comments. View those below.
1. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2845
  • Draft comment:
    The async streaming helper method test properly verifies that get_final_message() returns a valid message and that the helper methods (text_stream, until_done) remain available after instrumentation. This effectively confirms the fix for issue #3371.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50% None
2. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2898
  • Draft comment:
    Consider removing the debug print statement (printing the number of spans) from the text_stream helper test to keep test output clean.
  • Reason this comment was not posted:
    Confidence changes required: 33% <= threshold 50% None
3. packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py:2903
  • Draft comment:
    The sync streaming helper methods test correctly collects events from the stream and asserts non-empty results, ensuring that helper methods are intact post-instrumentation.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50% None

Workflow ID: wflow_ctqhHsiYzx59YGdm

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 9

♻️ Duplicate comments (1)
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (1)

2898-2898: Remove debug print from test output.

Leftover print clutters CI logs.

-    print(f"Number of spans created: {len(spans)}")
🧹 Nitpick comments (12)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (6)

44-48: Fix confusing empty-string literal.

"""""" is an empty triple-quoted string literal: valid, but non-idiomatic and easy to misread. Use "".

-                complete_response["events"][index]["input"] = """"""
+                complete_response["events"][index]["input"] = ""
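
As a quick check of what the six-quote literal actually is, the sketch below tokenizes a one-line assignment with the standard `tokenize` module. The variable names are illustrative only.

```python
import io
import tokenize

empty_triple = """"""  # opening triple quote immediately followed by its close

# Tokenize an assignment that uses the same literal and collect STRING tokens.
source = 'x = """"""\n'
string_tokens = [
    tok.string
    for tok in tokenize.generate_tokens(io.StringIO(source).readline)
    if tok.type == tokenize.STRING
]
```

The tokenizer reports a single STRING token, so the value is the same as a plain `""`; the suggested change is purely about readability.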

109-116: Use isinstance() for numeric checks.

type(x) is int excludes subclasses and is brittle.

-    if token_histogram and type(input_tokens) is int and input_tokens >= 0:
+    if token_histogram and isinstance(input_tokens, int) and input_tokens >= 0:
@@
-    if token_histogram and type(completion_tokens) is int and completion_tokens >= 0:
+    if token_histogram and isinstance(completion_tokens, int) and completion_tokens >= 0:

Also applies to: 118-125
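
A small sketch of the difference between the two checks, assuming a hypothetical `TokenCount` subclass that is not part of the codebase. It also shows one side effect worth weighing before switching: `bool` is a subclass of `int`, so `isinstance` accepts `True` and `False`.

```python
class TokenCount(int):
    """Hypothetical int subclass, used only for this illustration."""


def is_valid_strict(value):
    # Exact-type check: rejects every subclass of int, including bool.
    return type(value) is int and value >= 0


def is_valid_isinstance(value):
    # Subclass-aware check: accepts int subclasses, including bool.
    return isinstance(value, int) and value >= 0


subclass_strict = is_valid_strict(TokenCount(5))
subclass_isinstance = is_valid_isinstance(TokenCount(5))
bool_strict = is_valid_strict(True)
bool_isinstance = is_valid_isinstance(True)
```

If a boolean should never be recorded as a token count, an explicit `not isinstance(value, bool)` guard could be added alongside the `isinstance` check.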


180-190: Consider preserving additional helpers (get_final_text).

You restored get_final_message, text_stream, until_done. Add get_final_text to cover another commonly used helper.

-        elif name == 'until_done':
+        elif name == 'until_done':
             return self._instrumented_until_done
+        elif name == 'get_final_text':
+            return self._instrumented_get_final_text

And implement:

+    def _instrumented_get_final_text(self):
+        for _ in self:
+            pass
+        return self.__wrapped__.get_final_text()

191-197: Avoid getattr with constant attribute.

Direct attribute access is clearer and satisfies linters.

-        original_get_final_message = getattr(self.__wrapped__, 'get_final_message')
-        return original_get_final_message()
+        return self.__wrapped__.get_final_message()

331-341: Also preserve get_final_text in async wrapper.

Parity with sync path.

-        elif name == 'until_done':
+        elif name == 'until_done':
             return self._instrumented_until_done
+        elif name == 'get_final_text':
+            return self._instrumented_get_final_text

And implement:

+    async def _instrumented_get_final_text(self):
+        async for _ in self:
+            pass
+        return await self.__wrapped__.get_final_text()

342-351: Avoid getattr with constant attribute (async).

-        original_get_final_message = getattr(self.__wrapped__, 'get_final_message')
-        return await original_get_final_message()
+        return await self.__wrapped__.get_final_message()
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (4)

2845-2851: Silence Ruff ARG001 on unused fixtures.

Fixtures are intentionally unused for side-effects; add # noqa: ARG001 to the function def line to keep linters happy.

-async def test_anthropic_streaming_helper_methods_legacy(
+async def test_anthropic_streaming_helper_methods_legacy(  # noqa: ARG001
     instrument_legacy, async_anthropic_client, span_exporter, log_exporter, reader
 ):

2876-2882: Silence Ruff ARG001 on unused fixtures.

-async def test_anthropic_text_stream_helper_method_legacy(
+async def test_anthropic_text_stream_helper_method_legacy(  # noqa: ARG001
     instrument_legacy, async_anthropic_client, span_exporter
 ):

2903-2909: Silence Ruff ARG001 on unused fixtures.

-def test_anthropic_sync_streaming_helper_methods_legacy(
+def test_anthropic_sync_streaming_helper_methods_legacy(  # noqa: ARG001
     instrument_legacy, anthropic_client, span_exporter
 ):

2909-2924: Actually exercise a sync helper to validate preservation.

The test name implies helper coverage, but it only iterates events. Consider asserting get_final_message() or using text_stream.

-        # Collect all events
-        events = []
-        for event in stream:
-            events.append(event)
-        assert len(events) > 0
+        # Validate helper works
+        text = "".join(stream.text_stream)
+        assert len(text) > 0
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (2)

560-572: Avoid potential mis-wrap: check for Stream explicitly in sync path.

Guard against an unexpected AsyncStream being wrapped by the sync wrapper by narrowing the condition.

Apply:

-    if is_streaming_response(response):
+    if isinstance(response, Stream):
         return AnthropicStream(
             span,
             response,
             instance._client,
             start_time,
             token_histogram,
             choice_counter,
             duration_histogram,
             exception_counter,
             event_logger,
             kwargs,
         )

Notes:

  • Please confirm all wrapped sync methods here can only return anthropic._streaming.Stream. If any can return AsyncStream, this change is required.
  • Minor nit: end_time = time.time() on Line 558 is now unused for streaming paths; consider removing to satisfy flake8 F841.

682-693: Mirror the precise type check in async path.

Use AsyncStream to prevent accidental sync-wrapper usage.

Apply:

-    if is_streaming_response(response):
+    if isinstance(response, AsyncStream):
         return AnthropicAsyncStream(
             span,
             response,
             instance._client,
             start_time,
             token_histogram,
             choice_counter,
             duration_histogram,
             exception_counter,
             event_logger,
             kwargs,
         )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a189d2d and cf8cfdf.

📒 Files selected for processing (6)
  • packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (3 hunks)
  • packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (4 hunks)
  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/cassettes/**/*.{yaml,yml,json}

📄 CodeRabbit inference engine (CLAUDE.md)

Never commit secrets or PII in VCR cassettes; scrub sensitive data

Files:

  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml
  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml
  • packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules

Files:

  • packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
  • packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py
  • packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py
🧬 Code graph analysis (3)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/utils.py (4)
  • error_metrics_attributes (215-219)
  • shared_metrics_attributes (178-211)
  • set_span_attribute (20-24)
  • count_prompt_tokens_from_request (223-241)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
  • _set_token_usage (293-403)
packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py (3)
packages/opentelemetry-instrumentation-anthropic/tests/conftest.py (3)
  • instrument_legacy (83-98)
  • async_anthropic_client (78-79)
  • anthropic_client (73-74)
packages/opentelemetry-instrumentation-milvus/tests/conftest.py (1)
  • reader (37-41)
packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1)
  • get_finished_spans (40-43)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (2)
  • AnthropicAsyncStream (300-451)
  • AnthropicStream (149-297)
🪛 Ruff (0.12.2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py

163-163: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


195-195: Do not call getattr with a constant attribute value. It is not any safer than normal property access.

Replace getattr with attribute access

(B009)


288-288: Do not catch blind exception: Exception

(BLE001)


314-314: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)


349-349: Do not call getattr with a constant attribute value. It is not any safer than normal property access.

Replace getattr with attribute access

(B009)


442-442: Do not catch blind exception: Exception

(BLE001)

packages/opentelemetry-instrumentation-anthropic/tests/test_messages.py

2848-2848: Unused function argument: instrument_legacy

(ARG001)


2848-2848: Unused function argument: log_exporter

(ARG001)


2848-2848: Unused function argument: reader

(ARG001)


2879-2879: Unused function argument: instrument_legacy

(ARG001)


2905-2905: Unused function argument: instrument_legacy

(ARG001)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Test Packages (3.11)
  • GitHub Check: Test Packages (3.12)
  • GitHub Check: Build Packages (3.11)
  • GitHub Check: Test Packages (3.10)
  • GitHub Check: Lint
🔇 Additional comments (2)
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py (1)

481-497: Overall approach LGTM.

Switching to ObjectProxy-based wrappers to preserve helper methods while instrumenting is a solid fix for #3371. Tests cover the key helpers.

Also applies to: 529-545

packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py (1)

20-21: Imports updated to class-based stream wrappers — aligns with goal to preserve helper methods.

Using AnthropicStream/AnthropicAsyncStream from streaming.py matches the PR intent and restores get_final_message/text_stream/until_done without sacrificing instrumentation.

Comment on lines +152 to +165
    def __init__(
        self,
        span,
        response,
        instance,
        start_time,
        token_histogram: Histogram = None,
        choice_counter: Counter = None,
        duration_histogram: Histogram = None,
        exception_counter: Counter = None,
        event_logger: Optional[EventLogger] = None,
        kwargs: dict = {},
    ):
        super().__init__(response)

🛠️ Refactor suggestion

Avoid mutable default for kwargs and copy on assignment.

Defaulting kwargs to {} is unsafe; also store a defensive copy.

-    def __init__(
+    def __init__(
         self,
         span,
         response,
         instance,
         start_time,
         token_histogram: Histogram = None,
         choice_counter: Counter = None,
         duration_histogram: Histogram = None,
         exception_counter: Counter = None,
         event_logger: Optional[EventLogger] = None,
-        kwargs: dict = {},
+        kwargs: Optional[dict] = None,
     ):
@@
-        self._kwargs = kwargs
+        self._kwargs = dict(kwargs or {})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    def __init__(
-        self,
-        span,
-        response,
-        instance,
-        start_time,
-        token_histogram: Histogram = None,
-        choice_counter: Counter = None,
-        duration_histogram: Histogram = None,
-        exception_counter: Counter = None,
-        event_logger: Optional[EventLogger] = None,
-        kwargs: dict = {},
-    ):
-        super().__init__(response)
+    def __init__(
+        self,
+        span,
+        response,
+        instance,
+        start_time,
+        token_histogram: Histogram = None,
+        choice_counter: Counter = None,
+        duration_histogram: Histogram = None,
+        exception_counter: Counter = None,
+        event_logger: Optional[EventLogger] = None,
+        kwargs: Optional[dict] = None,
+    ):
+        super().__init__(response)
+        self._kwargs = dict(kwargs or {})
🧰 Tools
🪛 Ruff (0.12.2)

163-163: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 152 to 165, the __init__ uses a mutable default kwargs = {} and
assigns it directly; change the signature to default kwargs to None and inside
the constructor set self._kwargs = dict(kwargs) if kwargs is not None else {}, so
you avoid shared mutable state and store a defensive copy of the passed mapping.
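
The pitfall behind this suggestion can be reproduced in isolation. The sketch below is not the PR's code: `record_shared` and `Wrapper` are hypothetical names that only mirror the shape of the suggested fix.

```python
from typing import Optional


def record_shared(item, bucket: list = []):
    # The default list is created once, at definition time, and then reused.
    bucket.append(item)
    return bucket


class Wrapper:
    def __init__(self, kwargs: Optional[dict] = None):
        # A None default plus a defensive copy: no state shared across
        # instances, and later caller-side mutation does not leak in.
        self._kwargs = dict(kwargs or {})


first = record_shared("a")
second = record_shared("b")  # appends to the same list as the first call

caller_kwargs = {"model": "example-model"}
wrapper = Wrapper(caller_kwargs)
caller_kwargs["model"] = "changed"
```

The copy is shallow, so nested mutable values inside `kwargs` would still be shared with the caller.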

Comment on lines +227 to +236
            # Handle errors during streaming
            if not self._instrumentation_completed:
                attributes = error_metrics_attributes(e)
                if self._exception_counter:
                    self._exception_counter.add(1, attributes=attributes)
                if self._span and self._span.is_recording():
                    self._span.set_status(Status(StatusCode.ERROR, str(e)))
                self._span.end()
                self._instrumentation_completed = True
            raise

🛠️ Refactor suggestion

Guard span end on error.

If self._span is None, calling end() will raise.

-                self._span.end()
+                if self._span:
+                    self._span.end()
📝 Committable suggestion

Suggested change
-            # Handle errors during streaming
-            if not self._instrumentation_completed:
-                attributes = error_metrics_attributes(e)
-                if self._exception_counter:
-                    self._exception_counter.add(1, attributes=attributes)
-                if self._span and self._span.is_recording():
-                    self._span.set_status(Status(StatusCode.ERROR, str(e)))
-                self._span.end()
-                self._instrumentation_completed = True
-            raise
+            # Handle errors during streaming
+            if not self._instrumentation_completed:
+                attributes = error_metrics_attributes(e)
+                if self._exception_counter:
+                    self._exception_counter.add(1, attributes=attributes)
+                if self._span and self._span.is_recording():
+                    self._span.set_status(Status(StatusCode.ERROR, str(e)))
+                if self._span:
+                    self._span.end()
+                self._instrumentation_completed = True
+            raise
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 227 to 236, the code calls self._span.end() without ensuring
self._span is not None which can raise; add a guard so you only call set_status
and end if self._span exists (and optionally is_recording for set_status) — e.g.
check if self._span before calling self._span.set_status(...) and
self._span.end() to avoid AttributeError, then proceed to set
self._instrumentation_completed = True and re-raise.
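
The guard described above can be sketched in isolation. FakeSpan is a hypothetical stand-in written for this example and is not the OpenTelemetry span class; the status is modelled as a plain tuple rather than the real Status object.

```python
class FakeSpan:
    """Hypothetical stand-in exposing only the three methods the handler uses."""

    def __init__(self):
        self.status = None
        self.ended = False

    def is_recording(self):
        return not self.ended

    def set_status(self, status):
        self.status = status

    def end(self):
        self.ended = True


def finish_span_on_error(span, error):
    """Mark the span as failed and end it; do nothing when span is None.

    Returns True if a span was ended, False otherwise.
    """
    if span is None:
        return False
    if span.is_recording():
        span.set_status(("ERROR", str(error)))
    span.end()
    return True
```

Checking for None once at the top keeps set_status and end behind the same guard, so neither call can raise AttributeError.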

Comment on lines +251 to 256
if self._duration_histogram:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)

🛠️ Refactor suggestion

Handle missing start_time when recording duration.

Avoid TypeError if start_time is None.

-        if self._duration_histogram:
-            duration = time.time() - self._start_time
+        if self._duration_histogram and self._start_time is not None:
+            duration = time.time() - self._start_time
             self._duration_histogram.record(
                 duration,
                 attributes=metric_attributes,
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self._duration_histogram:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)
if self._duration_histogram and self._start_time is not None:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 251 to 256, the code assumes self._start_time exists and will raise
a TypeError if it's None; guard the duration recording by checking that
self._start_time is not None before computing time.time() - self._start_time and
calling self._duration_histogram.record, and if start_time is missing simply
skip recording (or set a sensible default/start_time earlier) to avoid the
TypeError.
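
As a rough illustration of the start_time guard, the helper below skips recording when either the histogram or the start time is missing. FakeHistogram and record_duration are hypothetical names for this sketch; the injectable now parameter is an assumption added only to make the example deterministic.

```python
import time


class FakeHistogram:
    """Hypothetical stand-in that only collects what was recorded."""

    def __init__(self):
        self.records = []

    def record(self, value, attributes=None):
        self.records.append((value, attributes))


def record_duration(histogram, start_time, attributes, now=time.time):
    """Record elapsed seconds, or skip if histogram or start_time is None."""
    if histogram is None or start_time is None:
        return None
    duration = now() - start_time
    histogram.record(duration, attributes=attributes)
    return duration
```

Returning None on the skip path lets a caller tell a skipped measurement apart from a recorded one.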

Comment on lines 299 to +316

metric_attributes = shared_metrics_attributes(complete_response)
class AnthropicAsyncStream(ObjectProxy):
"""Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods"""

if duration_histogram:
duration = time.time() - start_time
duration_histogram.record(
duration,
attributes=metric_attributes,
)
def __init__(
self,
span,
response,
instance,
start_time,
token_histogram: Histogram = None,
choice_counter: Counter = None,
duration_histogram: Histogram = None,
exception_counter: Counter = None,
event_logger: Optional[EventLogger] = None,
kwargs: dict = {},
):
super().__init__(response)

🛠️ Refactor suggestion

Mirror kwargs default/copy fix in async wrapper.

Apply the same mutable-default and copy fix to AnthropicAsyncStream.

-        kwargs: dict = {},
+        kwargs: Optional[dict] = None,
@@
-        self._kwargs = kwargs
+        self._kwargs = dict(kwargs or {})
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
metric_attributes = shared_metrics_attributes(complete_response)
class AnthropicAsyncStream(ObjectProxy):
"""Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods"""
if duration_histogram:
duration = time.time() - start_time
duration_histogram.record(
duration,
attributes=metric_attributes,
)
def __init__(
self,
span,
response,
instance,
start_time,
token_histogram: Histogram = None,
choice_counter: Counter = None,
duration_histogram: Histogram = None,
exception_counter: Counter = None,
event_logger: Optional[EventLogger] = None,
kwargs: dict = {},
):
super().__init__(response)
class AnthropicAsyncStream(ObjectProxy):
"""Wrapper for Anthropic async streaming responses that handles instrumentation while preserving helper methods"""
def __init__(
self,
span,
response,
instance,
start_time,
token_histogram: Histogram = None,
choice_counter: Counter = None,
duration_histogram: Histogram = None,
exception_counter: Counter = None,
event_logger: Optional[EventLogger] = None,
kwargs: Optional[dict] = None,
):
super().__init__(response)
self._kwargs = dict(kwargs or {})
🧰 Tools
🪛 Ruff (0.12.2)

314-314: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 299 to 316, the AnthropicAsyncStream constructor uses a mutable
default for kwargs (kwargs: dict = {}), which can lead to shared state across
instances; change the default to None (kwargs: Optional[dict] = None) and inside
__init__ set self._kwargs = dict(kwargs) if kwargs is not None else {} so each
instance gets its own copy and a shared dict is never mutated.

Comment on lines +380 to +390
except Exception as e:
# Handle errors during streaming
if not self._instrumentation_completed:
attributes = error_metrics_attributes(e)
if self._exception_counter:
self._exception_counter.add(1, attributes=attributes)
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.ERROR, str(e)))
self._span.end()
self._instrumentation_completed = True
raise

🛠️ Refactor suggestion

Guard span end on error (async).

-                self._span.end()
+                if self._span:
+                    self._span.end()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except Exception as e:
# Handle errors during streaming
if not self._instrumentation_completed:
attributes = error_metrics_attributes(e)
if self._exception_counter:
self._exception_counter.add(1, attributes=attributes)
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.ERROR, str(e)))
self._span.end()
self._instrumentation_completed = True
raise
except Exception as e:
# Handle errors during streaming
if not self._instrumentation_completed:
attributes = error_metrics_attributes(e)
if self._exception_counter:
self._exception_counter.add(1, attributes=attributes)
if self._span and self._span.is_recording():
self._span.set_status(Status(StatusCode.ERROR, str(e)))
if self._span:
self._span.end()
self._instrumentation_completed = True
raise
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 380-390, the exception handler calls self._span.end() without
guarding that self._span exists or whether it is already ended (race in async
code). Change the logic to first check self._span is not None, then if
self._span.is_recording() set the error status, and only then call
self._span.end(); also ensure you respect self._instrumentation_completed to
avoid double-ending (set it before or immediately after ending as appropriate).
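
One way to avoid double-ending is to route both the normal and the error path through a single idempotent completion method. The sketch below is an assumption about how that could look, not the code in streaming.py; CountingSpan, InstrumentedAsyncIter and failing_source are hypothetical names.

```python
import asyncio


class CountingSpan:
    """Hypothetical span stand-in that counts how often end() is called."""

    def __init__(self):
        self.end_calls = 0
        self.status = None

    def is_recording(self):
        return self.end_calls == 0

    def set_status(self, status):
        self.status = status

    def end(self):
        self.end_calls += 1


class InstrumentedAsyncIter:
    """Wraps an async iterator and ends the span at most once."""

    def __init__(self, span, source):
        self._span = span
        self._source = source
        self._instrumentation_completed = False

    def _complete(self, error=None):
        # Flip the flag first so any later call becomes a no-op.
        if self._instrumentation_completed:
            return
        self._instrumentation_completed = True
        if self._span is None:
            return
        if error is not None and self._span.is_recording():
            self._span.set_status(("ERROR", str(error)))
        self._span.end()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._source.__anext__()
        except StopAsyncIteration:
            self._complete()
            raise
        except Exception as e:
            self._complete(error=e)
            raise


async def failing_source():
    yield "chunk"
    raise RuntimeError("stream broke")


async def consume(span):
    stream = InstrumentedAsyncIter(span, failing_source())
    seen = []
    try:
        async for item in stream:
            seen.append(item)
    except RuntimeError:
        stream._complete()  # second call, ignored because the flag is set
    return seen


span = CountingSpan()
seen = asyncio.run(consume(span))
```

Setting the flag before touching the span means a failure inside set_status or end cannot trigger a second attempt.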

Comment on lines +405 to 410
if self._duration_histogram:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)

🛠️ Refactor suggestion

Handle missing start_time when recording duration (async).

-        if self._duration_histogram:
-            duration = time.time() - self._start_time
+        if self._duration_histogram and self._start_time is not None:
+            duration = time.time() - self._start_time
             self._duration_histogram.record(
                 duration,
                 attributes=metric_attributes,
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if self._duration_histogram:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)
if self._duration_histogram and self._start_time is not None:
duration = time.time() - self._start_time
self._duration_histogram.record(
duration,
attributes=metric_attributes,
)
🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/streaming.py
around lines 405 to 410, the code records a duration using self._start_time but
does not guard against self._start_time being missing or None (possible in async
flows); update the block to check that hasattr(self, "_start_time") and
self._start_time is not None before computing duration and calling
self._duration_histogram.record, and if start_time is missing simply skip
recording (or record a sentinel if desired) to avoid raising an exception;
optionally add a short debug log when skipping so the behavior is visible in
traces.

Comment on lines +114 to +116
- 617d109c-a187-4902-889d-689223d134aa
anthropic-ratelimit-input-tokens-limit:
- '400000'

🛠️ Refactor suggestion

⚠️ Potential issue

Redact organization and request IDs in cassette.

Same concern as other cassettes; please mask anthropic-organization-id and request-id.

-      anthropic-organization-id:
-      - 617d109c-a187-4902-889d-689223d134aa
+      anthropic-organization-id:
+      - "<REDACTED>"
@@
-      request-id:
-      - req_011CT6JRYAtJjioFin4pYtvp
+      request-id:
+      - "<REDACTED>"

Also applies to: 142-143

🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_streaming_helper_methods_legacy.yaml
around lines 114-116 (and also apply same change at 142-143), redact the real
anthropic-organization-id and request-id values by replacing them with a
placeholder (e.g. "REDACTED_ANON_ORG_ID" and "REDACTED_REQUEST_ID") so sensitive
IDs are not stored in the cassette; update both occurrences consistently and
ensure formatting stays valid YAML.
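
For cassettes that are already recorded, the redaction could be applied as a text rewrite. This is a sketch under the assumption that each header appears as a key line followed by a single "- value" list item, as in the snippets quoted in this review; redact_cassette_text is a hypothetical helper.

```python
import re

SENSITIVE_HEADERS = ("anthropic-organization-id", "request-id")


def redact_cassette_text(text, placeholder="<REDACTED>"):
    """Replace the first list item under each sensitive header key."""
    for name in SENSITIVE_HEADERS:
        pattern = re.compile(
            rf"^(?P<key>[ \t]*{re.escape(name)}:[ \t]*\n[ \t]*-[ \t]*).*$",
            re.MULTILINE,
        )
        # Keep the key line and list marker, swap only the value.
        text = pattern.sub(lambda m: f'{m.group("key")}"{placeholder}"', text)
    return text


sample = (
    "      anthropic-organization-id:\n"
    "      - 00000000-0000-0000-0000-000000000000\n"
    "      request-id:\n"
    "      - req_example\n"
    "      content-type:\n"
    "      - text/event-stream\n"
)
redacted = redact_cassette_text(sample)
```

Because only the value is replaced, indentation and list markers are left untouched and the file stays valid YAML.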

Comment on lines +102 to +104
anthropic-organization-id:
- 617d109c-a187-4902-889d-689223d134aa
anthropic-ratelimit-input-tokens-limit:

🛠️ Refactor suggestion

⚠️ Potential issue

Scrub org/request identifiers from cassette headers.

Redact anthropic-organization-id and request-id to avoid leaking potentially sensitive metadata in VCR fixtures.

Apply:

-      anthropic-organization-id:
-      - 617d109c-a187-4902-889d-689223d134aa
+      anthropic-organization-id:
+      - "<REDACTED>"
@@
-      request-id:
-      - req_011CT6JRcKB2ZPLaPAXKZ4Wa
+      request-id:
+      - "<REDACTED>"

Optionally enforce via VCR config (outside this file):

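# tests/conftest.py
import vcr

# Bind to a new name so the vcr module itself is not shadowed.
# Note: filter_headers applies to request headers; response headers such as
# anthropic-organization-id and request-id need a before_record_response hook.
my_vcr = vcr.VCR(
    filter_headers=["authorization", "x-api-key", "anthropic-organization-id", "request-id"],
)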

Also applies to: 130-132

🤖 Prompt for AI Agents
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_sync_streaming_helper_methods_legacy.yaml
lines ~102-104 and ~130-132: scrub the cassette headers by replacing the actual
anthropic-organization-id and request-id values with a redaction placeholder
(e.g. "REDACTED" or "<REDACTED>") so sensitive identifiers are not stored in the
fixture; update both header occurrences (lines indicated) accordingly and ensure
the format matches existing cassette header structure; optionally add guidance
to tests/conftest.py to filter these headers via VCR config as shown in the
review comment.
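
Since anthropic-organization-id and request-id arrive on the response, a record-time scrub would likely need vcrpy's before_record_response hook rather than filter_headers. The function below is a sketch that assumes the response is passed as a dict whose "headers" entry maps header names to lists of values; scrub_response_headers is a hypothetical name.

```python
SENSITIVE_RESPONSE_HEADERS = {"anthropic-organization-id", "request-id"}


def scrub_response_headers(response, placeholder="<REDACTED>"):
    """Replace sensitive response header values before the cassette is written."""
    headers = response.get("headers", {})
    for name in list(headers):
        # Compare case-insensitively; header casing can vary between servers.
        if name.lower() in SENSITIVE_RESPONSE_HEADERS:
            headers[name] = [placeholder]
    return response


# Assumed wiring (not executed here):
#   vcr.VCR(before_record_response=scrub_response_headers)
```

Existing cassettes are not rewritten by this hook; they would need to be re-recorded or edited by hand.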

Comment on lines +103 to +104
- 617d109c-a187-4902-889d-689223d134aa
anthropic-ratelimit-input-tokens-limit:

🛠️ Refactor suggestion

⚠️ Potential issue

Redact organization and request IDs.

Please mask anthropic-organization-id and request-id.

-      anthropic-organization-id:
-      - 617d109c-a187-4902-889d-689223d134aa
+      anthropic-organization-id:
+      - "<REDACTED>"
@@
-      request-id:
-      - req_011CT6JsAYbhj2squ2Q8fn85
+      request-id:
+      - "<REDACTED>"

Also applies to: 130-131

🤖 Prompt for AI Agents
In
packages/opentelemetry-instrumentation-anthropic/tests/cassettes/test_messages/test_anthropic_text_stream_helper_method_legacy.yaml
around lines 103-104 (and also apply same change at lines 130-131), the cassette
currently contains plaintext sensitive IDs (anthropic-organization-id and
request-id); redact them by replacing the actual ID values with masked
placeholders (e.g., "<REDACTED_ORG_ID>" and "<REDACTED_REQUEST_ID>") or a fixed
token so tests remain deterministic, ensuring formatting and YAML structure are
preserved.

@nirga nirga merged commit 5b4bba9 into main Sep 14, 2025
12 checks passed

Development

Successfully merging this pull request may close these issues.

🐛 Bug Report: AnthropicInstrumentor is not supporting get_final_message

3 participants