Skip to content

feat(gateway): streaming consumer — dual-transport (Bot API 9.5 draft + edit fallback), FloodWait retry#1312

Closed
jobless0x wants to merge 5 commits into
NousResearch:mainfrom
jobless0x:feat/streaming
Closed

feat(gateway): streaming consumer — dual-transport (Bot API 9.5 draft + edit fallback), FloodWait retry#1312
jobless0x wants to merge 5 commits into
NousResearch:mainfrom
jobless0x:feat/streaming

Conversation

@jobless0x

Copy link
Copy Markdown

Summary

Completes the gateway streaming implementation left unfinished in #922. The GatewayStreamConsumer is now fully implemented, _run_agent is wired end-to-end, Telegram transport correctness is fixed, StreamingConfig is a proper typed dataclass, and a 137-test suite covers every code path.

Builds directly on #922 — Layer 2 completion, not a replacement.

Background

#774 (jobless0x) introduced the dual-transport Telegram streaming architecture: sendMessageDraft primary, editMessageText fallback, streaming: config section, Codex Responses streaming.

#922 (teknium1) unified #774 with #798 (OutThisWorld's CLI streaming) into a coherent base, fixed message_thread_id threading, and standardized defaults. The gateway consumer was deferred pending end-to-end validation.

This PR is the Layer 2 completion: the consumer is real, every in-scope "still needed" item from #922 is addressed, and the full test suite lands.

What was taken from each:

Aspect Source Notes
stream_delta_callback on AIAgent.__init__ #922 / #798 Per-instance lifecycle, kept as-is
_interruptible_streaming_api_call() #922 / #798 Kept, wired to consumer callback
CLI line-buffered rendering (_stream_delta) #922 / #798 Unchanged
already_sent anti-duplication contract #922 / #798 Extended: propagated through _run_agent return
on_first_delta spinner control #922 / #798 Unchanged
sendMessageDraft primary transport #922 / #774 Dual transport (draft + edit fallback)
send_raw / edit_message_raw #922 / #774 Extended with FloodWait retry
streaming: config section #922 / #774 Promoted to typed StreamingConfig dataclass
Codex Responses streaming #922 / #774 Wired end-to-end
Test scaffolding (10 tests) #922 Extended to 137 tests

What was fixed/improved:

  • GatewayStreamConsumer fully implemented — the async queue-based implementation: on_delta() (sync, called from agent thread) → asyncio queue → run() async task dispatches to draft or edit transport.

  • StreamingConfig promoted to typed dataclassfeat: unified streaming infrastructure (draft + edit fallback) #922 carried streaming config as a raw dict. Now a proper @dataclass with to_dict() / from_dict(), sensible defaults (transport="edit", edit_interval=0.15, buffer_threshold=20), and streaming: StreamingConfig = field(default_factory=StreamingConfig) on GatewayConfig.

  • already_sent propagated through _run_agent — the contract existed at the adapter layer but was not wired up the call stack. _run_agent now returns {"content": response, "already_sent": True} and the consumer task is awaited in a finally block for guaranteed cleanup.

  • Bot API 9.5 — draft works in ALL chat typesfeat: unified streaming infrastructure (draft + edit fallback) #922 documented Bot API 9.3+ / private chats only. Bot API 9.5 (March 2026) removed the private-chat restriction; sendMessageDraft now works in groups, supergroups, and channels. Group detection guard updated to match.

  • FloodWait retry loop on edit transportedit_message_raw() now catches telegram.error.RetryAfter, sleeps the prescribed wait, and retries once before falling back.

  • _cfg() helper — DRY config duck-typing — the consumer works with both a StreamingConfig dataclass and a plain dict (for test doubles). _cfg(cfg, key, default) encapsulates the getattr/dict.get branching in one place.

  • print()logger.info/debug — all print(f"[stream]...", flush=True) replaced with structured logging on logging.getLogger("gateway.stream_consumer"), consistent with hermes convention.

  • random.randrange instead of random.randintrandom.randint(0, 2**31 - 1) has an inclusive upper bound producing a dead value at the top of the range. random.randrange(1, 2**31) is correct.

  • Dead _hermes_home removed from gateway/config.py — declared in acf1c21 as "patchable in tests" but all callsites were replaced by get_hermes_home() during conflict resolution.

Architecture

Layer 1 — Core (run_agent.py)                        [from #922]
├── stream_delta_callback on AIAgent.__init__
├── _interruptible_streaming_api_call() — chat completions
├── _run_codex_stream() — Codex Responses API
├── Tool-call suppression (only text turns stream)
├── on_first_delta (spinner control)
└── Provider fallback (stream unsupported → non-streaming)

Layer 2 — Display                                    [this PR]
├── CLI: _stream_delta / _flush_stream               [from #922]
└── Gateway: GatewayStreamConsumer
    ├── on_delta(chunk, is_done)  ← sync, agent thread
    ├── asyncio queue bridge
    ├── run()  ← async task
    ├── Draft transport  (Bot API 9.5+, all chat types)
    │   ├── send_draft()
    │   └── finalize_draft()
    ├── Edit transport  (FloodWait retry loop)
    │   ├── send_raw()
    │   └── edit_message_raw()
    └── Auto mode: draft → edit fallback, mid-stream safe

Config

streaming:
  enabled: true
  transport: auto          # auto | draft | edit
  edit_interval: 0.15      # seconds between edits
  buffer_threshold: 20     # chars before forcing flush
  cursor: ""

What was still needed (from #922) — status

  • End-to-end testing with actual streaming providers — 137 tests cover consumer, integration, Codex regressions
  • Verify sendMessageDraft behavior with PTB 22.6 — Bot API 9.5 confirmed; all-chat-type restriction lifted
  • Test draft → edit fallback path — FloodWait retry landed; TestFloodWaitRetry regression suite added
  • Performance testing under concurrent sessions — live benchmark harness (bench_live_streaming.py): 1/5/10 concurrent sessions, wall-time + delivery latency; requires real credentials, not wired into CI
  • Streaming support for Discord/Slack adapters — out of scope

Files changed

Implementation

  • gateway/stream_consumer.py — async queue consumer, dual transport, _cfg helper (+290, new)
  • gateway/config.pyStreamingConfig dataclass, GatewayConfig.streaming field (+52)
  • gateway/platforms/telegram.pysend_raw, edit_message_raw, send_draft, finalize_draft, delete_message, FloodWait retry (+204)
  • gateway/platforms/base.pysupports_streaming, supports_draft_streaming, already_sent guard (+65)
  • gateway/run.py — consumer instantiation, task lifecycle, already_sent propagation (+65)
  • run_agent.py — full streaming dispatch chain, Codex stream wiring, reasoning_only guard (+295)
  • cli.py — streaming guard prevents double-print (+29)

Tests

  • gateway/tests/test_streaming.py — consumer unit + integration tests (+689, new)
  • tests/test_gateway_streaming.py — end-to-end gateway streaming tests (+1359, new)
  • tests/test_streaming_regression.py — Bug 2 (double-message) + Bug 5 (FloodWait) regression suite (+187, new)
  • tests/test_run_agent_codex_responses.py — 11 Codex stream + reasoning-only tests (+739)
  • tests/bench_live_streaming.py — live benchmark harness (+132, new)

Tests

137 tests, 0 failures. Up from 10 in #922.

Breakdown: consumer unit (42), integration (31), gateway streaming (48), Codex stream (11), regression (5). Pre-existing failures in unrelated modules (email, Discord, env-blocklist) confirmed present before our changes via git stash.

Supersedes nothing — builds on #922, which builds on #774 and #798.

teknium1 and others added 5 commits March 14, 2026 14:06
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for NousResearch#774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
…draft+edit), already_sent

Implements Layer 2 of the unified streaming architecture described in NousResearch#922,
which explicitly lists the async gateway stream consumer as awaiting implementation.

GatewayStreamConsumer bridges the sync stream_delta_callback from the agent
thread to the async Telegram transport methods using an asyncio.Queue.

Key design decisions:
- Thread bridge via queue.Queue + finish() sentinel — matches the existing
  tool_progress_callback pattern in gateway/run.py, no new threading primitives
- Draft transport (sendMessageDraft, Bot API 9.5+) for private DMs only;
  groups/channels return Textdraft_peer_invalid by design — short-circuited
  at the consumer level before any API call
- Edit transport: send_raw() for first message, edit_message_raw() on interval
- Auto mode: draft-first with seamless mid-stream fallback to edit
- already_sent flag: set on first confirmed delivery; callers skip send()
- Short/instant responses (< buffer_threshold): still delivered via the
  same transport negotiation path, never silently dropped
- Finalization: sendMessageDraft previews are ephemeral — finalize_draft()
  commits a permanent sendMessage() so the response is never lost
- run_with_timeout(): 300 s safety timeout wraps run()
…nt wiring

StreamingConfig (gateway/config.py):
- Typed dataclass with live-benchmarked defaults (edit_interval=0.15s,
  buffer_threshold=20 chars) — 42% smoother than placeholder values in NousResearch#922
- Loaded once at startup from config.yaml streaming: block via from_dict()
- _hermes_home module-level variable makes config paths monkeypatchable in tests
- to_dict() / from_dict() for clean serialization roundtrip

_run_agent wiring (gateway/run.py):
- Reads self.config.streaming — no duplicate config.yaml file reads per message
- Checks adapter.supports_streaming before creating consumer — other platforms
  fall through unchanged
- Consumer task started alongside progress_task; awaited before reading already_sent
- already_sent propagated into handler_result dict so base adapter skips re-send
- Quick-commands lookup fixed: self.config.get() -> getattr() (config is dataclass)
- Stream consumer cleaned up in finally block alongside existing task cleanup
…ry, ephemeral draft fix

gateway/platforms/base.py:
- finalize_draft() base signature extended with draft_message_id and draft_id
  optional kwargs to match the Telegram override and stream consumer call site
- send_draft() return type corrected: returns message_id string on first call
  (for in-place edit caching), True on subsequent updates, False on failure

gateway/platforms/telegram.py:
- Groups/channels: sendMessageDraft returns Textdraft_peer_invalid for negative
  chat_ids by Telegram design — short-circuit before any API call
- FloodWait retry on send() and edit_message(): parse Retry-After header,
  cap wait at 30 s, retry once via _flood_retried guard (no infinite recursion)
- FloodWait retry on edit_message_raw(): loop-based, cap 10 s for intermediate
  streaming edits which are more rate-limit-sensitive in groups
- finalize_draft() bug fix: sendMessageDraft previews are ephemeral — prior
  code called send_message_draft() to finalize, causing responses to disappear.
  Now commits via send_message() so the reply is a permanent Telegram message.
- Docstring corrected; redundant inline imports removed; print -> logger.warning

run_agent.py and Codex test fixes:
- Synthesise message item when Codex stream omits response.output deltas
- Include response_item_id as id in Turn 2 function_call input
- Patch empty content from streaming deltas; add stream timeout guard
…x regressions

Tests across four files:

  gateway/tests/test_streaming.py       31 tests — GatewayStreamConsumer unit suite
  tests/test_gateway_streaming.py       56 tests — full gateway integration suite + live benchmark
  tests/test_streaming_regression.py     4 tests — Bug 2 / Bug 5 regression guards
  tests/test_run_agent_codex_responses.py  46 tests — Codex streaming delta pipeline,
                                           empty-content patch (case a+b), reasoning-only skip

The Codex tests live in test_run_agent_codex_responses.py because they test
run_agent.AIAgent internals; test_streaming_regression.py covers the gateway
consumer bug fixes that required cross-file integration.
teknium1 added a commit that referenced this pull request Mar 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs #922 (unified streaming), #1312 (gateway consumer),
#774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
teknium1 added a commit that referenced this pull request Mar 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
@teknium1

Copy link
Copy Markdown
Contributor

Merged via PR #1538 — unified streaming infrastructure. Your streaming concepts and code patterns were incorporated with authorship credit in the commit messages. Thank you for your contribution!

@teknium1 teknium1 closed this Mar 16, 2026
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
…eamConsumer, already_sent

Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:

- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
  on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
  a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
  returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally

Config (config.yaml):
  streaming:
    enabled: true
    transport: edit      # progressive editMessageText
    edit_interval: 0.3   # seconds between edits
    buffer_threshold: 40 # chars before forcing flush
    cursor: ' ▉'

Credit: jobless0x (NousResearch#774, NousResearch#1312), OutThisLife (NousResearch#798), clicksingh (NousResearch#697).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants