This PR adds event streaming support to the Chat class, allowing users to receive structured AgentEvent objects instead of just text chunks when streaming. This is a valuable feature that provides better visibility into agent execution.
Code Quality & Best Practices
✅ Strengths
Good Type Safety: Excellent use of @overload decorators to provide clear type hints for different parameter combinations
Comprehensive Testing: Added both smoke tests and unit tests with good coverage of edge cases
Backward Compatibility: The changes maintain backward compatibility - existing code using stream() or invoke() will continue to work
Consistent Error Handling: Added proper try/except/finally blocks with cancellation support in pipeline steps
Code Reuse: Good abstraction with _invoke_streaming_events() method to avoid code duplication
⚠️ Issues & Recommendations
1. Missing time Import in steps.py
Location: src/upsonic/agent/pipeline/steps.py:54
The code uses time.time() but doesn't import the time module:
start_time = time.time()  # lines 54, 236
execution_time = time.time() - start_time  # multiple locations
Fix: Add import time at the top of the file.
Impact: 🔴 Critical - This will cause runtime errors.
The updated execute() methods now include name and step_number fields in StepResult, but the original code didn't have these:
step_result = StepResult(
    name=self.name,
    step_number=step_number,
    status=StepStatus.COMPLETED,
    message="Skipped due to cache hit",
    execution_time=time.time() - start_time,
)
Concern: Need to verify that StepResult dataclass/class accepts these fields. If not, this will cause TypeErrors.
Recommendation: Review the StepResult class definition to ensure these fields are supported.
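One quick way to run this check is to compare the keyword arguments used in the diff against the fields the class declares. The sketch below assumes StepResult is a standard dataclass; the StepResult and StepStatus definitions here are hypothetical stand-ins, not the real upsonic classes, and missing_fields is an illustrative helper. If the real class is a Pydantic model, the declared names would come from its model_fields mapping instead.

```python
from dataclasses import dataclass, fields
from enum import Enum


class StepStatus(Enum):
    """Hypothetical stand-in for the pipeline's status enum."""
    COMPLETED = "completed"


@dataclass
class StepResult:
    """Hypothetical shape; the real class may declare different fields."""
    name: str
    step_number: int
    status: StepStatus
    message: str = ""
    execution_time: float = 0.0


def missing_fields(cls, required):
    """Return the required field names that the dataclass does not declare."""
    declared = {f.name for f in fields(cls)}
    return sorted(set(required) - declared)


# An empty list means every keyword used in the diff is accepted.
gaps = missing_fields(StepResult, ["name", "step_number", "execution_time"])
```

Running the same helper against the real StepResult in a unit test would turn this concern into a regression check.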
3. Duplicate Error Handling Logic
Location: src/upsonic/chat/chat.py:478-527
The _invoke_streaming_events() method duplicates almost all the error handling logic from _invoke_streaming(). The only difference is calling agent.astream() with events=True vs events=False.
Recommendation: Consider refactoring to reduce duplication:
def _invoke_streaming_base(
    self,
    task: Task,
    response_start_time: float,
    events: bool = False,
    **kwargs: Any,
) -> AsyncIterator[Any]:
    # Common retry and error handling logic
    # Just pass events parameter to agent.astream()
    ...
This would eliminate ~80 lines of duplicate code.
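To make the shape of that refactor concrete, here is a self-contained sketch of a single retry loop serving both modes. FakeAgent, invoke_streaming_base, collect, and the max_retries parameter are all hypothetical; the real astream() signature and the existing retry policy in chat.py may differ, and the sleep is only a placeholder for the actual backoff.

```python
import asyncio
from typing import Any, AsyncIterator


class FakeAgent:
    """Hypothetical stand-in for the agent: fails once, then streams."""

    def __init__(self) -> None:
        self.calls = 0

    async def astream(self, task: str, events: bool = False) -> AsyncIterator[Any]:
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("transient failure")
        for chunk in ("Hel", "lo"):
            # Events mode yields structured objects, text mode yields strings.
            yield {"type": "text_delta", "text": chunk} if events else chunk


async def invoke_streaming_base(
    agent: FakeAgent, task: str, events: bool = False, max_retries: int = 2
) -> AsyncIterator[Any]:
    """Single retry loop shared by the text and event streaming paths."""
    for attempt in range(max_retries + 1):
        yielded = False
        try:
            async for item in agent.astream(task, events=events):
                yielded = True
                yield item
            return
        except ConnectionError:
            # Never retry after partial output, and give up on the last attempt.
            if yielded or attempt == max_retries:
                raise
            await asyncio.sleep(0)  # placeholder for real backoff


async def collect(events: bool) -> list:
    agent = FakeAgent()
    return [item async for item in invoke_streaming_base(agent, "hi", events=events)]


text_chunks = asyncio.run(collect(events=False))
event_chunks = asyncio.run(collect(events=True))
```

With this layout, _invoke_streaming() and _invoke_streaming_events() would reduce to thin wrappers that differ only in the events argument.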
4. Forced Streaming Behavior
Location: src/upsonic/chat/chat.py:430-432
# If events=True, force stream=True
if events and not stream:
    stream = True
Issue: This silently overrides the user's explicit stream=False parameter without warning. While the behavior makes sense (events require streaming), it could be confusing.
Recommendation: Consider one of these approaches:
Document this behavior clearly in docstrings ✅ (already done)
Log a debug message when forcing stream=True
Alternatively, raise a ValueError if events=True and stream=False to make the requirement explicit
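The last two options can live in one small helper. This is a sketch only: resolve_stream_flag, the strict switch, and the "chat" logger name are hypothetical and not part of the PR.

```python
import logging

logger = logging.getLogger("chat")  # hypothetical logger name


def resolve_stream_flag(stream: bool, events: bool, strict: bool = False) -> bool:
    """Decide the effective stream value when events are requested."""
    if events and not stream:
        if strict:
            # Explicit failure: the caller must opt in to streaming.
            raise ValueError("events=True requires stream=True")
        # Lenient path: keep the current behavior but leave a trace.
        logger.debug("events=True requested; forcing stream=True")
        return True
    return stream
```

The lenient path preserves the behavior in the diff, while the strict path surfaces the conflict to the caller; which default is right depends on how much existing code passes stream=False together with events=True.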
Question: Should events be collected even in non-streaming mode for later retrieval? Current implementation loses these events in non-streaming executions.
6. Missing Finalization in run_stream()
Location: src/upsonic/agent/pipeline/step.py:9-11
Added finalization logic in run_stream():
# Finalize step result (updates context.step_results and execution_stats)
if result:
    self._finalize_step_result(result, context)
Concern: This was previously missing - good catch! However, need to verify this doesn't cause double-finalization if execute_stream() already calls it.
Recommendation: Review execute_stream() and _finalize_step_result() implementations to ensure this doesn't cause issues.
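If the review does turn up a second call site, one defensive option is to make finalization idempotent. The sketch below uses hypothetical Context and Result classes and a hypothetical finalized flag; the real _finalize_step_result() also updates execution_stats, which is omitted here.

```python
from dataclasses import dataclass, field


@dataclass
class Context:
    """Hypothetical stand-in for the pipeline context."""
    step_results: list = field(default_factory=list)


@dataclass
class Result:
    """Hypothetical step result carrying a finalization guard."""
    name: str
    finalized: bool = False


def finalize_step_result(result: Result, context: Context) -> None:
    """Record the result once; later calls for the same result are no-ops."""
    if result.finalized:
        return
    context.step_results.append(result)
    result.finalized = True


ctx = Context()
res = Result(name="cache_check")
finalize_step_result(res, ctx)
finalize_step_result(res, ctx)  # simulated double call from run_stream()
```

A guard like this hides the duplicate call rather than removing it, so it is best treated as a safety net alongside the review, not a replacement for it.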
Performance Considerations
✅ Good Practices
Async Generators: Proper use of async generators for streaming without blocking
No Unnecessary Buffering: Events are yielded immediately without buffering
Retry Logic: Exponential backoff with proper cleanup prevents resource leaks
⚠️ Potential Issues
Event Object Creation: Creating event objects for every text delta could have overhead for large responses. Consider measuring performance impact.
Generator Cleanup: Good cleanup in finally blocks, but verify aclose() is always called properly on early termination.
Security Concerns
✅ No Critical Issues Found
No SQL Injection: No database queries in this PR
No Command Injection: No shell execution
Input Validation: Proper validation of events parameter (boolean)
ℹ️ Notes
Error Message Truncation: Error messages are truncated to 500 chars in StepResult (lines 175, 215, 315, 326), which helps prevent information leakage
Run Cancellation: Proper integration with run cancellation system
Unit Tests (test_chat.py): 564 lines of thorough unit tests
Initialization validation
Input normalization
Retry logic
Session state management
Event streaming parameter handling
Mock-based testing (no API calls)
⚠️ Missing Test Cases
No tests for error scenarios in event streaming - what happens when astream(events=True) raises an exception mid-stream?
No tests for the new finalization logic in step.py:9-11
No tests for cancellation during event streaming
No tests verifying that time.time() works (would catch the missing import)
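As a starting point for the first gap, a mid-stream failure can be simulated with a fake agent whose generator yields one event and then raises. FailingAgent and consume are hypothetical; in the real suite the consumer would be the Chat streaming path rather than this bare loop, and the assertions would also cover retry and session-state behavior.

```python
import asyncio


class FailingAgent:
    """Hypothetical agent that emits one event and then fails mid-stream."""

    async def astream(self, task, events=False):
        yield {"type": "text_delta", "text": "partial"}
        raise RuntimeError("stream broke mid-way")


async def consume(agent):
    """Collect events until the stream ends or raises."""
    received = []
    error = None
    try:
        async for event in agent.astream("hi", events=True):
            received.append(event)
    except RuntimeError as exc:
        error = exc
    return received, error


received, error = asyncio.run(consume(FailingAgent()))
```

The same fake can be reused for the cancellation case by replacing the raise with a long await and cancelling the consuming task.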
Bugs & Critical Issues
🔴 CRITICAL: Missing Import
File: src/upsonic/agent/pipeline/steps.py
Issue: Missing import time
Impact: Runtime error on execution
Fix: Add import time to imports
🟡 MEDIUM: Potential Type Error
File: src/upsonic/agent/pipeline/steps.py
Issue: StepResult may not accept name and step_number parameters
Impact: Runtime error if these fields don't exist
Fix: Verify StepResult class definition
🟡 MEDIUM: Code Duplication
File: src/upsonic/chat/chat.py
Issue: _invoke_streaming_events() duplicates ~80 lines from _invoke_streaming()
Impact: Maintenance burden, potential for bugs when updating retry logic
Fix: Refactor to shared implementation
Recommendations
High Priority
✅ Add missing import time in steps.py
✅ Verify StepResult supports name and step_number fields
✅ Add tests for event streaming error scenarios
✅ Add tests for cancellation during event streaming
Medium Priority
Refactor duplicate retry logic in _invoke_streaming_events() and _invoke_streaming()
Add debug logging when events=True forces stream=True
Verify no double-finalization in step.py
Low Priority
Consider collecting events in non-streaming mode for debugging
Add performance benchmarks for event object creation overhead
Document the event streaming architecture in CLAUDE.md
Verdict
Overall Assessment: This is a well-implemented feature with excellent test coverage. However, there is a critical bug (missing time import) that must be fixed before merging.
Recommendation: Request changes for the critical import issue, then approve after fix.
Score: 7/10 (would be 9/10 after fixing the import and reducing code duplication)
Action Items for PR Author
Add import time to src/upsonic/agent/pipeline/steps.py
Verify StepResult class accepts name and step_number parameters
Run the new tests to ensure they pass
Consider refactoring duplicate retry logic
Add test for cancellation during event streaming
Great work on this feature! The event streaming capability will provide much better visibility into agent execution. 🚀