feat(gateway): streaming consumer — dual-transport (Bot API 9.5 draft + edit fallback), FloodWait retry#1312
Closed
jobless0x wants to merge 5 commits into
Unified streaming architecture combining the best of PRs NousResearch#774 and NousResearch#798, with improvements. This is a draft, awaiting proper streaming token implementation and testing before merge.
Layer 1 - Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)
Layer 2 - Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold, cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py
Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support (fix for NousResearch#774's missing thread_id bug)
Tests: 10 new tests covering accumulator shape, callback order, tool-call suppression, provider fallback, already_sent contract.
Config example:
streaming:
  enabled: true
  edit_interval: 1.0
  buffer_threshold: 100
  cursor: ' ▉'
  transport: auto  # auto, draft, or edit
Supersedes: NousResearch#774 (jobless0x), NousResearch#798 (OutThisLife), NousResearch#697 (clicksingh)
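One way to read the edit_interval and buffer_threshold settings is as a small flush decision made each time new text arrives. The sketch below is an assumption about those semantics, not code from this PR: should_flush is a hypothetical helper, and the defaults are simply the values from the config example in this commit message.

```python
def should_flush(pending_chars, seconds_since_last_edit,
                 edit_interval=1.0, buffer_threshold=100):
    """Decide whether buffered text should be pushed to the chat now.

    Assumed semantics (not confirmed by the PR): buffer_threshold forces a
    flush once enough characters are pending, and edit_interval is the
    minimum spacing between edits otherwise.
    """
    # Nothing buffered means nothing to send.
    if pending_chars <= 0:
        return False
    # Enough text has piled up: flush even if the interval has not elapsed.
    if pending_chars >= buffer_threshold:
        return True
    # Otherwise wait until the minimum spacing between edits has passed.
    return seconds_since_last_edit >= edit_interval
```

Keeping the decision in a pure function like this makes the timing behaviour easy to unit-test without a real clock or a real Telegram connection.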
…draft+edit), already_sent
Implements Layer 2 of the unified streaming architecture described in NousResearch#922, which explicitly lists the async gateway stream consumer as awaiting implementation.
GatewayStreamConsumer bridges the sync stream_delta_callback from the agent thread to the async Telegram transport methods using an asyncio.Queue.
Key design decisions:
- Thread bridge via queue.Queue + finish() sentinel: matches the existing tool_progress_callback pattern in gateway/run.py, no new threading primitives
- Draft transport (sendMessageDraft, Bot API 9.5+) for private DMs only; groups/channels return Textdraft_peer_invalid by design, short-circuited at the consumer level before any API call
- Edit transport: send_raw() for first message, edit_message_raw() on interval
- Auto mode: draft-first with seamless mid-stream fallback to edit
- already_sent flag: set on first confirmed delivery; callers skip send()
- Short/instant responses (< buffer_threshold): still delivered via the same transport negotiation path, never silently dropped
- Finalization: sendMessageDraft previews are ephemeral, so finalize_draft() commits a permanent sendMessage() and the response is never lost
- run_with_timeout(): 300 s safety timeout wraps run()
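The thread bridge described in this commit (sync callback on the agent thread, async consumer on the event loop, sentinel to end the stream) can be sketched roughly as follows. This is a toy illustration, not the PR's GatewayStreamConsumer: DeltaBridge, _DONE, and the deliver callback are hypothetical names, and the sketch uses queue.Queue, one of the two queue types the commit message mentions.

```python
import asyncio
import queue
import threading

_DONE = object()  # hypothetical sentinel marking end of stream


class DeltaBridge:
    """Toy bridge: sync producer thread -> async consumer (illustrative only)."""

    def __init__(self):
        self._q = queue.Queue()  # thread-safe, so the agent thread can put() freely

    def on_delta(self, text):
        # Called synchronously from the agent thread for each token chunk.
        self._q.put(text)

    def finish(self):
        # Signals that no more deltas will arrive.
        self._q.put(_DONE)

    async def run(self, deliver):
        # Wait for items in a worker thread so the event loop is never blocked;
        # `deliver` stands in for a transport call such as an edit or draft update.
        loop = asyncio.get_running_loop()
        accumulated = ""
        while True:
            item = await loop.run_in_executor(None, self._q.get)
            if item is _DONE:
                break
            accumulated += item
            await deliver(accumulated)
        return accumulated


def demo():
    bridge = DeltaBridge()
    seen = []

    async def deliver(text):
        seen.append(text)

    def agent_thread():
        for chunk in ("Hel", "lo", "!"):
            bridge.on_delta(chunk)
        bridge.finish()

    async def main():
        worker = threading.Thread(target=agent_thread)
        worker.start()
        result = await bridge.run(deliver)
        worker.join()
        return result

    return asyncio.run(main()), seen
```

Calling demo() feeds three chunks from a background thread and returns the final text together with every intermediate snapshot the consumer delivered.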
…nt wiring
StreamingConfig (gateway/config.py):
- Typed dataclass with live-benchmarked defaults (edit_interval=0.15s, buffer_threshold=20 chars), 42% smoother than placeholder values in NousResearch#922
- Loaded once at startup from the config.yaml streaming: block via from_dict()
- _hermes_home module-level variable makes config paths monkeypatchable in tests
- to_dict() / from_dict() for clean serialization roundtrip
_run_agent wiring (gateway/run.py):
- Reads self.config.streaming, so there are no duplicate config.yaml file reads per message
- Checks adapter.supports_streaming before creating consumer; other platforms fall through unchanged
- Consumer task started alongside progress_task; awaited before reading already_sent
- already_sent propagated into handler_result dict so base adapter skips re-send
- Quick-commands lookup fixed: self.config.get() -> getattr() (config is dataclass)
- Stream consumer cleaned up in finally block alongside existing task cleanup
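The already_sent contract mentioned in this commit amounts to a guard around the final send. A minimal sketch, assuming the handler result is a dict with content and already_sent keys as the commit describes; FakeAdapter and deliver_result are hypothetical names used only for illustration.

```python
import asyncio


class FakeAdapter:
    """Hypothetical stand-in for a platform adapter; records outgoing sends."""

    def __init__(self):
        self.sent = []

    async def send(self, chat_id, text):
        self.sent.append((chat_id, text))


async def deliver_result(adapter, chat_id, handler_result):
    # If streaming already delivered the reply, skip the final send()
    # so the user does not receive the same text twice.
    if handler_result.get("already_sent"):
        return False
    await adapter.send(chat_id, handler_result["content"])
    return True


def demo():
    adapter = FakeAdapter()
    streamed = {"content": "hi", "already_sent": True}
    plain = {"content": "hello", "already_sent": False}
    first = asyncio.run(deliver_result(adapter, 1, streamed))
    second = asyncio.run(deliver_result(adapter, 1, plain))
    return first, second, adapter.sent
```

Only the non-streamed result reaches adapter.send(); the streamed one is skipped because the consumer is assumed to have delivered it already.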
…ry, ephemeral draft fix
gateway/platforms/base.py:
- finalize_draft() base signature extended with draft_message_id and draft_id optional kwargs to match the Telegram override and stream consumer call site
- send_draft() return type corrected: returns message_id string on first call (for in-place edit caching), True on subsequent updates, False on failure
gateway/platforms/telegram.py:
- Groups/channels: sendMessageDraft returns Textdraft_peer_invalid for negative chat_ids by Telegram design, so short-circuit before any API call
- FloodWait retry on send() and edit_message(): parse Retry-After header, cap wait at 30 s, retry once via _flood_retried guard (no infinite recursion)
- FloodWait retry on edit_message_raw(): loop-based, cap 10 s for intermediate streaming edits which are more rate-limit-sensitive in groups
- finalize_draft() bug fix: sendMessageDraft previews are ephemeral; prior code called send_message_draft() to finalize, causing responses to disappear. Now commits via send_message() so the reply is a permanent Telegram message.
- Docstring corrected; redundant inline imports removed; print -> logger.warning
run_agent.py and Codex test fixes:
- Synthesise message item when Codex stream omits response.output deltas
- Include response_item_id as id in Turn 2 function_call input
- Patch empty content from streaming deltas; add stream timeout guard
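The "cap the wait, retry once" behaviour described for send() and edit_message() can be sketched as below. This is an illustration under stated assumptions, not the PR's code: RetryAfter here is a local stand-in for the library's rate-limit exception, call_with_flood_retry is a hypothetical helper, and the injectable sleep parameter exists only so the example runs without real waiting.

```python
import asyncio


class RetryAfter(Exception):
    """Stand-in for a rate-limit error carrying a server-suggested wait."""

    def __init__(self, retry_after):
        super().__init__(f"retry after {retry_after}s")
        self.retry_after = retry_after


async def call_with_flood_retry(op, max_wait=30.0, sleep=asyncio.sleep):
    # Try once; on a rate-limit error wait (capped) and retry exactly once.
    try:
        return await op()
    except RetryAfter as exc:
        await sleep(min(float(exc.retry_after), max_wait))
        return await op()  # a second RetryAfter propagates to the caller


def demo():
    calls = {"n": 0}
    waits = []

    async def flaky_edit():
        calls["n"] += 1
        if calls["n"] == 1:
            raise RetryAfter(120)  # server asks for longer than the cap
        return "ok"

    async def fake_sleep(seconds):
        waits.append(seconds)  # record instead of actually sleeping

    result = asyncio.run(call_with_flood_retry(flaky_edit, sleep=fake_sleep))
    return result, calls["n"], waits
```

Because the retry is a single second call rather than a recursive one, a persistent rate limit surfaces as an exception instead of looping forever, which is the same concern the _flood_retried guard addresses.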
…x regressions
Tests across four files:
gateway/tests/test_streaming.py 31 tests — GatewayStreamConsumer unit suite
tests/test_gateway_streaming.py 56 tests — full gateway integration suite + live benchmark
tests/test_streaming_regression.py 4 tests — Bug 2 / Bug 5 regression guards
tests/test_run_agent_codex_responses.py 46 tests — Codex streaming delta pipeline,
empty-content patch (case a+b), reasoning-only skip
The Codex tests live in test_run_agent_codex_responses.py because they test
run_agent.AIAgent internals; test_streaming_regression.py covers the gateway
consumer bug fixes that required cross-file integration.
5 tasks
teknium1 added a commit that referenced this pull request Mar 16, 2026
… providers
Stage 1 of streaming support. Adds:
- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility
Drawing from PRs #922 (unified streaming), #1312 (gateway consumer), #774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
4 tasks
teknium1 added a commit that referenced this pull request Mar 16, 2026
…eamConsumer, already_sent
Stage 3 of streaming support. Gateway now streams tokens to messaging platforms:
- StreamingConfig dataclass (enabled, transport, edit_interval, buffer_threshold, cursor)
on GatewayConfig with from_dict/to_dict serialization
- GatewayStreamConsumer: async queue-based consumer that progressively edits
a single message on the target platform (edit transport)
- on_delta() → queue → run() async task → send_or_edit() with rate limiting
- already_sent propagation: when streaming delivered the response, handler
returns None so base adapter skips duplicate send()
- stream_delta_callback wired into AIAgent constructor in _run_agent
- Consumer lifecycle: started as asyncio task, awaited with timeout in finally
Config (config.yaml):
streaming:
enabled: true
transport: edit # progressive editMessageText
edit_interval: 0.3 # seconds between edits
buffer_threshold: 40 # chars before forcing flush
cursor: ' ▉'
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
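A typed config with from_dict/to_dict, as described in this commit, might look roughly like the sketch below. The field names come from the commit message; the default values simply mirror the example config (except enabled, whose default is a guess), and ignoring unknown keys in from_dict is an assumption made for the example rather than confirmed behaviour.

```python
from dataclasses import asdict, dataclass, fields


@dataclass
class StreamingConfig:
    # Defaults mirror the example config above; the real defaults may differ.
    enabled: bool = False  # assumed default, not stated in the commit
    transport: str = "edit"
    edit_interval: float = 0.3
    buffer_threshold: int = 40
    cursor: str = " ▉"

    @classmethod
    def from_dict(cls, data):
        # Ignore unknown keys so an older or newer config.yaml still loads.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in (data or {}).items() if k in known})

    def to_dict(self):
        # Plain dict suitable for dumping back to YAML.
        return asdict(self)
```

A roundtrip such as StreamingConfig.from_dict(cfg.to_dict()) yields an equal object, which is the property the "clean serialization roundtrip" wording refers to.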
Merged via PR #1538: unified streaming infrastructure. Your streaming concepts and code patterns were incorporated with authorship credit in the commit messages. Thank you for your contribution!
Summary
Completes the gateway streaming implementation left unfinished in #922. The GatewayStreamConsumer is now fully implemented, _run_agent is wired end-to-end, Telegram transport correctness is fixed, StreamingConfig is a proper typed dataclass, and a 137-test suite covers every code path.
Builds directly on #922: Layer 2 completion, not a replacement.
Background
#774 (jobless0x) introduced the dual-transport Telegram streaming architecture: sendMessageDraft primary, editMessageText fallback, streaming: config section, Codex Responses streaming.
#922 (teknium1) unified #774 with #798 (OutThisLife's CLI streaming) into a coherent base, fixed message_thread_id threading, and standardized defaults. The gateway consumer was deferred pending end-to-end validation.
This PR is the Layer 2 completion: the consumer is real, every in-scope "still needed" item from #922 is addressed, and the full test suite lands.
What was taken from each:
- stream_delta_callback on AIAgent.__init__
- _interruptible_streaming_api_call()
- _stream_delta)
- already_sent anti-duplication contract
- _run_agent return
- on_first_delta spinner control
- sendMessageDraft primary transport
- send_raw / edit_message_raw
- streaming: config section
- StreamingConfig dataclass
What was fixed/improved:
- GatewayStreamConsumer fully implemented: the async queue-based implementation is on_delta() (sync, called from agent thread) → asyncio queue → run() async task, which dispatches to draft or edit transport.
- StreamingConfig promoted to typed dataclass: #922 carried streaming config as a raw dict. It is now a proper @dataclass with to_dict()/from_dict(), sensible defaults (transport="edit", edit_interval=0.15, buffer_threshold=20), and streaming: StreamingConfig = field(default_factory=StreamingConfig) on GatewayConfig.
- already_sent propagated through _run_agent: the contract existed at the adapter layer but was not wired up the call stack. _run_agent now returns {"content": response, "already_sent": True} and the consumer task is awaited in a finally block for guaranteed cleanup.
- Bot API 9.5, draft works in ALL chat types: #922 documented Bot API 9.3+ / private chats only. Bot API 9.5 (March 2026) removed the private-chat restriction; sendMessageDraft now works in groups, supergroups, and channels. Group detection guard updated to match.
- FloodWait retry loop on edit transport: edit_message_raw() now catches telegram.error.RetryAfter, sleeps the prescribed wait, and retries once before falling back.
- _cfg() helper for DRY config duck-typing: the consumer works with both a StreamingConfig dataclass and a plain dict (for test doubles). _cfg(cfg, key, default) encapsulates the getattr/dict.get branching in one place.
- print() → logger.info/debug: all print(f"[stream]...", flush=True) replaced with structured logging on logging.getLogger("gateway.stream_consumer"), consistent with hermes convention.
- random.randrange instead of random.randint: random.randint(0, 2**31 - 1) includes both endpoints, so it can return 0; random.randrange(1, 2**31) draws from 1 through 2**31 - 1 and never returns 0.
- Dead _hermes_home removed from gateway/config.py: declared in acf1c21 as "patchable in tests" but all callsites were replaced by get_hermes_home() during conflict resolution.
Architecture
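As a rough illustration of the auto transport mode (draft first, edit as fallback), the control flow can be reduced to a small state switch. This is a sketch under assumptions, not the consumer's real logic: FakeTransport and stream_auto are hypothetical, and the convention that a failed draft returns False follows the send_draft() description in the commit messages.

```python
import asyncio


class FakeTransport:
    """Hypothetical adapter: draft calls succeed or fail depending on draft_ok."""

    def __init__(self, draft_ok):
        self.draft_ok = draft_ok
        self.log = []

    async def send_draft(self, text):
        self.log.append(("draft", text))
        return self.draft_ok

    async def send_or_edit(self, text):
        self.log.append(("edit", text))
        return True


async def stream_auto(transport, snapshots):
    # Start in draft mode; on the first failed draft, switch to edit mode
    # for the rest of the stream and re-deliver the current snapshot.
    mode = "draft"
    for text in snapshots:
        if mode == "draft" and not await transport.send_draft(text):
            mode = "edit"
        if mode == "edit":
            await transport.send_or_edit(text)
    return mode
```

Re-delivering the snapshot that failed as a draft is what keeps the fallback seamless: no text is lost at the moment of switching transports.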
Config
What was still needed (from #922) — status
- sendMessageDraft behavior with PTB 22.6: Bot API 9.5 confirmed; all-chat-type restriction lifted
- TestFloodWaitRetry regression suite added
- bench_live_streaming.py): 1/5/10 concurrent sessions, wall-time + delivery latency; requires real credentials, not wired into CI
Files changed
Implementation
- gateway/stream_consumer.py: async queue consumer, dual transport, _cfg helper (+290, new)
- gateway/config.py: StreamingConfig dataclass, GatewayConfig.streaming field (+52)
- gateway/platforms/telegram.py: send_raw, edit_message_raw, send_draft, finalize_draft, delete_message, FloodWait retry (+204)
- gateway/platforms/base.py: supports_streaming, supports_draft_streaming, already_sent guard (+65)
- gateway/run.py: consumer instantiation, task lifecycle, already_sent propagation (+65)
- run_agent.py: full streaming dispatch chain, Codex stream wiring, reasoning_only guard (+295)
- cli.py: streaming guard prevents double-print (+29)
Tests
- gateway/tests/test_streaming.py: consumer unit + integration tests (+689, new)
- tests/test_gateway_streaming.py: end-to-end gateway streaming tests (+1359, new)
- tests/test_streaming_regression.py: Bug 2 (double-message) + Bug 5 (FloodWait) regression suite (+187, new)
- tests/test_run_agent_codex_responses.py: 11 Codex stream + reasoning-only tests (+739)
- tests/bench_live_streaming.py: live benchmark harness (+132, new)
Tests
137 tests, 0 failures. Up from 10 in #922.
Breakdown: consumer unit (42), integration (31), gateway streaming (48), Codex stream (11), regression (5). Pre-existing failures in unrelated modules (email, Discord, env-blocklist) confirmed present before our changes via git stash.
Supersedes nothing: builds on #922, which builds on #774 and #798.