Skip to content

feat: unified streaming infrastructure (draft + edit fallback)#922

Closed
teknium1 wants to merge 1 commit into
mainfrom
feat/streaming
Closed

feat: unified streaming infrastructure (draft + edit fallback)#922
teknium1 wants to merge 1 commit into
mainfrom
feat/streaming

Conversation

@teknium1

Copy link
Copy Markdown
Contributor

Summary

Unified streaming architecture combining the best aspects of PRs #774 (jobless0x) and #798 (OutThisLife), with bug fixes and improvements.

⚠️ DRAFT — awaiting proper streaming token implementation and end-to-end testing before merge.

Background

Three PRs attempted streaming independently (#697, #774, #798), all modifying the same core files with incompatible implementations. This branch unifies them into a single coherent architecture.

What was taken from each:

Aspect Source Why
stream_delta_callback on __init__ #798 Cleaner per-instance lifecycle
_interruptible_streaming_api_call() #798 Separate method, clean fallback
CLI line-buffered rendering #798 _cprint integration
already_sent anti-duplication #798 No mutable adapter state
on_first_delta spinner stop #798 Better UX on first token
Tests (10 tests) #798 Accumulator, callbacks, fallback
sendMessageDraft (Bot API 9.3+) #774 Native streaming, no rate limits
Dual transport (draft + edit) #774 Graceful degradation
send_raw / edit_message_raw #774 Plain text during streaming
streaming: config section #774 User-configurable
Codex Responses streaming #774 Full API mode coverage

What was fixed/improved:

Architecture

Layer 1 — Core (run_agent.py)
├── stream_delta_callback on AIAgent.__init__
├── _interruptible_streaming_api_call() — chat completions
├── Tool-call suppression (only text responses stream)
├── on_first_delta callback (spinner control)
└── Provider fallback (stream unsupported → non-streaming)

Layer 2 — Display
├── CLI: line-buffered _stream_delta/_flush_stream
└── Gateway: async stream consumer
    ├── Draft transport (Bot API 9.3+ sendMessageDraft)
    ├── Edit transport (progressive editMessageText)
    └── Auto mode (try draft → fallback to edit)

Config

streaming:
  enabled: true
  edit_interval: 1.0      # seconds between message edits
  buffer_threshold: 100    # chars before forcing an edit
  cursor: ""            # cursor shown during streaming
  transport: auto          # auto, draft, or edit

What is still needed

  • End-to-end testing with actual streaming providers
  • Verify sendMessageDraft behavior with PTB 22.6 in production
  • Test draft → edit fallback path with real Telegram clients
  • Consider adding streaming support for Discord/Slack adapters
  • Performance testing under concurrent sessions

Files changed

  • run_agent.py — streaming callback + API method (+159)
  • cli.py — line-buffered CLI rendering (+29)
  • gateway/run.py — stream consumer + config + wiring (+174)
  • gateway/platforms/base.py — streaming interfaces + already_sent (+45)
  • gateway/platforms/telegram.py — Telegram streaming methods (+93)
  • tests/test_streaming.py — 10 tests (+257)

Tests

10 new tests: accumulator shape (3), callback order (3), provider fallback (2), already_sent contract (2). All passing.

Supersedes #774, #798, #697.

Unified streaming architecture combining the best of PRs #774 and #798,
with improvements. This is a draft — awaiting proper streaming token
implementation and testing before merge.

Layer 1 — Core streaming (run_agent.py):
- stream_delta_callback on AIAgent.__init__ (per-instance)
- _interruptible_streaming_api_call() for chat completions with
  SimpleNamespace response reconstruction
- Tool-call suppression (callback only fires for text-only responses)
- on_first_delta callback (stops thinking spinner on first token)
- Provider fallback when streaming unsupported
- reasoning_content accumulation
- Interrupt support (client.close() + rebuild)

Layer 2 — Display (cli.py, gateway/):
- CLI: line-buffered _stream_delta/_flush_stream via _cprint
- Gateway: async stream consumer with dual transport:
  * Draft (Bot API 9.3+ sendMessageDraft) as primary
  * Progressive editMessageText as fallback
  * Auto mode tries draft, falls back seamlessly
- Config-driven: streaming.enabled, edit_interval, buffer_threshold,
  cursor, transport (auto/draft/edit)
- Uses self.config (no duplicate yaml reads)
- already_sent flag prevents duplicate sends in base.py

Telegram-specific (gateway/platforms/telegram.py):
- send_raw / edit_message_raw (plain text, no MarkdownV2)
- send_draft / finalize_draft (Bot API 9.3+)
- delete_message
- All methods pass message_thread_id for forum topic support
  (fix for #774's missing thread_id bug)

Tests: 10 new tests covering accumulator shape, callback order,
tool-call suppression, provider fallback, already_sent contract.

Config example:
  streaming:
    enabled: true
    edit_interval: 1.0
    buffer_threshold: 100
    cursor: ' ▉'
    transport: auto  # auto, draft, or edit

Supersedes: #774 (jobless0x), #798 (OutThisLife), #697 (clicksingh)
jobless0x added a commit to jobless0x/hermes-agent that referenced this pull request Mar 14, 2026
…draft+edit), already_sent

Implements Layer 2 of the unified streaming architecture described in NousResearch#922,
which explicitly lists the async gateway stream consumer as awaiting implementation.

GatewayStreamConsumer bridges the sync stream_delta_callback from the agent
thread to the async Telegram transport methods using an asyncio.Queue.

Key design decisions:
- Thread bridge via queue.Queue + finish() sentinel — matches the existing
  tool_progress_callback pattern in gateway/run.py, no new threading primitives
- Draft transport (sendMessageDraft, Bot API 9.5+) for private DMs only;
  groups/channels return Textdraft_peer_invalid by design — short-circuited
  at the consumer level before any API call
- Edit transport: send_raw() for first message, edit_message_raw() on interval
- Auto mode: draft-first with seamless mid-stream fallback to edit
- already_sent flag: set on first confirmed delivery; callers skip send()
- Short/instant responses (< buffer_threshold): still delivered via the
  same transport negotiation path, never silently dropped
- Finalization: sendMessageDraft previews are ephemeral — finalize_draft()
  commits a permanent sendMessage() so the response is never lost
- run_with_timeout(): 300 s safety timeout wraps run()
jobless0x added a commit to jobless0x/hermes-agent that referenced this pull request Mar 14, 2026
…nt wiring

StreamingConfig (gateway/config.py):
- Typed dataclass with live-benchmarked defaults (edit_interval=0.15s,
  buffer_threshold=20 chars) — 42% smoother than placeholder values in NousResearch#922
- Loaded once at startup from config.yaml streaming: block via from_dict()
- _hermes_home module-level variable makes config paths monkeypatchable in tests
- to_dict() / from_dict() for clean serialization roundtrip

_run_agent wiring (gateway/run.py):
- Reads self.config.streaming — no duplicate config.yaml file reads per message
- Checks adapter.supports_streaming before creating consumer — other platforms
  fall through unchanged
- Consumer task started alongside progress_task; awaited before reading already_sent
- already_sent propagated into handler_result dict so base adapter skips re-send
- Quick-commands lookup fixed: self.config.get() -> getattr() (config is dataclass)
- Stream consumer cleaned up in finally block alongside existing task cleanup
@jobless0x

Copy link
Copy Markdown

Heads up, opened a completion PR (#1312) that builds directly on this one.

Delivers the dual-transport GatewayStreamConsumer (the gateway consumer deferred here), Bot API 9.5 draft support across all chat types, and FloodWait retry on the edit transport.

Key additions: StreamingConfig typed dataclass, already_sent propagated through _run_agent, structured logging throughout.

All in-scope "What is still needed" items are checked off. 137 tests, 0 failures.

teknium1 added a commit that referenced this pull request Mar 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs #922 (unified streaming), #1312 (gateway consumer),
#774 (Telegram streaming), #798 (CLI streaming), #1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
@teknium1

Copy link
Copy Markdown
Contributor Author

Merged via PR #1538 — unified streaming infrastructure. Your streaming concepts and code patterns were incorporated with authorship credit in the commit messages. Thank you for your contribution!

@teknium1 teknium1 closed this Mar 16, 2026
angelburgosrosado pushed a commit to angelburgosrosado/hermes-agent that referenced this pull request Apr 27, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
02356abc pushed a commit to 02356abc/hermes-agent that referenced this pull request May 14, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
olympus-terminal pushed a commit to olympus-terminal/hermes-agent that referenced this pull request May 16, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
Egavasyug pushed a commit to Egavasyug/hermes-agent that referenced this pull request Jun 10, 2026
… providers

Stage 1 of streaming support. Adds:

- stream_delta_callback parameter on AIAgent.__init__ for real-time token delivery
- _interruptible_streaming_api_call() handling chat_completions + anthropic_messages
- Enhanced _run_codex_stream() to fire delta callbacks during Codex streaming
- _fire_stream_delta() fires both display and TTS callbacks
- _fire_reasoning_delta() for reasoning content streaming
- Tool-call suppression: callbacks only fire on text-only responses
- on_first_delta callback for spinner control on first token
- Provider fallback: graceful degradation to non-streaming
- _has_stream_consumers() unifies stream_delta_callback and _stream_callback checks
- Anthropic streaming returns native Message for downstream compatibility

Drawing from PRs NousResearch#922 (unified streaming), NousResearch#1312 (gateway consumer),
NousResearch#774 (Telegram streaming), NousResearch#798 (CLI streaming), NousResearch#1214 (reasoning modes).
Credit: jobless0x, OutThisLife, clicksingh, raulvidis.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants