fix(agno): add streaming support for Agent.run() and Agent.arun() #3483
Add comprehensive streaming instrumentation for Agno agents:

Core Changes:
- streaming.py: New AgnoStream and AgnoAsyncStream wrapper classes
  - Wraps streaming responses using ObjectProxy pattern
  - Collects events and finalizes spans after stream completion
  - Properly records metrics (tokens, duration) post-stream
  - Handles errors and ensures spans are ended correctly
- __init__.py: Enhanced Agent wrappers for streaming detection
  - Detects stream=True parameter in run() and arun()
  - Returns stream wrappers for streaming mode
  - Maintains original context manager behavior for non-streaming
- utils.py: Enhanced dont_throw decorator
  - Now properly handles both sync and async functions
  - Uses asyncio.iscoroutinefunction for detection
  - Preserves function metadata with @wraps

Testing:
- Added 4 streaming tests (2 sync + 2 async)
  - test_agent_run_streaming
  - test_agent_run_streaming_with_tools
  - test_agent_arun_streaming
  - test_agent_arun_streaming_with_tools
- All 8 tests pass with VCR cassettes
- Flake8 linting passes

Sample Application:
- agno_streaming_example.py: Demonstrates async streaming with tools

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Walkthrough
This PR adds streaming support to the Agno instrumentation by introducing streaming-aware branches in Agent.run/arun that return AgnoStream/AgnoAsyncStream wrappers. It also extends the dont_throw decorator to support async functions, adds comprehensive streaming test cassettes and tests with tool integration, and includes a sample streaming example application.
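To make the dont_throw change concrete, here is a minimal sketch of a suppression decorator that works for both sync and async callables. It is not the PR's code: the logger usage and the demo functions `sync_fail` and `async_fail` are assumptions for illustration, and it uses `inspect.iscoroutinefunction` where the PR uses `asyncio.iscoroutinefunction` (both perform the same kind of check here).

```python
import asyncio
import inspect
import logging
from functools import wraps

logger = logging.getLogger(__name__)


def dont_throw(func):
    """Swallow and log exceptions from func, for sync and async callables."""
    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as exc:  # broad on purpose: never break the caller
                logger.debug("suppressed error in %s: %s", func.__name__, exc)
                return None

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # broad on purpose: never break the caller
            logger.debug("suppressed error in %s: %s", func.__name__, exc)
            return None

    return sync_wrapper


# Hypothetical demo functions, not part of the PR.
@dont_throw
def sync_fail():
    raise ValueError("boom")


@dont_throw
async def async_fail():
    raise ValueError("boom")


sync_result = sync_fail()
async_result = asyncio.run(async_fail())
```

Without the async branch, a plain sync wrapper around an `async def` would return the coroutine before the body runs, so the `try`/`except` would never see the exception.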
Sequence Diagram(s)
sequenceDiagram
actor Caller
participant Agent
participant Span
participant Stream as Stream Wrapper
participant Response
Caller->>Agent: run(stream=True)
alt Streaming Path
Agent->>Span: start_as_current_span()
Span->>Agent: (span context)
Agent->>Response: wrapped_call()
Response-->>Agent: response object
Agent->>Stream: wrap(span, response)
Stream-->>Agent: AgnoStream instance
Agent-->>Caller: return stream
loop Per Event
Caller->>Stream: __iter__/__next__
Stream->>Response: iterate events
Response-->>Stream: event
Stream->>Stream: accumulate metrics
Stream-->>Caller: yield event
end
Stream->>Stream: extract final metrics
Stream->>Span: set attributes + end()
else Non-Streaming Path
Agent->>Span: start_as_current_span()
Span->>Agent: (span context)
Agent->>Response: wrapped_call()
Response-->>Agent: result
Agent->>Span: set attributes (model, tokens, etc.)
Agent->>Span: record duration histogram
Span->>Agent: end span
Agent-->>Caller: return result
end
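The streaming path in the diagram can be sketched with the standard library alone. This is a simplified model under stated assumptions, not the PR's implementation: `FakeSpan` stands in for an OpenTelemetry span, `StreamWrapper` is a plain class rather than a wrapt ObjectProxy, and the attribute names `stream.event_count`, `stream.duration_s` and `error.type` are made up for the example.

```python
import time


class FakeSpan:
    """Hypothetical stand-in for a span: records attributes and end()."""

    def __init__(self):
        self.attributes = {}
        self.ended = False

    def set_attribute(self, key, value):
        self.attributes[key] = value

    def end(self):
        self.ended = True


class StreamWrapper:
    """Yields events from the response and closes the span when it finishes."""

    def __init__(self, span, response, start_time):
        self._span = span
        self._response = iter(response)
        self._start_time = start_time
        self._events = []
        self._completed = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            event = next(self._response)
        except StopIteration:
            self._complete()  # normal end of stream
            raise
        except Exception as exc:
            self._span.set_attribute("error.type", type(exc).__name__)
            self._complete()  # still end the span on failure
            raise
        self._events.append(event)
        return event

    def _complete(self):
        if self._completed:  # guard against double finalization
            return
        self._completed = True
        try:
            self._span.set_attribute("stream.event_count", len(self._events))
            self._span.set_attribute(
                "stream.duration_s", time.monotonic() - self._start_time
            )
        finally:
            self._span.end()


span = FakeSpan()
events = list(StreamWrapper(span, ["a", "b", "c"], time.monotonic()))
```

One limitation of this shape is that a caller who abandons the stream before exhausting it never triggers `_complete`, so the span would stay open; a real wrapper may want an explicit close path for that case.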
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Caution
Changes requested ❌
Reviewed everything up to e8a1065 in 2 minutes and 19 seconds.
- Reviewed 1834 lines of code in 9 files
- Skipped 0 files when reviewing.
- Skipped posting 3 draft comments. View those below.
1. packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py:70
- Draft comment:
Consider using time.monotonic() instead of time.time() for duration measurements to avoid issues with system clock changes.
- Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 30% vs. threshold = 50%.
This is a new file being added, so technically everything in it is a "change". The comment is suggesting a code quality improvement: using time.monotonic() instead of time.time() for duration measurements. This is a well-known best practice in Python. The suggestion is actionable and clear. However, I need to consider: 1) Is this obvious or unimportant? It's not obvious to everyone, and it could matter in production systems. 2) Is it a clear code change required? Yes, it's a specific, actionable improvement. 3) The rules say "Comments that suggest code quality refactors are good! But only if they are actionable and clear." This fits that criteria. The comment is specific, actionable, and technically correct.
The comment might be considered nitpicky since system clock changes are rare in practice, and this might work fine with time.time(). Also, without seeing how start_time is initialized (it's passed in as a parameter), I can't be 100% sure both the start and end measurements are using the same clock source. If the caller is using time.time() to set start_time, then both need to be changed together, which requires cross-file changes that I can't verify.
While system clock changes are rare, using time.monotonic() for duration measurements is a Python best practice and the comment is actionable. However, the critique raises a valid concern: I cannot see where start_time is set (it's passed as a parameter to __init__), so I cannot verify if changing just this line would be correct without also changing where start_time is initialized. This could require changes in other files, which violates the "ignore cross-file issues" rule. Without seeing the full context, this comment might be incomplete or misleading.
The comment is technically correct and suggests a good practice, but it may require cross-file changes that I cannot verify. Since I cannot see where start_time is initialized and both the start and end time measurements need to use the same clock source, I cannot be certain this is a complete or correct suggestion without more context.
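The clock-source concern is easier to see in code. The sketch below uses a hypothetical helper named `timed`, not anything from the PR. It takes both readings from `time.monotonic()`, which is the property the draft comment cares about: the start and end timestamps must come from the same clock.

```python
import time


def timed(fn, *args, **kwargs):
    """Return (result, elapsed_seconds) measured on a monotonic clock."""
    start = time.monotonic()  # unaffected by system clock adjustments
    result = fn(*args, **kwargs)
    elapsed = time.monotonic() - start  # same clock as `start`
    return result, elapsed


result, elapsed = timed(sum, range(1000))
```

If `start_time` is captured with `time.time()` in one module and the end is read with `time.monotonic()` in another, the difference is meaningless, so the two call sites would have to change together.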
2. packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py:10
- Draft comment:
The dont_throw decorator suppresses exceptions; ensure this silent failure is intentional and well-documented for debugging scenarios.
- Reason this comment was not posted:
Comment did not seem useful. Confidence is useful = 0% <= threshold 50%. The comment is asking the author to ensure that the silent failure is intentional and well-documented. This falls under asking the author to confirm their intention and ensure behavior, which violates the rules.
3. packages/sample-app/sample_app/agno_streaming_example.py:15
- Draft comment:
Sample app code is clear; note that the asynchronous streaming events are printed for demo purposes.
- Reason this comment was not posted:
Confidence changes required: 0% <= threshold 50%. None
        return event

    def _complete_instrumentation(self):
The _complete_instrumentation methods in both AgnoAsyncStream and AgnoStream have similar logic. Consider refactoring to reduce duplication.
Actionable comments posted: 1
🧹 Nitpick comments (9)
packages/sample-app/sample_app/agno_streaming_example.py (1)
7-7: Consider moving initialization inside the main block.
Traceloop.init() at module level runs on import, which can cause issues when the module is imported for testing or other purposes. Consider moving it inside the if __name__ == "__main__" block for better isolation.
-Traceloop.init(app_name="agno_streaming_example")
-
-
 def get_weather(location: str) -> str:
Then in the main block:
if __name__ == "__main__":
    Traceloop.init(app_name="agno_streaming_example")
    asyncio.run(test_streaming())
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)
19-37: Unused constructor parameters.
instance is stored in _self_instance (line 31) and token_histogram is stored in _self_token_histogram (line 34), but neither is used anywhere in the class. Consider removing these if they're not needed, or document their intended future use.
 def __init__(
     self,
     span,
     response,
-    instance,
     start_time,
     duration_histogram: Histogram = None,
-    token_histogram: Histogram = None,
 ):
     super().__init__(response)
     self._self_span = span
-    self._self_instance = instance
     self._self_start_time = start_time
     self._self_duration_histogram = duration_histogram
-    self._self_token_histogram = token_histogram
     self._self_events = []
     self._self_final_result = None
     self._self_instrumentation_completed = False
Note: If these parameters are intentionally kept for future use, please add a comment indicating so.
119-219: Consider extracting shared instrumentation logic to reduce duplication.
AgnoStream._complete_instrumentation is nearly identical to AgnoAsyncStream._complete_instrumentation. Consider extracting the shared logic into a helper function or mixin to improve maintainability.
Example helper approach:
def _finalize_instrumentation(
    span, final_result, start_time, duration_histogram, instrumentation_completed_flag_setter
):
    """Shared instrumentation completion logic."""
    # Common logic here...
This would reduce the duplication between lines 65-116 and 168-219.
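As an alternative to a free helper function, the mixin variant mentioned above might look roughly like this. It is a sketch under assumptions: `_FinalizeMixin`, `SyncStream`, `AsyncStream` and the `on_finish` callback are hypothetical names, and the callback stands in for the span and histogram handling in the real classes.

```python
import asyncio


class _FinalizeMixin:
    """Keeps completion logic in one place for the sync and async wrappers."""

    def _init_state(self, on_finish):
        self._events = []
        self._done = False
        self._on_finish = on_finish  # hypothetical stand-in for span handling

    def _finalize(self):
        if self._done:  # guard against double finalization
            return
        self._done = True
        self._on_finish(len(self._events))


class SyncStream(_FinalizeMixin):
    def __init__(self, source, on_finish):
        self._source = iter(source)
        self._init_state(on_finish)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            event = next(self._source)
        except StopIteration:
            self._finalize()
            raise
        self._events.append(event)
        return event


class AsyncStream(_FinalizeMixin):
    def __init__(self, source, on_finish):
        self._source = source.__aiter__()
        self._init_state(on_finish)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            event = await self._source.__anext__()
        except StopAsyncIteration:
            self._finalize()
            raise
        self._events.append(event)
        return event


async def _numbers():
    for i in range(3):
        yield i


async def _drain(stream):
    return [event async for event in stream]


finished = []
sync_events = list(SyncStream([0, 1, 2], finished.append))
async_events = asyncio.run(_drain(AsyncStream(_numbers(), finished.append)))
```

Only the iteration protocol differs between the two classes; the finalization body is synchronous in both cases, which is what makes sharing it straightforward.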
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1)
96-100: Consider scrubbing session cookies from cassettes.
The Set-Cookie headers contain Cloudflare session tokens (__cf_bm, _cfuvid). While these are likely ephemeral and not directly exploitable, per coding guidelines for VCR cassettes, consider scrubbing or replacing these with placeholder values to follow best practices for not committing potentially sensitive data.
Verify that the VCR recording configuration includes appropriate filters for sensitive headers. You might configure VCR to filter these automatically:
@vcr.use_cassette(..., filter_headers=['set-cookie', 'authorization'])
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1)
96-100: Consider scrubbing session cookies from cassettes.
Same observation as the async streaming cassette: the Set-Cookie headers contain Cloudflare session tokens. Consider configuring VCR to filter these headers automatically for consistency with security best practices.
packages/opentelemetry-instrumentation-agno/tests/test_agent.py (2)
174-175: Consider adding a defensive check before indexing.
The list comprehension [s for s in spans if "agent" in s.name][-1] will raise IndexError if no agent spans are found, making test failures harder to diagnose.
- agent_span = [s for s in spans if "agent" in s.name][-1]
+ agent_spans = [s for s in spans if "agent" in s.name]
+ assert agent_spans, "Expected at least one agent span"
+ agent_span = agent_spans[-1]
239-240: Same IndexError risk as the sync variant.
Apply the same defensive check pattern here for consistency.
- agent_span = [s for s in spans if "agent" in s.name][-1]
+ agent_spans = [s for s in spans if "agent" in s.name]
+ assert agent_spans, "Expected at least one agent span"
+ agent_span = agent_spans[-1]
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (2)
185-187: Move import time to module level.
The time import is repeated inside function bodies at lines 185, 234, 334, and 384. While Python caches imports, placing it at module level is cleaner and avoids repeated dictionary lookups.
Add at module level with other imports:
import time
Then remove the local imports inside the methods.
163-183: Consider extracting common span attribute setup.
The attribute-setting logic for GEN_AI_SYSTEM, TRACELOOP_SPAN_KIND, agent name, model, and input message is duplicated between streaming (lines 163-183) and non-streaming (lines 213-232) branches. The same pattern repeats in _AgentARunWrapper. A helper method could reduce this duplication.
def _set_agent_span_attributes(span, instance, args):
    """Set common span attributes for agent runs."""
    span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
    span.set_attribute(
        SpanAttributes.TRACELOOP_SPAN_KIND,
        TraceloopSpanKindValues.AGENT.value,
    )
    if hasattr(instance, "name"):
        span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name)
    if hasattr(instance, "model") and instance.model:
        model_name = getattr(
            instance.model, "id", getattr(instance.model, "name", "unknown")
        )
        span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name)
    if args and should_send_prompts():
        span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, str(args[0]))
Also applies to: 213-232
📒 Files selected for processing (9)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (5 hunks)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (1 hunks)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1 hunks)
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1 hunks)
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml (1 hunks)
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1 hunks)
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml (1 hunks)
packages/opentelemetry-instrumentation-agno/tests/test_agent.py (1 hunks)
packages/sample-app/sample_app/agno_streaming_example.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/cassettes/**/*.{yaml,yml,json}
📄 CodeRabbit inference engine (CLAUDE.md)
Never commit secrets or PII in VCR cassettes; scrub sensitive data
Files:
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/opentelemetry-instrumentation-agno/tests/test_agent.py
packages/sample-app/sample_app/agno_streaming_example.py
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py
🧬 Code graph analysis (4)
packages/opentelemetry-instrumentation-agno/tests/test_agent.py (2)
- packages/traceloop-sdk/traceloop/sdk/utils/in_memory_span_exporter.py (1): get_finished_spans (40-43)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (1): SpanAttributes (64-245)
packages/sample-app/sample_app/agno_streaming_example.py (2)
- packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py (1): agent (48-58)
- packages/traceloop-sdk/traceloop/sdk/__init__.py (2): Traceloop (36-267), init (48-198)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)
- packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1): should_send_prompts (43-46)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (2): SpanAttributes (64-245), TraceloopSpanKindValues (285-290)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (3)
- packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2): AgnoAsyncStream (16-116), AgnoStream (119-219)
- packages/opentelemetry-semantic-conventions-ai/opentelemetry/semconv_ai/__init__.py (2): SpanAttributes (64-245), TraceloopSpanKindValues (285-290)
- packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (1): should_send_prompts (43-46)
🪛 Ruff (0.14.6)
packages/opentelemetry-instrumentation-agno/tests/test_agent.py
123-123: Unused function argument: instrument
(ARG001)
123-123: Unused function argument: reader
(ARG001)
151-151: Unused function argument: instrument
(ARG001)
151-151: Unused function argument: reader
(ARG001)
187-187: Unused function argument: instrument
(ARG001)
187-187: Unused function argument: reader
(ARG001)
216-216: Unused function argument: instrument
(ARG001)
216-216: Unused function argument: reader
(ARG001)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
111-111: Do not catch blind exception: Exception
(BLE001)
214-214: Do not catch blind exception: Exception
(BLE001)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py
17-17: Do not catch blind exception: Exception
(BLE001)
25-25: Do not catch blind exception: Exception
(BLE001)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py
277-277: Consider moving this statement to an else block
(TRY300)
427-427: Consider moving this statement to an else block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build Packages (3.11)
- GitHub Check: Test Packages (3.11)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Lint
🔇 Additional comments (12)
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/utils.py (2)
1-4: LGTM!
The new imports for wraps and asyncio are appropriate for extending the dont_throw decorator to support async functions while preserving function metadata.
10-27: LGTM!
The dont_throw decorator correctly handles both sync and async functions. The use of asyncio.iscoroutinefunction for detection and @wraps for metadata preservation is appropriate. The broad Exception catch is intentional for this suppression decorator pattern.
packages/sample-app/sample_app/agno_streaming_example.py (1)
15-24: LGTM!
The agent setup correctly retrieves the API key from environment variables, following the coding guidelines. The streaming example demonstrates proper usage of arun with stream=True.
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py (2)
42-63: LGTM!
The async iteration correctly handles stream completion (StopAsyncIteration) and errors, with proper span status updates and exception recording. The guard against double-finalization is appropriate.
65-116: LGTM!
The instrumentation completion logic correctly extracts metrics, sets span attributes conditionally, and ensures the span is properly ended in the finally block. The defensive exception handling with warning logging is appropriate for instrumentation code that should not break the application flow.
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming.yaml (1)
1-184: LGTM!
The cassette properly captures the streaming interaction and telemetry flow. API keys appear to be filtered, and the response data includes obfuscation markers suggesting appropriate scrubbing was applied during recording.
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming.yaml (1)
1-184: LGTM!
The sync streaming cassette properly captures the expected HTTP interactions for testing. The structure mirrors the async version appropriately, with distinct agent identifiers for test differentiation.
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_arun_streaming_with_tools.yaml (1)
1-326: VCR cassette properly sanitized.
The cassette correctly captures the streaming flow with tool calls (multiply function). API keys are scrubbed (no Authorization headers present), and the recorded data appears appropriate for test fixtures.
packages/opentelemetry-instrumentation-agno/tests/cassettes/test_agent/test_agent_run_streaming_with_tools.yaml (1)
1-326: VCR cassette properly sanitized for sync streaming tests.
The cassette correctly records the synchronous streaming flow with tool usage. Sensitive data (API keys) is properly scrubbed, and the structure matches the expected test scenario.
packages/opentelemetry-instrumentation-agno/tests/test_agent.py (1)
122-148: Streaming test implementation looks correct.
The test properly validates streaming behavior by collecting events and verifying span attributes. The instrument and reader fixture arguments flagged by static analysis are intentional: instrument is a setup fixture with side effects (initializes instrumentation) even though it's not directly referenced in the test body.
packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py (2)
294-347: Async streaming wrapper implementation looks correct.
The __call__ method correctly returns AgnoAsyncStream synchronously for streaming mode. The wrapped async iterator is passed to the wrapper, which handles async iteration internally. This pattern assumes arun(stream=True) returns an async iterable directly rather than a coroutine that yields the iterator.
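The assumption in that last sentence can be illustrated with two hypothetical functions (neither is Agno's API): an async generator function, whose call result is already an async iterator, and a coroutine function, whose call result must be awaited before iteration. `as_async_iterator` is an illustrative helper showing how a wrapper could tolerate either shape.

```python
import asyncio
import inspect


async def stream_directly():
    """Async generator function: calling it returns an async iterator."""
    yield "chunk-1"
    yield "chunk-2"


async def stream_via_coroutine():
    """Coroutine function: calling it returns a coroutine to await first."""
    return stream_directly()


async def as_async_iterator(obj):
    """Normalize either shape to an async iterator (hypothetical helper)."""
    if inspect.isawaitable(obj):
        obj = await obj
    return obj.__aiter__()


async def collect(obj):
    iterator = await as_async_iterator(obj)
    return [chunk async for chunk in iterator]


direct = stream_directly()
is_direct_iterable = hasattr(direct, "__aiter__")

indirect = stream_via_coroutine()
is_indirect_coroutine = inspect.iscoroutine(indirect)

direct_chunks = asyncio.run(collect(direct))
indirect_chunks = asyncio.run(collect(indirect))
```

A wrapper that is returned synchronously, as described above, only works unchanged for the first shape; if the wrapped call returned a coroutine, `__anext__` would have to await it before iterating.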
354-434: Non-streaming async wrapper correctly uses async_wrapper pattern.
The pattern of defining a local async_wrapper() coroutine and returning it allows the synchronous __call__ method to set up the async execution context properly. The await wrapped(*args, **kwargs) on line 388 is correct for non-streaming async execution.
if is_streaming:
    span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

    span = self._tracer.start_span(
        span_name,
        kind=SpanKind.CLIENT,
    )
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check streaming.py for span/context handling
rg -nP --type=py "start_span|set_current|attach|context|use_span|Token" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
Repository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
#!/bin/bash
# Check if streaming.py imports or uses any context-related OpenTelemetry modules
rg -nP --type=py "from.*context|import.*context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
Repository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
#!/bin/bash
# Look at the full streaming.py file to understand the structure
wc -l packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
Repository: traceloop/openllmetry
Length of output: 160
🏁 Script executed:
#!/bin/bash
# Check how tool spans are created within the streaming context
rg -nP --type=py "tool.*span|_create_tool|wrap_tool" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py -A 5 | head -50
Repository: traceloop/openllmetry
Length of output: 47
🏁 Script executed:
#!/bin/bash
# Read streaming.py to understand the implementation
cat -n packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/streaming.py
Repository: traceloop/openllmetry
Length of output: 9702
🏁 Script executed:
#!/bin/bash
# Also check the imports in __init__.py to understand span management
head -50 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -n
Repository: traceloop/openllmetry
Length of output: 2160
🏁 Script executed:
#!/bin/bash
# Look for tool execution in the streaming branch
sed -n '155,180p' packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py | cat -n
Repository: traceloop/openllmetry
Length of output: 1272
🏁 Script executed:
#!/bin/bash
# Check tool wrapper implementation
head -80 packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py | cat -n
Repository: traceloop/openllmetry
Length of output: 4007
🏁 Script executed:
#!/bin/bash
# Look for start_span calls in tool wrappers
rg -n "start_span|get_span|context" packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/_tool_wrappers.py -A 3 | head -60
Repository: traceloop/openllmetry
Length of output: 970
Span context propagation is broken for tool calls in streaming mode.
The streaming branch creates the agent span with start_span (line 158) but never sets it as the current context. Tool wrappers (_tool_wrappers.py line 34) use start_as_current_span for tool spans, which relies on the current OpenTelemetry context to establish parent-child relationships. Since the agent span is not set as the current context, tool spans will not be correctly parented to the agent span.
The non-streaming branch uses start_as_current_span (line 208), which properly establishes context. To fix this in streaming mode, attach the agent span to the context before wrapping the response. Use trace.set_span_in_context() with token-based context attachment (context.attach() and context.detach()), or wrap the stream iteration with the span context.
if is_streaming:
    span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

    span = self._tracer.start_span(
why not start_as_current_span?
@doronkopit5 because it will be ended outside of this method (only when the stream ends)
if is_streaming:
    span_name = f"{getattr(instance, 'name', 'unknown')}.agent"

    span = self._tracer.start_span(
same question
Summary
Adds comprehensive streaming instrumentation support for Agno agents, enabling proper OpenTelemetry tracing for both sync and async streaming operations.
Changes
Core Implementation
New File: streaming.py
- AgnoStream: Wrapper for synchronous streaming responses
- AgnoAsyncStream: Wrapper for asynchronous streaming responses
- Uses the ObjectProxy pattern to transparently wrap streams
Modified: __init__.py
- Updated _AgentRunWrapper to detect the stream=True parameter
- Updated _AgentARunWrapper to detect the stream=True parameter
- Added the @dont_throw decorator to _AgentARunWrapper for consistency
Modified: utils.py
- Extended the dont_throw decorator to properly handle async functions
- Uses asyncio.iscoroutinefunction for async detection
- Preserves function metadata with @wraps
Testing
Added 4 comprehensive streaming tests:
- test_agent_run_streaming - Basic sync streaming
- test_agent_run_streaming_with_tools - Sync streaming with tool calls
- test_agent_arun_streaming - Basic async streaming
- test_agent_arun_streaming_with_tools - Async streaming with tool calls
All tests include VCR cassettes for reproducibility.
Sample Application
- agno_streaming_example.py: Demonstrates async streaming with a weather agent
Testing
Test Plan
🤖 Generated with Claude Code
Important
Adds streaming support for Agno agents with OpenTelemetry tracing, including new classes for handling streaming responses and comprehensive tests.
- Adds AgnoStream and AgnoAsyncStream classes in streaming.py for handling sync and async streaming responses.
- Updates _AgentRunWrapper and _AgentARunWrapper in __init__.py to detect stream=True and handle streaming with new classes.
- Extends the dont_throw decorator in utils.py to support async functions using asyncio.iscoroutinefunction.
- Adds test_agent_run_streaming, test_agent_run_streaming_with_tools, test_agent_arun_streaming, and test_agent_arun_streaming_with_tools in test_agent.py.
- Adds agno_streaming_example.py demonstrating async streaming with a weather agent.
This description was created by Ellipsis for e8a1065. You can customize this summary. It will automatically update as commits are pushed.