Skip to content

fix(api_server): propagate SSE batch flush failures to main streaming loop#398

Merged
github-actions[bot] merged 2 commits into
mainfrom
badmade/fix-sse-batching-disconnect-handling
May 22, 2026
Merged

fix(api_server): propagate SSE batch flush failures to main streaming loop#398
github-actions[bot] merged 2 commits into
mainfrom
badmade/fix-sse-batching-disconnect-handling

Conversation

@badMade

@badMade badMade commented May 21, 2026

Copy link
Copy Markdown
Owner

Motivation

  • The SSE text-delta batching task could raise write/disconnect exceptions in a detached asyncio task, which bypassed the outer disconnect handler and allowed the agent to continue running and consuming resources.

Description

  • Add _batch_error and _batch_error_sentinel and update _batch_flush_after to catch any exception and enqueue a sentinel into stream_q so the main loop is notified instead of failing silently.
  • Make the main streaming loop check for the sentinel and re-raise the stored exception so the existing client-disconnect handling (persisting incomplete snapshot, calling agent.interrupt(), and cancelling the agent task) runs as intended.
  • Add a regression test test_stream_batched_delta_disconnect_interrupts_agent in tests/gateway/test_api_server.py that simulates a client disconnect during the background batch flush and verifies the agent is interrupted and an incomplete snapshot is persisted.

Testing

  • python -m py_compile gateway/platforms/api_server.py tests/gateway/test_api_server.py succeeded.
  • ruff check gateway/platforms/api_server.py tests/gateway/test_api_server.py succeeded.
  • A focused async reproduction (direct asyncio.run invocation of the handler) validated the disconnect scenario and passed locally (simulated client disconnect produced agent.interrupt() and agent cancellation).
  • Running the full pytest for the test in this environment was blocked due to missing test dependencies (pytest-asyncio / aiohttp), so the new test was added but could not be executed via pytest here.

Codex Task

Copilot AI review requested due to automatic review settings May 21, 2026 01:08

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to propagate errors from the background batch-flushing task to the main streaming loop using a sentinel and a shared error state. This ensures that exceptions during SSE delta emission, such as client disconnects, correctly interrupt the agent and result in an incomplete response status. A new test case has been added to verify this behavior. Feedback was provided regarding the potential for event loop stalls when calling stream_q.put() on a queue.Queue directly, suggesting the use of an executor or an asyncio.Queue for better concurrency.

except Exception as exc:
_batch_timer = None
_batch_error = exc
stream_q.put(_batch_error_sentinel)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Calling stream_q.put() directly from the event loop on a queue.Queue is technically a blocking operation. While stream_q is unbounded here, it still involves acquiring a threading.Lock. In a high-concurrency environment, this could lead to minor event loop stalls if the executor thread (which is doing stream_q.get()) is holding the lock. A more idiomatic async approach would be to use loop.run_in_executor(None, stream_q.put, _batch_error_sentinel) or switch to an asyncio.Queue for the main loop and use a thread-safe wrapper for the agent callbacks.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Propagates exceptions from the background SSE text-delta batch flush task back into the main streaming loop so disconnect/write failures trigger the existing disconnect handling and agent interruption.

Changes:

  • Add a batch-error sentinel + stored exception to notify the main SSE loop when the batch flush task fails.
  • Update the main streaming loop to detect the sentinel and re-raise the stored exception.
  • Add a regression test intended to simulate disconnect during background batch flushing and assert the agent is interrupted + an incomplete snapshot is stored.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 7 comments.

File Description
gateway/platforms/api_server.py Adds sentinel-based error propagation from background batch flush to the main streaming loop.
tests/gateway/test_api_server.py Adds a regression test for disconnect during batched delta flushing and ensuing agent interruption/persistence behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +1941 to +1942
stream_q: _q.Queue = _q.Queue()
stream_q.put("batched text")
Comment on lines +1927 to +1935
write_call_count = {"n": 0}

class _DisconnectingStreamResponse:
async def prepare(self, req):
pass

async def write(self, payload):
write_call_count["n"] += 1
if write_call_count["n"] >= 3:
timeout=1.0,
)

mock_agent.interrupt.assert_called_once_with("SSE client disconnected")
Comment on lines 1831 to +1845
async def _batch_flush_after(delay: float) -> None:
"""Wait delay seconds, then flush accumulated text deltas."""
nonlocal _batch_error, _batch_timer
try:
await asyncio.sleep(delay)
# Clear timer reference BEFORE flush so new deltas
# can start a fresh timer while we emit
_batch_timer = None
await _flush_batch()
except asyncio.CancelledError:
return
# Clear timer reference BEFORE flush so new deltas
# can start a fresh timer while we emit
nonlocal _batch_buf, _batch_timer
_batch_timer = None
await _flush_batch()
except Exception as exc:
_batch_timer = None
_batch_error = exc
stream_q.put(_batch_error_sentinel)
Comment on lines +1842 to +1845
except Exception as exc:
_batch_timer = None
_batch_error = exc
stream_q.put(_batch_error_sentinel)
Comment on lines +1866 to +1868
if item is _batch_error_sentinel:
if _batch_error is not None:
raise _batch_error
Comment on lines +1882 to +1884
if item is _batch_error_sentinel:
if _batch_error is not None:
raise _batch_error
@github-actions

github-actions Bot commented May 21, 2026

Copy link
Copy Markdown

🔎 Lint report: badmade/fix-sse-batching-disconnect-handling vs origin/main

ruff

Total: 0 on HEAD, 0 on base (➖ 0)

🆕 New issues: none

✅ Fixed issues: none

Unchanged: 0 pre-existing issues carried over.

ty (type checker)

Total: 8177 on HEAD, 8177 on base (➖ 0)

🆕 New issues: none

✅ Fixed issues: none

Unchanged: 4295 pre-existing issues carried over.

Diagnostics are surfaced as warnings — this check never fails the build.

badMade commented May 22, 2026

Copy link
Copy Markdown
Owner Author

@copilot fix pending checks


Generated by Claude Code

@github-actions github-actions Bot merged commit cd9fe96 into main May 22, 2026
15 of 16 checks passed
Copilot stopped work on behalf of badMade due to an error May 22, 2026 04:06
badMade pushed a commit that referenced this pull request Jun 1, 2026
… loop

When the batched-delta background task ("_batch_flush_after") hit a
ConnectionResetError on response.write(), the exception was swallowed
in the detached task and the main loop kept waiting on stream_q for
items that would never arrive — the streaming endpoint hung until the
client timed out and the agent was never interrupted.

Restore #398's fix: catch the flush exception in the background task,
stash it in _batch_error, and push a sentinel into stream_q so the main
loop re-raises it on dequeue. Both the live loop and the drain path
honour the sentinel.

Clears test_stream_batched_delta_disconnect_interrupts_agent.
github-actions Bot pushed a commit that referenced this pull request Jun 2, 2026
* fix: restore session/auth helpers lost in merge conflicts

A series of merge resolutions dropped several helpers while keeping their
call sites and tests. The result was a broad cluster of NameError /
AttributeError / TypeError failures across gateway, cron, web-tools and
api-server tests.

- gateway/run.py: restore `team_id` definition in `_is_user_authorized`;
  it was deleted but two call sites still reference it.
- gateway/session_context.py: restore `get_terminal_cwd` /
  `set_terminal_cwd` / `reset_terminal_cwd` helpers (and the underlying
  `_TERMINAL_CWD` ContextVar) that run_agent.py imports.
- tools/web_tools.py: rename `_ddgs_package_importable` to
  `_ddgs_package_available` (with a backward-compat alias) so tests can
  monkeypatch the expected symbol; drop ddgs from the auto-detect
  fallback so just having the package importable doesn't silently opt
  users into a rate-limited HTML-scraping backend.
- gateway/platforms/webhook.py: reject unresolved `${VAR}` placeholder
  secrets; treating them as real HMAC secrets silently weakened auth.
- gateway/platforms/api_server.py: restore `_constant_time_equal` so
  unicode API keys compare safely instead of raising TypeError from
  `hmac.compare_digest`.

* fix(api_server): guard _constant_time_equal against None inputs

Defense in depth: the existing _check_auth caller early-returns when
self._api_key is falsy, so None can't actually reach this helper today,
but accepting Optional[str] and short-circuiting to False keeps the
helper safe for any future caller and matches the type the call site
already permits.

* fix(web_tools): delegate _ddgs_package_available to the safe helper

A bare ``import ddgs`` executes any local ``ddgs.py`` shadowing the
installed package on sys.path. The provider module already has a safer
non-importing check (metadata + find_spec, verified by
test_availability_does_not_import_shadowed_local_module). Delegate to
it so both call paths share the same protection.

* fix(approval): restore set_/reset_/get_current_run_id helpers

A prior merge removed these helpers from tools/approval.py but kept the
api_server callers that import them, breaking every /v1/runs request
with ImportError. Restore the contextvar (`_approval_run_id`) and the
three accessors so the API run path can bind a per-task run id to
pending gateway approvals again.

Clears the 7 failures under tests/gateway/test_api_server_runs.py.

* fix(api_server): restore proxy_scope authentication

A prior merge deleted gateway/proxy_scope_auth.py and stripped the
HMAC-signature check from the /v1/chat/completions handler, while
leaving the tests that exercise both paths. Restore the module and
re-wire the handler:

- Imports verify_proxy_scope_signature + signature/timestamp headers
  from gateway.proxy_scope_auth.
- Uses ``"hermes_proxy_scope" in body`` (not truthy check) so an
  explicit JSON ``null`` is rejected with 400.
- Returns 403 with "trusted gateway proxy authentication" when the
  signature is missing or invalid.

Clears the 3 TestChatCompletionsEndpoint failures.

* fix(run_agent): harden assistant-message scrub against non-str helper return

When _strip_think_blocks is mocked in tests (TestInlineThinkBlockExtraction
binds only _build_assistant_message + _extract_reasoning, leaving every
other method as a MagicMock), it returns a MagicMock instead of a string.
sanitize_context() then crashes because re.sub expects str/bytes.

Guard the scrub: if _strip_think_blocks returns a string, sanitize that;
otherwise fall back to sanitizing the original _san_content. Production
agents always return a string, so behavior there is unchanged.

Clears the 7 TestInlineThinkBlockExtraction failures.

* fix(api_server): SSRF-block private/internal image URLs

A prior merge dropped the is_safe_url() check on http(s) image URLs from
_normalize_multimodal_content, leaving only the scheme guard. Image URLs
pointing at private/internal addresses now reach the multimodal pipeline
and can exfiltrate internal-network content (test_private_image_url_rejected,
test_cloud_metadata_image_url_rejected). Re-add the check before the URL
is normalized into the image part.

* fix(tests): restore _FakeProviderMemoryManager and align memory-context test with scrub semantics

Two lost-in-merge regressions in tests/run_agent/test_run_agent.py:

1. _FakeProviderMemoryManager class was deleted but two
   TestConcurrentToolExecution tests still instantiate it, raising
   NameError. Restore the minimal double from #209.

2. test_memory_context_in_stored_content_is_preserved was renamed to
   _is_scrubbed in #478 and the assertions inverted to match the
   storage-boundary scrub the production code now performs. The pre-rename
   version kept the old "preserve" assertions, which fail against the
   correct production behaviour. Update the test to its post-#478 form.

* fix(tools_config): restore HASS_TOKEN opt-in + cross-platform MCP server fanout

Two lost-in-merge regressions in hermes_cli/tools_config.py:

- _implicit_default_off_toolsets no longer dropped homeassistant from
  default_off when HASS_TOKEN was set. That regressed Norbert's HA cron
  setup (the original PR NousResearch#14798 carve-out) because cron / cli would
  silently drop the toolset even though the operator had provisioned
  credentials. Restore the HASS_TOKEN check.

- _get_platform_tools required platform_toolsets to explicitly re-list
  every globally configured MCP server (exa, web-search-prime, etc.) to
  keep them enabled. Once a platform had any explicit builtin toolset
  list, MCP servers vanished. Restore the simpler rule: no_mcp opts out;
  otherwise enabled_mcp_servers always fan out — explicit builtin
  selection is the platform allowlist, not the MCP allowlist.

Clears 2 test_tools_config failures and the related
test_reasoning_command "exa in toolsets" assertion.

* fix(browser_tool): use url_contains_secret for navigate's secret check

The inline secret-block in browser_navigate only ran one urllib.unquote
pass, so a URL with double-percent-encoded prefixes (sk%252Dant%252D…)
slipped through and reached the browser. agent.redact.url_contains_secret
applies repeated decoding (3 passes) and also splits the URL into
component values before matching, so it catches the multi-encode tricks
that test_blocks_percent_encoded_api_key_in_url and
test_blocks_split_api_key_in_query_values exercise.

Clears 2 test_browser_secret_exfil failures.

* fix: enforce SSRF + attachment-auth gates lost in merge

Two adjacent security regressions:

- tools/browser_tool.py: pre-navigation SSRF check skipped local backends
  (`not _is_local_backend()` short-circuited the guard) even though the
  surrounding comment explicitly states local backends must enforce it
  too — browser_snapshot can return local-file / internal-service
  responses in reduced-tool configurations. Drop the local-backend skip
  so the guard fires unless the operator opts in via
  ``browser.allow_private_urls``.

- gateway/platforms/qqbot/adapter.py: restore the attachment pre-auth
  gate from #349. _handle_c2c/_handle_group/_handle_guild/_handle_dm
  now check `_is_source_authorized_for_attachment_processing` before
  calling `_process_attachments`, and forward a text-only event when
  the sender isn't allowlisted. This prevents an unauthorized sender
  from forcing the bot to fetch attacker-controlled attachment URLs
  (SSRF amplification, large-file DoS, redirect attacks). Failure-closed
  when gateway_runner isn't attached yet, with a throttled warning so
  startup races don't spam the log.

Clears 2 test_browser_ssrf_local failures and
test_unauthorized_c2c_skips_attachment_processing.

* fix(kanban): scope worker child reaping to known PIDs only

dispatch_once() was calling waitpid(-1, WNOHANG) on every tick, which
reaps any zombie child of the gateway process — including non-kanban
subprocesses (npm install, agent-browser, etc.) whose callers rely on
their own Popen.wait()/subprocess.run() exit status. That broke
unrelated tools whenever the kanban dispatcher ran in the same process.

Restore the scoped reaper from #393: track each kanban worker PID in
_known_worker_child_pids when it's persisted via _set_worker_pid, and
in dispatch_once only waitpid those specific PIDs. Windows is still a
no-op (no zombies / no WNOHANG).

Clears test_source_gates_waitpid_loop.

* fix(api_server): propagate SSE batch flush failures to main streaming loop

When the batched-delta background task ("_batch_flush_after") hit a
ConnectionResetError on response.write(), the exception was swallowed
in the detached task and the main loop kept waiting on stream_q for
items that would never arrive — the streaming endpoint hung until the
client timed out and the agent was never interrupted.

Restore #398's fix: catch the flush exception in the background task,
stash it in _batch_error, and push a sentinel into stream_q so the main
loop re-raises it on dequeue. Both the live loop and the drain path
honour the sentinel.

Clears test_stream_batched_delta_disconnect_interrupts_agent.

* fix(browser_tool): honor restricted PATH for Homebrew/user-writable trust roots

A prior merge widened _SANE_PATH_DIRS to include /opt/homebrew/{bin,sbin}
and /usr/local/{bin,sbin} unconditionally and made _browser_candidate_path_dirs
always inject Homebrew node prefixes. Cron / systemd / locked-down operator
configs that intentionally strip those trust roots from PATH would silently
get them injected back, defeating the restriction.

Restore #234's design:
- _SANE_PATH_DIRS only includes Termux + system dirs (/usr/{bin,sbin}, /{bin,sbin}).
- _browser_candidate_path_dirs(existing_path) takes the operator-provided PATH
  and only adds Homebrew node prefixes / /usr/local / hermes-managed Node bin
  when the operator already opted into that trust root.
- _find_agent_browser passes os.environ.get("PATH","") through to
  _merge_browser_path so the gating actually fires (previously it passed "").

Clears all 4 test_browser_homebrew_paths failures.

---------

Co-authored-by: Claude <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants