Skip to content

Conversation

@nirga
Copy link
Member

@nirga nirga commented Dec 1, 2025

Summary

Adds comprehensive streaming instrumentation support for Agno agents, enabling proper OpenTelemetry tracing for both sync and async streaming operations.

Changes

Core Implementation

New File: streaming.py

  • AgnoStream: Wrapper for synchronous streaming responses
  • AgnoAsyncStream: Wrapper for asynchronous streaming responses
  • Both classes use ObjectProxy pattern to transparently wrap streams
  • Collect events during streaming and finalize instrumentation on completion
  • Properly record metrics (tokens, duration) after stream ends
  • Handle errors gracefully with proper span status updates

Modified: __init__.py

  • Enhanced _AgentRunWrapper to detect stream=True parameter
  • Enhanced _AgentARunWrapper to detect stream=True parameter
  • Streaming mode: Creates span manually and returns stream wrapper
  • Non-streaming mode: Maintains original context manager behavior
  • Added @dont_throw decorator to _AgentARunWrapper for consistency

Modified: utils.py

  • Enhanced dont_throw decorator to properly handle async functions
  • Uses asyncio.iscoroutinefunction for async detection
  • Preserves function metadata with @wraps

Testing

Added 4 comprehensive streaming tests:

  • test_agent_run_streaming - Basic sync streaming
  • test_agent_run_streaming_with_tools - Sync streaming with tool calls
  • test_agent_arun_streaming - Basic async streaming
  • test_agent_arun_streaming_with_tools - Async streaming with tool calls

All tests include VCR cassettes for reproducibility.

Sample Application

  • agno_streaming_example.py: Demonstrates async streaming with a weather agent

Testing

  • ✅ All 8 tests pass (4 original + 4 new streaming tests)
  • ✅ Flake8 linting passes with no issues
  • ✅ VCR cassettes recorded for all streaming scenarios

Test Plan

  • Sync streaming without tools works correctly
  • Sync streaming with tools works correctly
  • Async streaming without tools works correctly
  • Async streaming with tools works correctly
  • Spans are created with correct attributes
  • Metrics are recorded properly after stream completion
  • Error handling works correctly
  • All existing tests still pass

🤖 Generated with Claude Code


Important

Adds streaming support for Agno agents with OpenTelemetry tracing, including new classes for handling streaming responses and comprehensive tests.

  • Behavior:
    • Adds AgnoStream and AgnoAsyncStream classes in streaming.py for handling sync and async streaming responses.
    • Updates _AgentRunWrapper and _AgentARunWrapper in __init__.py to detect stream=True and handle streaming with new classes.
    • Non-streaming mode retains original context manager behavior.
  • Utilities:
    • Enhances dont_throw decorator in utils.py to support async functions using asyncio.iscoroutinefunction.
  • Testing:
    • Adds tests test_agent_run_streaming, test_agent_run_streaming_with_tools, test_agent_arun_streaming, and test_agent_arun_streaming_with_tools in test_agent.py.
    • Includes VCR cassettes for streaming scenarios.
  • Sample Application:
    • Adds agno_streaming_example.py demonstrating async streaming with a weather agent.

This description was created by Ellipsis for e8a1065. You can customize this summary. It will automatically update as commits are pushed.

Summary by CodeRabbit

  • New Features

    • Added streaming support for agent executions with comprehensive telemetry tracking, including token usage and run metrics.
    • Enhanced instrumentation with improved attribute population for agent operations.
  • Tests

    • Added test coverage for streaming scenarios with and without tool integration.
  • Documentation

    • Added example application demonstrating streaming agent workflows with instrumentation.

✏️ Tip: You can customize this high-level summary in your review settings.

Add comprehensive streaming instrumentation for Agno agents:

Core Changes:
- streaming.py: New AgnoStream and AgnoAsyncStream wrapper classes
  - Wraps streaming responses using ObjectProxy pattern
  - Collects events and finalizes spans after stream completion
  - Properly records metrics (tokens, duration) post-stream
  - Handles errors and ensures spans are ended correctly

- __init__.py: Enhanced Agent wrappers for streaming detection
  - Detects stream=True parameter in run() and arun()
  - Returns stream wrappers for streaming mode
  - Maintains original context manager behavior for non-streaming

