Skip to content

feat(gateway): streaming final response for Telegram#697

Closed
clicksingh wants to merge 1 commit into
NousResearch:mainfrom
clicksingh:feat/gateway-telegram-streaming
Closed

feat(gateway): streaming final response for Telegram#697
clicksingh wants to merge 1 commit into
NousResearch:mainfrom
clicksingh:feat/gateway-telegram-streaming

Conversation

@clicksingh

@clicksingh clicksingh commented Mar 8, 2026

Copy link
Copy Markdown

What this does

Adds ChatGPT-style live typing preview for Telegram: instead of waiting for the full LLM response, the bot sends a placeholder message after ~20 tokens and edits it progressively as tokens arrive — same UX as the official ChatGPT Telegram bot.

How it works

run_agent.py

  • New stream_callback: callable = None param on AIAgent.__init__
  • New _run_streaming_call() method: calls the API with stream=True, fires stream_callback(delta) per text token, accumulates tool_call deltas, returns a fake response object compatible with the existing code path. Falls back to non-streaming on any error.
  • _interruptible_api_call._call() picks the streaming path when a callback is set: if codex → elif stream_callback → else normal

gateway/run.py

  • Before the agent is created: sets up a queue.Queue and _on_token callback (Telegram only)
  • Passes stream_callback=_on_token to the agent constructor
  • New stream_preview async task: reads tokens from the queue, sends the first message after 20 tokens, then edits it every 1.5s with accumulated text + cursor. On cancel (agent done), does a final edit removing the cursor.
  • stream_task cancelled and awaited in the finally block alongside other tasks

Result

Single message, types out word-by-word, cursor disappears when done. Tool progress messages still work in parallel. Falls back silently to normal delivery if streaming fails.

Tested on

  • Platform: WSL2 Ubuntu 24.04
  • Provider: ZAI (OpenAI-compatible, SSE streaming)
  • Telegram bot (group + DM)

Notes

  • Streaming is Telegram-only for now — trivial to extend to Discord/Slack by checking source.platform
  • The 1.5s edit interval respects Telegram's rate limit (~20 edits/min per message)
  • _run_streaming_call is provider-agnostic: works with any OpenAI-compatible endpoint that supports stream=True

Add ChatGPT-style live typing preview for Telegram via SSE streaming.

- Add stream_callback param to AIAgent.__init__ and _run_streaming_call()
  method that calls the LLM with stream=True, fires the callback per text
  token, accumulates tool_call deltas, and falls back to non-streaming on
  any error. The if/elif/else chain in _interruptible_api_call picks
  streaming when a callback is provided.

- Add stream_preview async task in the gateway that reads tokens from a
  thread-safe Queue, sends an initial placeholder message after 20 tokens,
  then edits it every 1.5s with accumulated text + cursor char. On cancel
  (agent done) does a final edit removing the cursor. Telegram only for now;
  other platforms can opt in by setting source.platform appropriately.

Result: messages appear word-by-word as the model generates, single message,
no flash/replace — same UX as ChatGPT on Telegram. Falls back silently to
normal delivery if streaming is unavailable or fails.
@teknium1

Copy link
Copy Markdown
Contributor

Thanks for the work @clicksingh — the streaming concept and _run_streaming_call() implementation are solid. However, there are two blockers:

  1. Stale branch — the diff against main deletes ~500 lines of existing features (Signal adapter, fallback model, /title, /resume, /rollback commands, background notification modes, auxiliary config, security config, and more). These are regressions from branching off an older main.

  2. Incomplete setup_stream_q, _stream_msg_id, and the _on_token callback referenced in stream_preview() are never initialized anywhere in the diff. The PR description mentions setting up a Queue and callback but that code isn't present. Would crash with NameError at runtime.

We're going to implement streaming properly as a separate effort, building on the approach you've outlined here. Your _run_streaming_call() method (provider-agnostic streaming with SimpleNamespace fake response) is a good design and we'll use a similar pattern. Thanks for pioneering this!

@teknium1 teknium1 closed this Mar 10, 2026
teknium1 added a commit that referenced this pull request Mar 11, 2026
Unified streaming architecture combining the best of PRs #774 and #798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for #774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
jobless0x pushed a commit to jobless0x/hermes-agent that referenced this pull request Mar 14, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
teknium1 added a commit that referenced this pull request Mar 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 28, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
CumulusService pushed a commit to Cumulus-Service-GmbH/hermes-agent that referenced this pull request May 30, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants