feat: telegram streaming with dual-transport (draft + edit fallback)#774
feat: telegram streaming with dual-transport (draft + edit fallback)#774jobless0x wants to merge 5 commits into
Conversation
thread an optional stream_callback(chunk, is_done) parameter through run_conversation -> _interruptible_api_call -> stream methods. for codex_responses: iterate stream events and forward response.output_text.delta chunks via the callback. suppress callback when tool calls are detected (intermediate turns). for chat_completions: add _run_chat_completions_stream() that accumulates chunks, forwards text deltas, and reconstructs a compatible response object from streamed fragments. the callback is a simple sync function that receives (str, bool) where the bool signals the final chunk. callers can bridge to async via a thread-safe queue.
base.py: - add supports_streaming property (default false) - add delete_message() for cleaning up progress indicators - add send_raw() / edit_message_raw() for plain-text edits that skip markdownv2 formatting (avoids parse failures on incomplete streaming text) - when _stream_msg_ids contains a message id for the chat, do a final formatted edit instead of sending a duplicate message telegram.py: - override supports_streaming = true - implement delete_message() via telegram bot api - implement send_raw() and edit_message_raw() with parse_mode=None
read streaming config from config.yaml (streaming.enabled,
edit_interval, buffer_threshold, cursor). when enabled, create
a queue-based bridge between the sync agent thread and the async
event loop.
send_stream_messages() drains all available chunks from the queue,
sends the first message as soon as 5+ characters arrive, then
progressively edits it on a configurable interval. uses send_raw /
edit_message_raw to avoid markdownv2 parse failures on incomplete
text. deletes the tool progress message when streaming begins.
after the agent finishes, the stream message id is propagated to
base.py via _stream_msg_ids so the final response is delivered as
a formatted edit rather than a duplicate send.
config.yaml example:
streaming:
enabled: true
edit_interval: 0.5
buffer_threshold: 50
cursor: " ▉"
|
Telegram just added to their BOT API streaming responses, precisely for these usecases. So the clean way would be to implement that specific feature of the telegram bot api, right? Also, handling edits of messages is not too optimal because of the 20-requests-per-minute-per-chat rate limits on telegram bot api. |
|
good call on sendMessageDraft, happy to add it as a primary transport with the current edit-based approach as fallback. on the rate limit point, the 20/min limit applies to groups only, private chats allow about 1/sec which the configurable edit_interval already respects. |
Afaik, no, 20 per minute applies to any kind of chat, be it DMs, groups, channels, whatever. And also there's a global 30 req per second limit. Maybe they updated the rate limits recently, because I was building an agent a year ago and I hit the 20reqperminute on DMs by using the workaround you're using here. A quick test might clear this problem. Anyways, the native stream response for bot API will be the correct method for this |
add supports_draft_streaming property, send_draft() and finalize_draft() to base adapter with telegram implementation using bot api 9.3+ sendmessagedraft. includes debug logging on markdown fallback in finalize_draft.
streaming consumer now reads transport config (auto/draft/edit) and tries sendmessagedraft first when available. on any draft failure, falls back to edit-based streaming for the remainder of the response. fixes: reset last_edit_time on draft fallback so edit path fires immediately, use os.urandom for draft id generation, downgrade consumer lifecycle logs to debug, add debug logging to cancellation and error paths.
|
added dual-transport support, tries telegram bot api first, falls back to progressive edits if it fails. |
teknium1
left a comment
There was a problem hiding this comment.
Review — Telegram Streaming with Dual Transport
Nice architecture here. The draft-first / edit-fallback approach with auto-negotiation is well thought out, the tool-call suppression logic is correct, and the sync-to-async bridge via queue.Queue matches our existing tool progress pattern. sendMessageDraft is confirmed available in PTB 22.6 (our installed version) with a matching signature.
That said, there are several issues that need to be addressed before this can be merged:
1. No tests (blocking)
661 additions across 4 core files (including run_agent.py) with zero tests. This is the main blocker. At minimum we need:
_run_chat_completions_stream: The response reconstruction viaSimpleNamespaceis the most fragile part — verify the reconstructed object shape works with all downstream code paths (.choices[0].message.tool_calls,.usage, etc.)- Stream consumer logic: Draft → edit fallback, tool-call suppression, buffer threshold / edit interval behavior
- Transport negotiation:
auto/draft/editconfig modes,supports_draft_streamingfalse → skip draft - Edge cases: empty response, response that's only tool calls, cancellation during streaming
See tests/gateway/test_background_command.py for our pattern of mocking adapters in gateway tests.
2. Missing thread_id / message_thread_id in streaming methods (bug)
The existing send() method passes thread_id from metadata for forum topic support. But send_raw(), edit_message_raw(), send_draft(), and finalize_draft() don't accept or forward it. This means streaming will break in group chats that use forum topics — messages go to the wrong thread or fail.
send_message_draft in PTB 22.6 explicitly accepts message_thread_id. The stream consumer in run.py has access to source.thread_id — it just needs to be threaded through.
3. Config default discrepancies (docs vs code)
The PR description shows:
edit_interval: 0.5
buffer_threshold: 50But the code defaults to:
_stream_edit_interval = float(_streaming_cfg.get("edit_interval", 1.5))
_stream_buffer_threshold = int(_streaming_cfg.get("buffer_threshold", 300))Which values are intended? 1.5s is safer for rate limits but 0.5s gives a better UX. Either way, docs and code should match.
4. Config loading duplication (minor)
The streaming config is loaded by re-reading config.yaml directly (lines ~299-307 of the diff) instead of using the self.config dict that's already loaded on the GatewayRunner instance. This should use self.config.get("streaming", {}) instead — simpler, no duplicate I/O, and stays consistent with how other config sections are accessed.
5. Race between progress and stream message deletion (minor)
When streaming starts, the consumer deletes the progress message via progress_msg_id_holder[0]. Meanwhile the progress consumer may still be trying to edit that same message. The errors would be swallowed (Telegram returns "message not found" which is caught), but it's noisy. Consider setting a flag that tells the progress consumer to stop editing once streaming begins.
Summary
The core design is solid — dual transport, auto fallback, queue-based bridge, tool-call suppression all look right. The main blockers are: tests and the thread_id bug. The config discrepancies and duplication are easy fixes.
Happy to help write the tests if you want.
Unified streaming architecture combining the best of PRs #774 and #798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for #774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
|
Heads up — we've opened a unified streaming draft PR (#922) that combines the best aspects of this PR with #798 (OutThisLife's CLI streaming). Your dual-transport architecture (draft + edit fallback), Telegram-specific methods, and config section are all incorporated. Key fix: The draft PR is held pending end-to-end testing with real streaming providers. Thanks for the excellent work on this — the draft transport approach in particular is exactly right. |
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs #922 (unified streaming), #1312 (gateway consumer), #774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
|
Merged via PR #1538 — unified streaming infrastructure. Your streaming concepts and code patterns were incorporated with authorship credit in the commit messages. Thank you for your contribution! |
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft — awaiting proper streaming token implementation and testing before merge. Layer 1 — Core streaming (run_agent.py): - stream_delta_callback on AIAgent.__init__ (per-instance) - _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction - Tool-call suppression (callback only fires for text-only responses) - on_first_delta callback (stops thinking spinner on first token) - Provider fallback when streaming unsupported - reasoning_content accumulation - Interrupt support (client.close() + rebuild) Layer 2 — Display (cli.py, gateway/): - CLI: line-buffered _stream_delta/_flush_stream via _cprint - Gateway: async stream consumer with dual transport: * Draft (Bot API 9.3+ sendMessageDraft) as primary * Progressive editMessageText as fallback * Auto mode tries draft, falls back seamlessly - Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit) - Uses self.config (no duplicate yaml reads) - already_sent flag prevents duplicate sends in base.py Telegram-specific (gateway/platforms/telegram.py): - send_raw / edit_message_raw (plain text, no MarkdownV2) - send_draft / finalize_draft (Bot API 9.3+) - delete_message - All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug) Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract. Config example: streaming: enabled: true edit_interval: 1.0 buffer_threshold: 100 cursor: ' ▉' transport: auto # auto, draft, or edit Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
… providers Stage 1 of streaming support. Adds: - stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery - _interruptible_streaming_api_call() handling chat_completions + anthropic_messages - Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming - _fire_stream_delta() fires both display and TTS callbacks - _fire_reasoning_delta() for reasoning content streaming - Tool-call suppression: callbacks only fire on text-only responses - on_first_delta callback for spinner control on first token - Provider fallback: graceful degradation to non-streaming - _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks - Anthropic streaming returns native Message for downstream compatibility Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer), NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes). Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
summary
add real-time response streaming for the telegram gateway with a dual-transport architecture. text chunks are streamed to the user as they arrive from the llm, using bot api 9.3+ sendmessagedraft as the primary transport with progressive message edits as the fallback.
transports
draft (primary, bot api 9.3+)
calls sendmessagedraft with a stable draft_id for each chunk. the telegram client animates the growing text natively without message edits, avoiding edit rate-limit pressure entirely. finalized with a regular sendmessage once the response is complete.
edit (fallback)
the original approach: sends an initial plain-text message, then progressively calls editmessagetext on a configurable interval.
auto (default)
tries draft first. on any failure (unsupported client, api error, finalize failure), falls back to edit for the remainder of the stream. the fallback is seamless -- the edit path fires immediately on the same iteration via a last_edit_time reset.
how it works
run_agent.py: threads an optional stream_callback(chunk, is_done) through run_conversation -> _interruptible_api_call -> streaming methods. supports both codex_responses and chat_completions api modes. callback is automatically suppressed during tool-call turns.
gateway/platforms/base.py: adds supports_streaming and supports_draft_streaming properties, send_raw()/edit_message_raw() for edit transport, send_draft()/finalize_draft() for draft transport, and delete_message().
gateway/platforms/telegram.py: implements both transports. draft uses send_message_draft (ptb 22.6+). finalize_draft sends the final formatted message with markdownv2, falling back to plain text with debug logging on parse failure.
gateway/run.py: async stream consumer with transport negotiation. reads streaming.transport config, negotiates draft availability via adapter property, maintains draft_failed flag for mid-stream fallback. queue-based sync-to-async bridge matches the existing tool progress pattern.
config
design decisions