feat(gateway): streaming final response for Telegram#697
Conversation
Add ChatGPT-style live typing preview for Telegram via SSE streaming. - Add stream_callback param to AIAgent.__init__ and _run_streaming_call() method that calls the LLM with stream=True, fires the callback per text token, accumulates tool_call deltas, and falls back to non-streaming on any error. The if/elif/else chain in _interruptible_api_call picks streaming when a callback is provided. - Add stream_preview async task in the gateway that reads tokens from a thread-safe Queue, sends an initial placeholder message after 20 tokens, then edits it every 1.5s with accumulated text + cursor char. On cancel (agent done) does a final edit removing the cursor. Telegram only for now; other platforms can opt in by setting source.platform appropriately. Result: messages appear word-by-word as the model generates, single message, no flash/replace — same UX as ChatGPT on Telegram. Falls back silently to normal delivery if streaming is unavailable or fails.
|
Thanks for the work @clicksingh — the streaming concept and
We're going to implement streaming properly as a separate effort, building on the approach you've outlined here. Your |
Unified streaming architecture combining the best of PRs #774 and #798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for #774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
What this does
Adds ChatGPT-style live typing preview for Telegram: instead of waiting for the full LLM response, the bot sends a placeholder message after ~20 tokens and edits it progressively as tokens arrive — same UX as the official ChatGPT Telegram bot.
How it works
run_agent.pystream_callback: callable = Noneparam onAIAgent.__init___run_streaming_call()method: calls the API withstream=True, firesstream_callback(delta)per text token, accumulates tool_call deltas, returns a fake response object compatible with the existing code path. Falls back to non-streaming on any error._interruptible_api_call._call()picks the streaming path when a callback is set:if codex → elif stream_callback → else normalgateway/run.pyqueue.Queueand_on_tokencallback (Telegram only)stream_callback=_on_tokento the agent constructorstream_previewasync task: reads tokens from the queue, sends the first message after 20 tokens, then edits it every 1.5s with accumulated text +▌cursor. On cancel (agent done), does a final edit removing the cursor.stream_taskcancelled and awaited in thefinallyblock alongside other tasksResult
Single message, types out word-by-word, cursor disappears when done. Tool progress messages still work in parallel. Falls back silently to normal delivery if streaming fails.
Tested on
Notes
source.platform_run_streaming_callis provider-agnostic: works with any OpenAI-compatible endpoint that supportsstream=True