fix(api_server): propagate SSE batch flush failures to main streaming loop#398
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to propagate errors from the background batch-flushing task to the main streaming loop using a sentinel and a shared error state. This ensures that exceptions during SSE delta emission, such as client disconnects, correctly interrupt the agent and result in an incomplete response status. A new test case has been added to verify this behavior. Feedback was provided regarding the potential for event loop stalls when calling stream_q.put() on a queue.Queue directly, suggesting the use of an executor or an asyncio.Queue for better concurrency.
| except Exception as exc: | ||
| _batch_timer = None | ||
| _batch_error = exc | ||
| stream_q.put(_batch_error_sentinel) |
There was a problem hiding this comment.
Calling stream_q.put() directly from the event loop on a queue.Queue is technically a blocking operation. While stream_q is unbounded here, it still involves acquiring a threading.Lock. In a high-concurrency environment, this could lead to minor event loop stalls if the executor thread (which is doing stream_q.get()) is holding the lock. A more idiomatic async approach would be to use loop.run_in_executor(None, stream_q.put, _batch_error_sentinel) or switch to an asyncio.Queue for the main loop and use a thread-safe wrapper for the agent callbacks.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Propagates exceptions from the background SSE text-delta batch flush task back into the main streaming loop so disconnect/write failures trigger the existing disconnect handling and agent interruption.
Changes:
- Add a batch-error sentinel + stored exception to notify the main SSE loop when the batch flush task fails.
- Update the main streaming loop to detect the sentinel and re-raise the stored exception.
- Add a regression test intended to simulate disconnect during background batch flushing and assert the agent is interrupted + an incomplete snapshot is stored.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
gateway/platforms/api_server.py |
Adds sentinel-based error propagation from background batch flush to the main streaming loop. |
tests/gateway/test_api_server.py |
Adds a regression test for disconnect during batched delta flushing and ensuing agent interruption/persistence behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| stream_q: _q.Queue = _q.Queue() | ||
| stream_q.put("batched text") |
| write_call_count = {"n": 0} | ||
|
|
||
| class _DisconnectingStreamResponse: | ||
| async def prepare(self, req): | ||
| pass | ||
|
|
||
| async def write(self, payload): | ||
| write_call_count["n"] += 1 | ||
| if write_call_count["n"] >= 3: |
| timeout=1.0, | ||
| ) | ||
|
|
||
| mock_agent.interrupt.assert_called_once_with("SSE client disconnected") |
| async def _batch_flush_after(delay: float) -> None: | ||
| """Wait delay seconds, then flush accumulated text deltas.""" | ||
| nonlocal _batch_error, _batch_timer | ||
| try: | ||
| await asyncio.sleep(delay) | ||
| # Clear timer reference BEFORE flush so new deltas | ||
| # can start a fresh timer while we emit | ||
| _batch_timer = None | ||
| await _flush_batch() | ||
| except asyncio.CancelledError: | ||
| return | ||
| # Clear timer reference BEFORE flush so new deltas | ||
| # can start a fresh timer while we emit | ||
| nonlocal _batch_buf, _batch_timer | ||
| _batch_timer = None | ||
| await _flush_batch() | ||
| except Exception as exc: | ||
| _batch_timer = None | ||
| _batch_error = exc | ||
| stream_q.put(_batch_error_sentinel) |
| except Exception as exc: | ||
| _batch_timer = None | ||
| _batch_error = exc | ||
| stream_q.put(_batch_error_sentinel) |
| if item is _batch_error_sentinel: | ||
| if _batch_error is not None: | ||
| raise _batch_error |
| if item is _batch_error_sentinel: | ||
| if _batch_error is not None: | ||
| raise _batch_error |
🔎 Lint report:
|
|
@copilot fix pending checks Generated by Claude Code |
… loop
When the batched-delta background task ("_batch_flush_after") hit a
ConnectionResetError on response.write(), the exception was swallowed
in the detached task and the main loop kept waiting on stream_q for
items that would never arrive — the streaming endpoint hung until the
client timed out and the agent was never interrupted.
Restore #398's fix: catch the flush exception in the background task,
stash it in _batch_error, and push a sentinel into stream_q so the main
loop re-raises it on dequeue. Both the live loop and the drain path
honour the sentinel.
Clears test_stream_batched_delta_disconnect_interrupts_agent.
* fix: restore session/auth helpers lost in merge conflicts
A series of merge resolutions dropped several helpers while keeping their
call sites and tests. The result was a broad cluster of NameError /
AttributeError / TypeError failures across gateway, cron, web-tools and
api-server tests.
- gateway/run.py: restore `team_id` definition in `_is_user_authorized`;
it was deleted but two call sites still reference it.
- gateway/session_context.py: restore `get_terminal_cwd` /
`set_terminal_cwd` / `reset_terminal_cwd` helpers (and the underlying
`_TERMINAL_CWD` ContextVar) that run_agent.py imports.
- tools/web_tools.py: rename `_ddgs_package_importable` to
`_ddgs_package_available` (with a backward-compat alias) so tests can
monkeypatch the expected symbol; drop ddgs from the auto-detect
fallback so just having the package importable doesn't silently opt
users into a rate-limited HTML-scraping backend.
- gateway/platforms/webhook.py: reject unresolved `${VAR}` placeholder
secrets; treating them as real HMAC secrets silently weakened auth.
- gateway/platforms/api_server.py: restore `_constant_time_equal` so
unicode API keys compare safely instead of raising TypeError from
`hmac.compare_digest`.
* fix(api_server): guard _constant_time_equal against None inputs
Defense in depth: the existing _check_auth caller early-returns when
self._api_key is falsy, so None can't actually reach this helper today,
but accepting Optional[str] and short-circuiting to False keeps the
helper safe for any future caller and matches the type the call site
already permits.
* fix(web_tools): delegate _ddgs_package_available to the safe helper
A bare ``import ddgs`` executes any local ``ddgs.py`` shadowing the
installed package on sys.path. The provider module already has a safer
non-importing check (metadata + find_spec, verified by
test_availability_does_not_import_shadowed_local_module). Delegate to
it so both call paths share the same protection.
* fix(approval): restore set_/reset_/get_current_run_id helpers
A prior merge removed these helpers from tools/approval.py but kept the
api_server callers that import them, breaking every /v1/runs request
with ImportError. Restore the contextvar (`_approval_run_id`) and the
three accessors so the API run path can bind a per-task run id to
pending gateway approvals again.
Clears the 7 failures under tests/gateway/test_api_server_runs.py.
* fix(api_server): restore proxy_scope authentication
A prior merge deleted gateway/proxy_scope_auth.py and stripped the
HMAC-signature check from the /v1/chat/completions handler, while
leaving the tests that exercise both paths. Restore the module and
re-wire the handler:
- Imports verify_proxy_scope_signature + signature/timestamp headers
from gateway.proxy_scope_auth.
- Uses ``"hermes_proxy_scope" in body`` (not truthy check) so an
explicit JSON ``null`` is rejected with 400.
- Returns 403 with "trusted gateway proxy authentication" when the
signature is missing or invalid.
Clears the 3 TestChatCompletionsEndpoint failures.
* fix(run_agent): harden assistant-message scrub against non-str helper return
When _strip_think_blocks is mocked in tests (TestInlineThinkBlockExtraction
binds only _build_assistant_message + _extract_reasoning, leaving every
other method as a MagicMock), it returns a MagicMock instead of a string.
sanitize_context() then crashes because re.sub expects str/bytes.
Guard the scrub: if _strip_think_blocks returns a string, sanitize that;
otherwise fall back to sanitizing the original _san_content. Production
agents always return a string, so behavior there is unchanged.
Clears the 7 TestInlineThinkBlockExtraction failures.
* fix(api_server): SSRF-block private/internal image URLs
A prior merge dropped the is_safe_url() check on http(s) image URLs from
_normalize_multimodal_content, leaving only the scheme guard. Image URLs
pointing at private/internal addresses now reach the multimodal pipeline
and can exfiltrate internal-network content (test_private_image_url_rejected,
test_cloud_metadata_image_url_rejected). Re-add the check before the URL
is normalized into the image part.
* fix(tests): restore _FakeProviderMemoryManager and align memory-context test with scrub semantics
Two lost-in-merge regressions in tests/run_agent/test_run_agent.py:
1. _FakeProviderMemoryManager class was deleted but two
TestConcurrentToolExecution tests still instantiate it, raising
NameError. Restore the minimal double from #209.
2. test_memory_context_in_stored_content_is_preserved was renamed to
_is_scrubbed in #478 and the assertions inverted to match the
storage-boundary scrub the production code now performs. The pre-rename
version kept the old "preserve" assertions, which fail against the
correct production behaviour. Update the test to its post-#478 form.
* fix(tools_config): restore HASS_TOKEN opt-in + cross-platform MCP server fanout
Two lost-in-merge regressions in hermes_cli/tools_config.py:
- _implicit_default_off_toolsets no longer dropped homeassistant from
default_off when HASS_TOKEN was set. That regressed Norbert's HA cron
setup (the original PR NousResearch#14798 carve-out) because cron / cli would
silently drop the toolset even though the operator had provisioned
credentials. Restore the HASS_TOKEN check.
- _get_platform_tools required platform_toolsets to explicitly re-list
every globally configured MCP server (exa, web-search-prime, etc.) to
keep them enabled. Once a platform had any explicit builtin toolset
list, MCP servers vanished. Restore the simpler rule: no_mcp opts out;
otherwise enabled_mcp_servers always fan out — explicit builtin
selection is the platform allowlist, not the MCP allowlist.
Clears 2 test_tools_config failures and the related
test_reasoning_command "exa in toolsets" assertion.
* fix(browser_tool): use url_contains_secret for navigate's secret check
The inline secret-block in browser_navigate only ran one urllib.unquote
pass, so a URL with double-percent-encoded prefixes (sk%252Dant%252D…)
slipped through and reached the browser. agent.redact.url_contains_secret
applies repeated decoding (3 passes) and also splits the URL into
component values before matching, so it catches the multi-encode tricks
that test_blocks_percent_encoded_api_key_in_url and
test_blocks_split_api_key_in_query_values exercise.
Clears 2 test_browser_secret_exfil failures.
* fix: enforce SSRF + attachment-auth gates lost in merge
Two adjacent security regressions:
- tools/browser_tool.py: pre-navigation SSRF check skipped local backends
(`not _is_local_backend()` short-circuited the guard) even though the
surrounding comment explicitly states local backends must enforce it
too — browser_snapshot can return local-file / internal-service
responses in reduced-tool configurations. Drop the local-backend skip
so the guard fires unless the operator opts in via
``browser.allow_private_urls``.
- gateway/platforms/qqbot/adapter.py: restore the attachment pre-auth
gate from #349. _handle_c2c/_handle_group/_handle_guild/_handle_dm
now check `_is_source_authorized_for_attachment_processing` before
calling `_process_attachments`, and forward a text-only event when
the sender isn't allowlisted. This prevents an unauthorized sender
from forcing the bot to fetch attacker-controlled attachment URLs
(SSRF amplification, large-file DoS, redirect attacks). Failure-closed
when gateway_runner isn't attached yet, with a throttled warning so
startup races don't spam the log.
Clears 2 test_browser_ssrf_local failures and
test_unauthorized_c2c_skips_attachment_processing.
* fix(kanban): scope worker child reaping to known PIDs only
dispatch_once() was calling waitpid(-1, WNOHANG) on every tick, which
reaps any zombie child of the gateway process — including non-kanban
subprocesses (npm install, agent-browser, etc.) whose callers rely on
their own Popen.wait()/subprocess.run() exit status. That broke
unrelated tools whenever the kanban dispatcher ran in the same process.
Restore the scoped reaper from #393: track each kanban worker PID in
_known_worker_child_pids when it's persisted via _set_worker_pid, and
in dispatch_once only waitpid those specific PIDs. Windows is still a
no-op (no zombies / no WNOHANG).
Clears test_source_gates_waitpid_loop.
* fix(api_server): propagate SSE batch flush failures to main streaming loop
When the batched-delta background task ("_batch_flush_after") hit a
ConnectionResetError on response.write(), the exception was swallowed
in the detached task and the main loop kept waiting on stream_q for
items that would never arrive — the streaming endpoint hung until the
client timed out and the agent was never interrupted.
Restore #398's fix: catch the flush exception in the background task,
stash it in _batch_error, and push a sentinel into stream_q so the main
loop re-raises it on dequeue. Both the live loop and the drain path
honour the sentinel.
Clears test_stream_batched_delta_disconnect_interrupts_agent.
* fix(browser_tool): honor restricted PATH for Homebrew/user-writable trust roots
A prior merge widened _SANE_PATH_DIRS to include /opt/homebrew/{bin,sbin}
and /usr/local/{bin,sbin} unconditionally and made _browser_candidate_path_dirs
always inject Homebrew node prefixes. Cron / systemd / locked-down operator
configs that intentionally strip those trust roots from PATH would silently
get them injected back, defeating the restriction.
Restore #234's design:
- _SANE_PATH_DIRS only includes Termux + system dirs (/usr/{bin,sbin}, /{bin,sbin}).
- _browser_candidate_path_dirs(existing_path) takes the operator-provided PATH
and only adds Homebrew node prefixes / /usr/local / hermes-managed Node bin
when the operator already opted into that trust root.
- _find_agent_browser passes os.environ.get("PATH","") through to
_merge_browser_path so the gating actually fires (previously it passed "").
Clears all 4 test_browser_homebrew_paths failures.
---------
Co-authored-by: Claude <noreply@anthropic.com>
Motivation
Description
_batch_errorand_batch_error_sentineland update_batch_flush_afterto catch any exception and enqueue a sentinel intostream_qso the main loop is notified instead of failing silently.agent.interrupt(), and cancelling the agent task) runs as intended.test_stream_batched_delta_disconnect_interrupts_agentintests/gateway/test_api_server.pythat simulates a client disconnect during the background batch flush and verifies the agent is interrupted and anincompletesnapshot is persisted.Testing
python -m py_compile gateway/platforms/api_server.py tests/gateway/test_api_server.pysucceeded.ruff check gateway/platforms/api_server.py tests/gateway/test_api_server.pysucceeded.asyncio.runinvocation of the handler) validated the disconnect scenario and passed locally (simulated client disconnect producedagent.interrupt()and agent cancellation).pytestfor the test in this environment was blocked due to missing test dependencies (pytest-asyncio/aiohttp), so the new test was added but could not be executed viapytesthere.Codex Task