- utils.py: Enhanced dont_throw decorator
  - Now properly handles both sync and async functions
  - Uses asyncio.iscoroutinefunction for detection
  - Preserves function metadata with @wraps

Testing:
- Added 4 streaming tests (2 sync + 2 async)
  - test_agent_run_streaming
  - test_agent_run_streaming_with_tools
  - test_agent_arun_streaming
  - test_agent_arun_streaming_with_tools
- All 8 tests pass with VCR cassettes
- Flake8 linting passes

Sample Application:
- agno_streaming_example.py: Demonstrates async streaming with tools

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
@coderabbitai
Copy link

coderabbitai bot commented Dec 1, 2025

Walkthrough

This PR adds streaming support to the Agno instrumentation by introducing streaming-aware branches in Agent.run/arun that return AgnoStream/AgnoAsyncStream wrappers. It also extends the dont_throw decorator to support async functions, adds comprehensive streaming test cassettes and tests with tool integration, and includes a sample streaming example application.

Changes

Cohort / File(s) Summary
Streaming instrumentation core
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py
Adds streaming-aware branches in Agent.run/arun wrappers that detect stream parameter and return AgnoStream/AgnoAsyncStream wrappers instead of awaiting single-span contexts. Non-streaming paths enhanced with richer attribute population including GEN_AI_AGENT_NAME, GEN_AI_REQUEST_MODEL, TRACELOOP_ENTITY_INPUT, token metrics, and duration histograms. Both paths implement explicit exception handling and span finalization.
Streaming wrapper classes
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
Introduces AgnoStream and AgnoAsyncStream proxy wrappers that manage span lifecycle during streaming iteration, accumulate events, extract final metrics (tokens, content, run_id), set instrumentation attributes, and handle error cases with proper span cleanup. Both classes use shared state tracking to guard repeated finalization.
Async decorator support
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py
Extends dont_throw decorator to support both sync and async functions by detecting coroutine functions and returning appropriate wrapper implementations with consistent error logging.
Test cassettes
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml, test_agent_run_streaming_with_tools.yaml, test_agent_arun_streaming.yaml, test_agent_arun_streaming_with_tools.yaml
Adds VCR cassettes capturing OpenAI streaming chat completion requests/responses and telemetry run creation payloads for sync and async streaming flows, with and without tool integration.
Streaming integration tests
packages/opentelemetry-instrumentation-agno/tests/test_agent.py
Adds four new test methods (test_agent_run_streaming, test_agent_run_streaming_with_tools, test_agent_arun_streaming, test_agent_arun_streaming_with_tools) verifying span attributes, entity inputs, token tracking, and tool span creation in both sync and async streaming contexts.
Sample streaming example
packages/sample-app/sample_app/agno_streaming_example.py
New standalone async example demonstrating streaming instrumentation with a weather agent, including Traceloop initialization, tool definition, and event iteration over arun responses.

Sequence Diagram(s)

sequenceDiagram
    actor Caller
    participant Agent
    participant Span
    participant Stream as Stream Wrapper
    participant Response

    Caller->>Agent: run(stream=True)
    alt Streaming Path
        Agent->>Span: start_as_current_span()
        Span->>Agent: (span context)
        Agent->>Response: wrapped_call()
        Response-->>Agent: response object
        Agent->>Stream: wrap(span, response)
        Stream-->>Agent: AgnoStream instance
        Agent-->>Caller: return stream
        loop Per Event
            Caller->>Stream: __iter__/__next__
            Stream->>Response: iterate events
            Response-->>Stream: event
            Stream->>Stream: accumulate metrics
            Stream-->>Caller: yield event
        end
        Stream->>Stream: extract final metrics
        Stream->>Span: set attributes + end()
    else Non-Streaming Path
        Agent->>Span: start_as_current_span()
        Span->>Agent: (span context)
        Agent->>Response: wrapped_call()
        Response-->>Agent: result
        Agent->>Span: set attributes (model, tokens, etc.)
        Agent->>Span: record duration histogram
        Span->>Agent: end span
        Agent-->>Caller: return result
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Core instrumentation logic: __init__.py contains branching span lifecycle logic with separate streaming and non-streaming paths, exception handling, and complex attribute population across both paths
  • Streaming wrapper state management: streaming.py implements dual sync/async wrappers with iteration, accumulation, finalization guards, and error recovery during streaming
  • Async decorator implementation: utils.py adds coroutine detection and dual wrapper strategies; requires careful review to ensure both paths maintain consistency
  • Test coverage heterogeneity: Multiple new tests covering sync/async and tool/non-tool combinations with cassette-based fixtures; test logic relatively straightforward but requires cassette validation
  • Path equivalence verification: Ensure streaming and non-streaming branches produce consistent instrumentation attributes and span structure

Possibly related PRs

Suggested reviewers

  • galkleinman
  • doronkopit5
  • dinmukhamedm

Poem

🐰 A rabbit hops through streaming threads,
Where spans now split to dual paths ahead,
One flows with data, chunk by chunk so bright,
The other waits—a simpler sight!
With tools and tokens, metrics rise,
Agno's instrumentation flies! 🌿✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 60.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately captures the main objective of the PR: adding streaming support for Agent.run() and Agent.arun() methods in the Agno instrumentation package.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch agno-bug

Comment @coderabbitai help to get the list of available commands and usage tips.

@nirga nirga changed the title feat(agno): add streaming support for Agent.run() and Agent.arun() fix(agno): add streaming support for Agent.run() and Agent.arun() Dec 1, 2025
Copy link
Contributor

@ellipsis-dev ellipsis-dev bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Changes requested ❌

Reviewed everything up to e8a1065 in 2 minutes and 19 seconds. Click for details.
  • Reviewed 1834 lines of code in 9 files
  • Skipped 0 files when reviewing.
  • Skipped posting 3 draft comments. View those below.
  • Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py:70
  • Draft comment:
    Consider using time.monotonic() instead of time.time() for duration measurements to avoid issues with system clock changes.
  • Reason this comment was not posted:
    Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 30% vs. threshold = 50% This is a new file being added, so technically everything in it is a "change". The comment is suggesting a code quality improvement - using time.monotonic() instead of time.time() for duration measurements. This is a well-known best practice in Python. The suggestion is actionable and clear. However, I need to consider: 1) Is this obvious or unimportant? It's not obvious to everyone, and it could matter in production systems. 2) Is it a clear code change required? Yes, it's a specific, actionable improvement. 3) The rules say "Comments that suggest code quality refactors are good! But only if they are actionable and clear." This fits that criteria. The comment is specific, actionable, and technically correct. The comment might be considered nitpicky since system clock changes are rare in practice, and this might work fine with time.time(). Also, without seeing how start_time is initialized (it's passed in as a parameter), I can't be 100% sure both the start and end measurements are using the same clock source - if the caller is using time.time() to set start_time, then both need to be changed together, which requires cross-file changes that I can't verify. While system clock changes are rare, using time.monotonic() for duration measurements is a Python best practice and the comment is actionable. However, the critique raises a valid concern - I cannot see where start_time is set (it's passed as a parameter to __init__), so I cannot verify if changing just this line would be correct without also changing where start_time is initialized. This could require changes in other files, which violates the "ignore cross-file issues" rule. Without seeing the full context, this comment might be incomplete or misleading. The comment is technically correct and suggests a good practice, but it may require cross-file changes that I cannot verify. Since I cannot see where start_time is initialized and both the start and end time measurements need to use the same clock source, I cannot be certain this is a complete or correct suggestion without more context.
2. packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py:10
  • Draft comment:
    The dont_throw decorator suppresses exceptions; ensure this silent failure is intentional and well-documented for debugging scenarios.
  • Reason this comment was not posted:
    Comment did not seem useful. Confidence is useful = 0% <= threshold 50% The comment is asking the author to ensure that the silent failure is intentional and well-documented. This falls under asking the author to confirm their intention and ensure behavior, which violates the rules.
3. packages/sample-app/sample_app/agno_streaming_example.py:15
  • Draft comment:
    Sample app code is clear; note that the asynchronous streaming events are printed for demo purposes.
  • Reason this comment was not posted:
    Confidence changes required: 0% <= threshold 50% None

Workflow ID: wflow_s363cgOgcZBTm8oM

You can customize Ellipsis by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.


return event

def _complete_instrumentation(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _complete_instrumentation methods in both AgnoAsyncStream and AgnoStream have similar logic. Consider refactoring to reduce duplication.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (9)
packages/sample-app/sample_app/agno_streaming_example.py (1)

7-7: Consider moving initialization inside the main block.

Traceloop.init() at module level runs on import, which can cause issues when the module is imported for testing or other purposes. Consider moving it inside the if __name__ == "__main__" block for better isolation.

-Traceloop.init(app_name="agno_streaming_example")
-
-
 def get_weather(location: str) -> str:

Then in the main block:

if __name__ == "__main__":
    Traceloop.init(app_name="agno_streaming_example")
    asyncio.run(test_streaming())
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)

19-37: Unused constructor parameters.

instance is stored in _self_instance (line 31) and token_histogram is stored in _self_token_histogram (line 34), but neither is used anywhere in the class. Consider removing these if they're not needed, or document their intended future use.

     def __init__(
         self,
         span,
         response,
-        instance,
         start_time,
         duration_histogram: Histogram = None,
-        token_histogram: Histogram = None,
     ):
         super().__init__(response)

         self._self_span = span
-        self._self_instance = instance
         self._self_start_time = start_time
         self._self_duration_histogram = duration_histogram
-        self._self_token_histogram = token_histogram
         self._self_events = []
         self._self_final_result = None
         self._self_instrumentation_completed = False

Note: If these parameters are intentionally kept for future use, please add a comment indicating so.


119-219: Consider extracting shared instrumentation logic to reduce duplication.

AgnoStream._complete_instrumentation is nearly identical to AgnoAsyncStream._complete_instrumentation. Consider extracting the shared logic into a helper function or mixin to improve maintainability.

Example helper approach:

def _finalize_instrumentation(
    span, final_result, start_time, duration_histogram, 
    instrumentation_completed_flag_setter
):
    """Shared instrumentation completion logic."""
    # Common logic here...

This would reduce the duplication between lines 65-116 and 168-219.

packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1)

96-100: Consider scrubbing session cookies from cassettes.

The Set-Cookie headers contain Cloudflare session tokens (__cf_bm, _cfuvid). While these are likely ephemeral and not directly exploitable, per coding guidelines for VCR cassettes, consider scrubbing or replacing these with placeholder values to follow best practices for not committing potentially sensitive data.

Verify that the VCR recording configuration includes appropriate filters for sensitive headers. You might configure VCR to filter these automatically:

@vcr.use_cassette(..., filter_headers=['set-cookie', 'authorization'])
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1)

96-100: Consider scrubbing session cookies from cassettes.

Same observation as the async streaming cassette - the Set-Cookie headers contain Cloudflare session tokens. Consider configuring VCR to filter these headers automatically for consistency with security best practices.

packages/opentelemetry-instrumentation-agno/tests/test_agent.py (2)

174-175: Consider adding a defensive check before indexing.

The list comprehension [s for s in spans if "agent" in s.name][-1] will raise IndexError if no agent spans are found, making test failures harder to diagnose.

-    agent_span = [s for s in spans if "agent" in s.name][-1]
+    agent_spans = [s for s in spans if "agent" in s.name]
+    assert agent_spans, "Expected at least one agent span"
+    agent_span = agent_spans[-1]

239-240: Same IndexError risk as the sync variant.

Apply the same defensive check pattern here for consistency.

-    agent_span = [s for s in spans if "agent" in s.name][-1]
+    agent_spans = [s for s in spans if "agent" in s.name]
+    assert agent_spans, "Expected at least one agent span"
+    agent_span = agent_spans[-1]
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (2)

185-187: Move import time to module level.

The time import is repeated inside function bodies at lines 185, 234, 334, and 384. While Python caches imports, placing it at module level is cleaner and avoids repeated dictionary lookups.

Add at module level with other imports:

import time

Then remove the local imports inside the methods.


163-183: Consider extracting common span attribute setup.

The attribute-setting logic for GEN_AI_SYSTEM, TRACELOOP_SPAN_KIND, agent name, model, and input message is duplicated between streaming (lines 163-183) and non-streaming (lines 213-232) branches. The same pattern repeats in _AgentARunWrapper. A helper method could reduce this duplication.

def _set_agent_span_attributes(span, instance, args):
    """Set common span attributes for agent runs."""
    span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
    span.set_attribute(
        SpanAttributes.TRACELOOP_SPAN_KIND,
        TraceloopSpanKindValues.AGENT.value,
    )
    if hasattr(instance, "name"):
        span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name)
    if hasattr(instance, "model") and instance.model:
        model_name = getattr(
            instance.model, "id", getattr(instance.model, "name", "unknown")
        )
        span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name)
    if args and should_send_prompts():
        span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, str(args[0]))

Also applies to: 213-232

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between bc97e56 and e8a1065.

📒 Files selected for processing (9)
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (5 hunks)
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (1 hunks)
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1 hunks)
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml (1 hunks)
  • packages/opentelemetry-instrumentation-agno/tests/test_agent.py (1 hunks)
  • packages/sample-app/sample_app/agno_streaming_example.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/cassettes/**/*.{yaml,yml,json}

📄 CodeRabbit inference engine (CLAUDE.md)

Never commit secrets or PII in VCR cassettes; scrub sensitive data

Files:

  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml
  • packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules

Files:

  • packages/opentelemetry-instrumentation-agno/tests/test_agent.py
  • packages/sample-app/sample_app/agno_streaming_example.py
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py
  • packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py
🧬 Code graph analysis (4)
packages/opentelemetry-instrumentation-agno/tests/test_agent.py (2)
packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1)
  • get_finished_spans (40-43)
packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1)
  • SpanAttributes (64-245)
packages/sample-app/sample_app/agno_streaming_example.py (2)
packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py (1)
  • agent (48-58)
packages/traceloop-sdk/traceloop/sdk/__init__.py (2)
  • Traceloop (36-267)
  • init (48-198)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1)
  • should_send_prompts (43-46)
packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (2)
  • SpanAttributes (64-245)
  • TraceloopSpanKindValues (285-290)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (3)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)
  • AgnoAsyncStream (16-116)
  • AgnoStream (119-219)
packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (2)
  • SpanAttributes (64-245)
  • TraceloopSpanKindValues (285-290)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1)
  • should_send_prompts (43-46)
🪛 Ruff (0.14.6)
packages/opentelemetry-instrumentation-agno/tests/test_agent.py

123-123: Unused function argument: instrument

(ARG001)


123-123: Unused function argument: reader

(ARG001)


151-151: Unused function argument: instrument

(ARG001)


151-151: Unused function argument: reader

(ARG001)


187-187: Unused function argument: instrument

(ARG001)


187-187: Unused function argument: reader

(ARG001)


216-216: Unused function argument: instrument

(ARG001)


216-216: Unused function argument: reader

(ARG001)

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py

111-111: Do not catch blind exception: Exception

(BLE001)


214-214: Do not catch blind exception: Exception

(BLE001)

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py

17-17: Do not catch blind exception: Exception

(BLE001)


25-25: Do not catch blind exception: Exception

(BLE001)

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py

277-277: Consider moving this statement to an else block

(TRY300)


427-427: Consider moving this statement to an else block

(TRY300)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: Build Packages (3.11)
  • GitHub Check: Test Packages (3.11)
  • GitHub Check: Test Packages (3.10)
  • GitHub Check: Test Packages (3.12)
  • GitHub Check: Lint
🔇 Additional comments (12)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (2)

1-4: LGTM!

The new imports for wraps and asyncio are appropriate for extending the dont_throw decorator to support async functions while preserving function metadata.


10-27: LGTM!

The dont_throw decorator correctly handles both sync and async functions. The use of asyncio.iscoroutinefunction for detection and @wraps for metadata preservation is appropriate. The broad Exception catch is intentional for this suppression decorator pattern.

packages/sample-app/sample_app/agno_streaming_example.py (1)

15-24: LGTM!

The agent setup correctly retrieves the API key from environment variables, following the coding guidelines. The streaming example demonstrates proper usage of arun with stream=True.

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)

42-63: LGTM!

The async iteration correctly handles stream completion (StopAsyncIteration) and errors, with proper span status updates and exception recording. The guard against double-finalization is appropriate.


65-116: LGTM!

The instrumentation completion logic correctly extracts metrics, sets span attributes conditionally, and ensures the span is properly ended in the finally block. The defensive exception handling with warning logging is appropriate for instrumentation code that should not break the application flow.

packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1)

1-184: LGTM!

The cassette properly captures the streaming interaction and telemetry flow. API keys appear to be filtered, and the response data includes obfuscation markers suggesting appropriate scrubbing was applied during recording.

packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1)

1-184: LGTM!

The sync streaming cassette properly captures the expected HTTP interactions for testing. The structure mirrors the async version appropriately, with distinct agent identifiers for test differentiation.

packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml (1)

1-326: VCR cassette properly sanitized.

The cassette correctly captures the streaming flow with tool calls (multiply function). API keys are scrubbed (no Authorization headers present), and the recorded data appears appropriate for test fixtures.

packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml (1)

1-326: VCR cassette properly sanitized for sync streaming tests.

The cassette correctly records the synchronous streaming flow with tool usage. Sensitive data (API keys) is properly scrubbed, and the structure matches the expected test scenario.

packages/opentelemetry-instrumentation-agno/tests/test_agent.py (1)

122-148: Streaming test implementation looks correct.

The test properly validates streaming behavior by collecting events and verifying span attributes. The instrument and reader fixture arguments flagged by static analysis are intentional - instrument is a setup fixture with side effects (initializes instrumentation) even though it's not directly referenced in the test body.

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (2)

294-347: Async streaming wrapper implementation looks correct.

The __call__ method correctly returns AgnoAsyncStream synchronously for streaming mode. The wrapped async iterator is passed to the wrapper, which handles async iteration internally. This pattern assumes arun(stream=True) returns an async iterable directly rather than a coroutine that yields the iterator.


354-434: Non-streaming async wrapper correctly uses async_wrapper pattern.

The pattern of defining a local async_wrapper() coroutine and returning it allows the synchronous __call__ method to set up the async execution context properly. The await wrapped(*args, **kwargs) on line 388 is correct for non-streaming async execution.

Comment on lines +155 to +161
if is_streaming:
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

span = self._tracer.start_span(
span_name,
kind=SpanKind.CLIENT,
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check streaming.py for span/context handling
rg -nP --type=py "start_span|set_current|attach|context|use_span|Token" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Check if streaming.py imports or uses any context-related OpenTelemetry modules
rg -nP --type=py "from.*context|import.*context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Look at the full streaming.py file to understand the structure
wc -l packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py

Repository: traceloop/openllmetry

Length of output: 160


🏁 Script executed:

#!/bin/bash
# Check how tool spans are created within the streaming context
rg -nP --type=py "tool.*span|_create_tool|wrap_tool" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py -A 5 | head -50

Repository: traceloop/openllmetry

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Read streaming.py to understand the implementation
cat -n packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py

Repository: traceloop/openllmetry

Length of output: 9702


🏁 Script executed:

#!/bin/bash
# Also check the imports in __init__.py to understand span management
head -50 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -n

Repository: traceloop/openllmetry

Length of output: 2160


🏁 Script executed:

#!/bin/bash
# Look for tool execution in the streaming branch
sed -n '155,180p' packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -n

Repository: traceloop/openllmetry

Length of output: 1272


🏁 Script executed:

#!/bin/bash
# Check tool wrapper implementation
head -80 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py | cat -n

Repository: traceloop/openllmetry

Length of output: 4007


🏁 Script executed:

#!/bin/bash
# Look for start_span calls in tool wrappers
rg -n "start_span|get_span|context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py -A 3 | head -60

Repository: traceloop/openllmetry

Length of output: 970


Span context propagation is broken for tool calls in streaming mode.

The streaming branch creates the agent span with start_span (line 158) but never sets it as the current context. Tool wrappers (_tool_wrappers.py line 34) use start_as_current_span for tool spans, which relies on the current OpenTelemetry context to establish parent-child relationships. Since the agent span is not set as the current context, tool spans will not be correctly parented to the agent span.

The non-streaming branch uses start_as_current_span (line 208), which properly establishes context. To fix this in streaming mode, attach the agent span to the context before wrapping the response. Use get_current_span() and token-based context attachment or wrap the stream iteration with the span context.

if is_streaming:
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

span = self._tracer.start_span(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not start_as_current_span?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@doronkopit5 bwcause it will be ended outside of this method (only when the stream ends)

if is_streaming:
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

span = self._tracer.start_span(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question

@nirga nirga merged commit f846e1c into main Dec 1, 2025
12 checks passed
@nirga nirga deleted the agno-bug branch December 1, 2025 18:33
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants