Skip to content

Add event streaming chat#516

Merged
DoganK01 merged 2 commits intomasterfrom
add-event-streaming-chat
Jan 31, 2026
Merged

Add event streaming chat#516
DoganK01 merged 2 commits intomasterfrom
add-event-streaming-chat

Conversation

@DoganK01
Copy link
Copy Markdown
Member

  • event streaming for chat class

@claude
Copy link
Copy Markdown

claude bot commented Jan 31, 2026

PR Review: Add Event Streaming Chat

Summary

This PR adds event streaming support to the Chat class, allowing users to receive structured AgentEvent objects instead of just text chunks when streaming. This is a valuable feature that provides better visibility into agent execution.

Code Quality & Best Practices

✅ Strengths

  1. Good Type Safety: Excellent use of @overload decorators to provide clear type hints for different parameter combinations
  2. Comprehensive Testing: Added both smoke tests and unit tests with good coverage of edge cases
  3. Backward Compatibility: The changes maintain backward compatibility - existing code using stream() or invoke() will continue to work
  4. Consistent Error Handling: Added proper try-catch-finally blocks with cancellation support in pipeline steps
  5. Code Reuse: Good abstraction with _invoke_streaming_events() method to avoid code duplication

⚠️ Issues & Recommendations

1. Missing time Import in steps.py

Location: src/upsonic/agent/pipeline/steps.py:54

The code uses time.time() but doesn't import time module:

start_time = time.time()  # line 54, 236
execution_time=time.time() - start_time  # multiple locations

Fix: Add import time at the top of the file.

Impact: 🔴 Critical - This will cause runtime errors.


2. Inconsistent StepResult Construction

Location: src/upsonic/agent/pipeline/steps.py:65-72, 115-122

The updated execute() methods now include name and step_number fields in StepResult, but the original code didn't have these:

step_result = StepResult(
    name=self.name,
    step_number=step_number,
    status=StepStatus.COMPLETED,
    message="Skipped due to cache hit",
    execution_time=time.time() - start_time
)

Concern: Need to verify that StepResult dataclass/class accepts these fields. If not, this will cause TypeErrors.

Recommendation: Review the StepResult class definition to ensure these fields are supported.


3. Duplicate Error Handling Logic

Location: src/upsonic/chat/chat.py:478-527

The _invoke_streaming_events() method duplicates almost all the error handling logic from _invoke_streaming(). The only difference is calling agent.astream() with events=True vs events=False.

Recommendation: Consider refactoring to reduce duplication:

def _invoke_streaming_base(
    self,
    task: Task,
    response_start_time: float,
    events: bool = False,
    **kwargs: Any
) -> AsyncIterator[Any]:
    # Common retry and error handling logic
    # Just pass events parameter to agent.astream()

This would eliminate ~80 lines of duplicate code.


4. Forced Streaming Behavior

Location: src/upsonic/chat/chat.py:430-432

# If events=True, force stream=True
if events and not stream:
    stream = True

Issue: This silently overrides the user's explicit stream=False parameter without warning. While the behavior makes sense (events require streaming), it could be confusing.

Recommendation: Consider one of these approaches:

  • Document this behavior clearly in docstrings ✅ (already done)
  • Log a debug message when forcing stream=True
  • Alternatively, raise a ValueError if events=True and stream=False to make the requirement explicit

5. Event Streaming in Non-Streaming Context

Location: src/upsonic/agent/pipeline/steps.py:106-113, 143-150

The code emits memory update events even when context.is_streaming is False, but only appends to context.events when streaming:

if context.is_streaming:
    from upsonic.utils.agent.events import ayield_memory_update_event
    async for event in ayield_memory_update_event(...):
        context.events.append(event)

Question: Should events be collected even in non-streaming mode for later retrieval? Current implementation loses these events in non-streaming executions.


6. Missing Finalization in run_stream()

Location: src/upsonic/agent/pipeline/step.py:9-11

Added finalization logic in run_stream():

# Finalize step result (updates context.step_results and execution_stats)
if result:
    self._finalize_step_result(result, context)

Concern: This was previously missing - good catch! However, need to verify this doesn't cause double-finalization if execute_stream() already calls it.

Recommendation: Review execute_stream() and _finalize_step_result() implementations to ensure this doesn't cause issues.


Performance Considerations

✅ Good Practices

  1. Async Generators: Proper use of async generators for streaming without blocking
  2. No Unnecessary Buffering: Events are yielded immediately without buffering
  3. Retry Logic: Exponential backoff with proper cleanup prevents resource leaks

⚠️ Potential Issues

  1. Event Object Creation: Creating event objects for every text delta could have overhead for large responses. Consider measuring performance impact.
  2. Generator Cleanup: Good cleanup in finally blocks, but verify aclose() is always called properly on early termination.

Security Concerns

✅ No Critical Issues Found

  1. No SQL Injection: No database queries in this PR
  2. No Command Injection: No shell execution
  3. Input Validation: Proper validation of events parameter (boolean)

ℹ️ Notes

  1. Error Message Truncation: Error messages are truncated to 500 chars in StepResult (line 175, 215, 315, 326) - this is good for preventing information leakage
  2. Run Cancellation: Proper integration with run cancellation system

Test Coverage

✅ Excellent Test Coverage

  1. Smoke Tests (test_chat_event_streaming.py): 6 comprehensive integration tests

    • Basic event streaming
    • Invoke with events
    • Text-only responses (no tools)
    • Tool result validation
    • Multiple messages in session
    • Context handling
  2. Unit Tests (test_chat.py): 564 lines of thorough unit tests

    • Initialization validation
    • Input normalization
    • Retry logic
    • Session state management
    • Event streaming parameter handling
    • Mock-based testing (no API calls)

⚠️ Missing Test Cases

  1. No tests for error scenarios in event streaming - what happens when astream(events=True) raises an exception mid-stream?
  2. No tests for the new finalization logic in step.py:9-11
  3. No tests for cancellation during event streaming
  4. No tests verifying that time.time() works (would catch the missing import)

Bugs & Critical Issues

🔴 CRITICAL: Missing Import

  • File: src/upsonic/agent/pipeline/steps.py
  • Issue: Missing import time
  • Impact: Runtime error on execution
  • Fix: Add import time to imports

🟡 MEDIUM: Potential Type Error

  • File: src/upsonic/agent/pipeline/steps.py
  • Issue: StepResult may not accept name and step_number parameters
  • Impact: Runtime error if these fields don't exist
  • Fix: Verify StepResult class definition

🟡 MEDIUM: Code Duplication

  • File: src/upsonic/chat/chat.py
  • Issue: _invoke_streaming_events() duplicates ~80 lines from _invoke_streaming()
  • Impact: Maintenance burden, potential for bugs when updating retry logic
  • Fix: Refactor to shared implementation

Recommendations

High Priority

  1. ✅ Add missing import time in steps.py
  2. ✅ Verify StepResult supports name and step_number fields
  3. ✅ Add tests for event streaming error scenarios
  4. ✅ Add tests for cancellation during event streaming

Medium Priority

  1. Refactor duplicate retry logic in _invoke_streaming_events() and _invoke_streaming()
  2. Add debug logging when events=True forces stream=True
  3. Verify no double-finalization in step.py

Low Priority

  1. Consider collecting events in non-streaming mode for debugging
  2. Add performance benchmarks for event object creation overhead
  3. Document the event streaming architecture in CLAUDE.md

Verdict

Overall Assessment: This is a well-implemented feature with excellent test coverage. However, there is a critical bug (missing time import) that must be fixed before merging.

Recommendation: Request changes for the critical import issue, then approve after fix.

Score: 7/10 (would be 9/10 after fixing the import and reducing code duplication)


Action Items for PR Author

  • Add import time to src/upsonic/agent/pipeline/steps.py
  • Verify StepResult class accepts name and step_number parameters
  • Run the new tests to ensure they pass
  • Consider refactoring duplicate retry logic
  • Add test for cancellation during event streaming

Great work on this feature! The event streaming capability will provide much better visibility into agent execution. 🚀

@DoganK01 DoganK01 merged commit 3813e39 into master Jan 31, 2026
7 checks passed
@DoganK01 DoganK01 deleted the add-event-streaming-chat branch January 31, 2026 16:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant