Streaming TUI, streaming CLI output with line-buffered rendering#798
Streaming TUI, streaming CLI output with line-buffered rendering#798OutThisLife wants to merge 5 commits into
Conversation
Unified streaming architecture combining the best of PRs #774 and #798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for #774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
|
Heads up — we've opened a unified streaming draft PR (#922) that combines the best aspects of this PR with #774 (jobless0x's Telegram dual-transport). Your tests, CLI streaming, Added on top: Telegram The draft PR is held pending end-to-end testing with real streaming providers. Thanks for the solid foundation — your tests especially made this much easier to build. |
|
Thanks for the streaming implementation! We've incorporated your approach into a unified streaming branch (feat/streaming) that combines the best of this PR, #774, and #697 into a single cohesive architecture. Your core contributions that carried over:
The unified version adds config-driven streaming, Telegram dual-transport (draft + edit), and forum topic support on top of your foundation. Appreciate the contribution! |
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs #922 (unified streaming), #1312 (gateway consumer), #774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…ponse box framing Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (#798 CLI streaming concept).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…ponse box framing Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (NousResearch#798 CLI streaming concept).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…ponse box framing Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (NousResearch#798 CLI streaming concept).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…ponse box framing Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (NousResearch#798 CLI streaming concept).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…ponse box framing Stage 2 of streaming support. CLI now streams tokens in real-time: - _stream_delta(): line-buffered rendering via _cprint (prompt_toolkit safe) - _flush_stream(): emits remaining buffer and closes response box - Response box opens on first token, closes on flush - Skip Rich Panel when streaming already displayed content - Reset streaming state before each agent turn - Compatible with existing TTS streaming (both can fire simultaneously) - Uses skin engine for response label branding Credit: OutThisLife (NousResearch#798 CLI streaming concept).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
What does this PR do?
Adds real-time streaming output to the CLI and gateway. LLM responses now render progressively as tokens arrive instead of blocking until completion. Uses line-buffered rendering through prompt_toolkit's native
_cprintpath -- the only approach that works reliably without garbling partial lines or fightingpatch_stdout'sStdoutProxy.Related Issue
Type of Change
Changes Made
run_agent.py: Added_interruptible_streaming_api_call()-- streaming variant of the API call that firesstream_delta_callback(text)per token, accumulates into the same response shape downstream expects, falls back to non-streaming on provider errorscli.py: Line-buffered_stream_delta/_flush_streamthat emits complete lines via_cprint, strips leading blank lines, idle placeholder text, response box framing wraps streamed contentgateway/run.py: Queue-based stream dispatcher with batchededit_message()for messaging platformsgateway/platforms/base.py:already_sentflag so gateway skips re-sending streamed contenttests/test_streaming.py: Accumulator shape, callback order, fallback behaviorHow to Test
hermes-- ask it something, verify output streams line-by-line inside the response boxpython -m pytest tests/test_streaming.py -q