Skip to content

feat: telegram streaming with dual-transport (draft + edit fallback)#774

Closed
jobless0x wants to merge 5 commits into
NousResearch:mainfrom
jobless0x:feature/telegram-streaming
Closed

feat: telegram streaming with dual-transport (draft + edit fallback)#774
jobless0x wants to merge 5 commits into
NousResearch:mainfrom
jobless0x:feature/telegram-streaming

Conversation

@jobless0x

@jobless0x jobless0x commented Mar 9, 2026

Copy link
Copy Markdown

summary

add real-time response streaming for the telegram gateway with a dual-transport architecture. text chunks are streamed to the user as they arrive from the llm, using bot api 9.3+ sendmessagedraft as the primary transport with progressive message edits as the fallback.

transports

draft (primary, bot api 9.3+)

calls sendmessagedraft with a stable draft_id for each chunk. the telegram client animates the growing text natively without message edits, avoiding edit rate-limit pressure entirely. finalized with a regular sendmessage once the response is complete.

edit (fallback)

the original approach: sends an initial plain-text message, then progressively calls editmessagetext on a configurable interval.

auto (default)

tries draft first. on any failure (unsupported client, api error, finalize failure), falls back to edit for the remainder of the stream. the fallback is seamless -- the edit path fires immediately on the same iteration via a last_edit_time reset.

how it works

  • run_agent.py: threads an optional stream_callback(chunk, is_done) through run_conversation -> _interruptible_api_call -> streaming methods. supports both codex_responses and chat_completions api modes. callback is automatically suppressed during tool-call turns.

  • gateway/platforms/base.py: adds supports_streaming and supports_draft_streaming properties, send_raw()/edit_message_raw() for edit transport, send_draft()/finalize_draft() for draft transport, and delete_message().

  • gateway/platforms/telegram.py: implements both transports. draft uses send_message_draft (ptb 22.6+). finalize_draft sends the final formatted message with markdownv2, falling back to plain text with debug logging on parse failure.

  • gateway/run.py: async stream consumer with transport negotiation. reads streaming.transport config, negotiates draft availability via adapter property, maintains draft_failed flag for mid-stream fallback. queue-based sync-to-async bridge matches the existing tool progress pattern.

config

streaming:
  enabled: true
  edit_interval: 0.5
  buffer_threshold: 50
  cursor: ""
  transport: auto  # auto, draft, or edit

design decisions

  • draft-first with edit fallback: draft is smoother and avoids rate limits, but requires bot api 9.3+ support on the client. auto mode provides graceful degradation.
  • os.urandom for draft id: avoids collision risk from random.randint. generates a positive 31-bit integer from 4 random bytes.
  • immediate fallback on draft failure: when draft fails, last_edit_time is reset to 0 so the edit path fires on the same loop iteration. prevents message loss on the final chunk (critical fix for the case where draft fails on is_complete=true).
  • raw text for intermediate updates: both transports use raw text during streaming to avoid markdownv2 parse failures on incomplete text. only the final message gets full formatting.
  • debug-level lifecycle logs: consumer started/done logs are debug level to reduce noise in production. errors and fallbacks remain at warning/info.
  • tool call suppression: streaming callback is suppressed when the response contains tool calls, so only the final text answer is streamed.

thread an optional stream_callback(chunk, is_done) parameter
through run_conversation -> _interruptible_api_call -> stream methods.

for codex_responses: iterate stream events and forward
response.output_text.delta chunks via the callback. suppress
callback when tool calls are detected (intermediate turns).

for chat_completions: add _run_chat_completions_stream() that
accumulates chunks, forwards text deltas, and reconstructs a
compatible response object from streamed fragments.

the callback is a simple sync function that receives (str, bool)
where the bool signals the final chunk. callers can bridge to
async via a thread-safe queue.
base.py:
- add supports_streaming property (default false)
- add delete_message() for cleaning up progress indicators
- add send_raw() / edit_message_raw() for plain-text edits that
  skip markdownv2 formatting (avoids parse failures on incomplete
  streaming text)
- when _stream_msg_ids contains a message id for the chat, do a
  final formatted edit instead of sending a duplicate message

telegram.py:
- override supports_streaming = true
- implement delete_message() via telegram bot api
- implement send_raw() and edit_message_raw() with parse_mode=None
read streaming config from config.yaml (streaming.enabled,
edit_interval, buffer_threshold, cursor). when enabled, create
a queue-based bridge between the sync agent thread and the async
event loop.

send_stream_messages() drains all available chunks from the queue,
sends the first message as soon as 5+ characters arrive, then
progressively edits it on a configurable interval. uses send_raw /
edit_message_raw to avoid markdownv2 parse failures on incomplete
text. deletes the tool progress message when streaming begins.

after the agent finishes, the stream message id is propagated to
base.py via _stream_msg_ids so the final response is delivered as
a formatted edit rather than a duplicate send.

config.yaml example:
  streaming:
    enabled: true
    edit_interval: 0.5
    buffer_threshold: 50
    cursor: " ▉"
@BonifacioCalindoro

Copy link
Copy Markdown

Telegram just added to their BOT API streaming responses, precisely for these usecases. So the clean way would be to implement that specific feature of the telegram bot api, right? Also, handling edits of messages is not too optimal because of the 20-requests-per-minute-per-chat rate limits on telegram bot api.

@jobless0x

Copy link
Copy Markdown
Author

good call on sendMessageDraft, happy to add it as a primary transport with the current edit-based approach as fallback. on the rate limit point, the 20/min limit applies to groups only, private chats allow about 1/sec which the configurable edit_interval already respects.

@BonifacioCalindoro

Copy link
Copy Markdown

good call on sendMessageDraft, happy to add it as a primary transport with the current edit-based approach as fallback. on the rate limit point, the 20/min limit applies to groups only, private chats allow about 1/sec which the configurable edit_interval already respects.

Afaik, no, 20 per minute applies to any kind of chat, be it DMs, groups, channels, whatever. And also there's a global 30 req per second limit. Maybe they updated the rate limits recently, because I was building an agent a year ago and I hit the 20reqperminute on DMs by using the workaround you're using here. A quick test might clear this problem. Anyways, the native stream response for bot API will be the correct method for this

add supports_draft_streaming property, send_draft() and finalize_draft()
to base adapter with telegram implementation using bot api 9.3+
sendmessagedraft. includes debug logging on markdown fallback in
finalize_draft.
streaming consumer now reads transport config (auto/draft/edit) and
tries sendmessagedraft first when available. on any draft failure,
falls back to edit-based streaming for the remainder of the response.

fixes: reset last_edit_time on draft fallback so edit path fires
immediately, use os.urandom for draft id generation, downgrade
consumer lifecycle logs to debug, add debug logging to cancellation
and error paths.
@jobless0x jobless0x changed the title feat: telegram streaming via progressive message edits feat: telegram streaming with dual-transport (draft + edit fallback) Mar 9, 2026
@jobless0x

Copy link
Copy Markdown
Author

added dual-transport support, tries telegram bot api first, falls back to progressive edits if it fails.

@teknium1 teknium1 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review — Telegram Streaming with Dual Transport

Nice architecture here. The draft-first / edit-fallback approach with auto-negotiation is well thought out, the tool-call suppression logic is correct, and the sync-to-async bridge via queue.Queue matches our existing tool progress pattern. sendMessageDraft is confirmed available in PTB 22.6 (our installed version) with a matching signature.

That said, there are several issues that need to be addressed before this can be merged:


1. No tests (blocking)

661 additions across 4 core files (including run_agent.py) with zero tests. This is the main blocker. At minimum we need:

  • _run_chat_completions_stream: The response reconstruction via SimpleNamespace is the most fragile part — verify the reconstructed object shape works with all downstream code paths (.choices[0].message.tool_calls, .usage, etc.)
  • Stream consumer logic: Draft → edit fallback, tool-call suppression, buffer threshold / edit interval behavior
  • Transport negotiation: auto / draft / edit config modes, supports_draft_streaming false → skip draft
  • Edge cases: empty response, response that's only tool calls, cancellation during streaming

See tests/gateway/test_background_command.py for our pattern of mocking adapters in gateway tests.


2. Missing thread_id / message_thread_id in streaming methods (bug)

The existing send() method passes thread_id from metadata for forum topic support. But send_raw(), edit_message_raw(), send_draft(), and finalize_draft() don't accept or forward it. This means streaming will break in group chats that use forum topics — messages go to the wrong thread or fail.

send_message_draft in PTB 22.6 explicitly accepts message_thread_id. The stream consumer in run.py has access to source.thread_id — it just needs to be threaded through.


3. Config default discrepancies (docs vs code)

The PR description shows:

edit_interval: 0.5
buffer_threshold: 50

But the code defaults to:

_stream_edit_interval = float(_streaming_cfg.get("edit_interval", 1.5))
_stream_buffer_threshold = int(_streaming_cfg.get("buffer_threshold", 300))

Which values are intended? 1.5s is safer for rate limits but 0.5s gives a better UX. Either way, docs and code should match.


4. Config loading duplication (minor)

The streaming config is loaded by re-reading config.yaml directly (lines ~299-307 of the diff) instead of using the self.config dict that's already loaded on the GatewayRunner instance. This should use self.config.get("streaming", {}) instead — simpler, no duplicate I/O, and stays consistent with how other config sections are accessed.


5. Race between progress and stream message deletion (minor)

When streaming starts, the consumer deletes the progress message via progress_msg_id_holder[0]. Meanwhile the progress consumer may still be trying to edit that same message. The errors would be swallowed (Telegram returns "message not found" which is caught), but it's noisy. Consider setting a flag that tells the progress consumer to stop editing once streaming begins.


Summary

The core design is solid — dual transport, auto fallback, queue-based bridge, tool-call suppression all look right. The main blockers are: tests and the thread_id bug. The config discrepancies and duplication are easy fixes.

Happy to help write the tests if you want.

teknium1 added a commit that referenced this pull request Mar 11, 2026
Unified streaming architecture combining the best of PRs #774 and #798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for #774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
@teknium1

Copy link
Copy Markdown
Contributor

Heads up — we've opened a unified streaming draft PR (#922) that combines the best aspects of this PR with #798 (OutThisLife's CLI streaming). Your dual-transport architecture (draft + edit fallback), Telegram-specific methods, and config section are all incorporated.

Key fix: message_thread_id is now threaded through all streaming methods (send_raw, send_draft, finalize_draft) so forum topics work correctly.

The draft PR is held pending end-to-end testing with real streaming providers. Thanks for the excellent work on this — the draft transport approach in particular is exactly right.

jobless0x pushed a commit to jobless0x/hermes-agent that referenced this pull request Mar 14, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
teknium1 added a commit that referenced this pull request Mar 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs #922 (unified streaming), #1312 (gateway consumer),
#774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
teknium1 added a commit that referenced this pull request Mar 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
@teknium1

Copy link
Copy Markdown
Contributor

Merged via PR #1538 — unified streaming infrastructure. Your streaming concepts and code patterns were incorporated with authorship credit in the commit messages. Thank you for your contribution!

@teknium1 teknium1 closed this Mar 16, 2026
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 28, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
CumulusService pushed a commit to Cumulus-Service-GmbH/hermes-agent that referenced this pull request May 30, 2026
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants