Skip to content

fix(api-server): Implement background response run recovery#15492

Open
zhboner wants to merge 2 commits into
NousResearch:mainfrom
zhboner:fix/background-response-runs
Open

fix(api-server): Implement background response run recovery#15492
zhboner wants to merge 2 commits into
NousResearch:mainfrom
zhboner:fix/background-response-runs

Conversation

@zhboner

@zhboner zhboner commented Apr 25, 2026

Copy link
Copy Markdown

What does this PR do?

This PR implements true server-side background execution for streaming POST /v1/responses requests.

Previously, a stream=true Responses API request was tightly coupled to the SSE client connection. If the client disconnected, the agent execution could be interrupted and the response could not be reliably recovered. This PR decouples response execution from the SSE subscriber lifecycle so the server continues running the response in the background and allows clients to recover state through GET /v1/responses/{response_id}.

Current limitations

This PR does not attempt to make stream and store fully orthogonal across all /v1/responses modes.

The issue addressed here is specific to stream=true: durable streaming responses should continue running after SSE disconnect and should be recoverable by response ID. This PR therefore unifies the two streaming modes around ResponseRun:

  • stream=true + store=true: durable/background/recoverable
  • stream=true + store=false: ephemeral/connection-owned, cancelled on disconnect

The existing stream=false synchronous path is left unchanged.

Making all four combinations fully orthogonal would require moving non-streaming execution onto ResponseRun as well. That is a larger lifecycle refactor touching cancellation semantics, conversation active-state tracking, idempotency, error handling, tests, and client expectations. Since that is outside the scope of the reported background streaming issue, it is deferred to a future PR.

Related Issue

Fixes #15026

Type of Change

  • 🐛 Bug fix (non-breaking change that fixes an issue)
  • ✨ New feature (non-breaking change that adds functionality)
  • 🔒 Security fix
  • 📝 Documentation update
  • ✅ Tests (adding or improving test coverage)
  • ♻️ Refactor (no behavior change)
  • 🎯 New skill (bundled or hub)

Changes Made

  • Added ResponseRun / ResponseRunManager to own background /v1/responses execution lifecycle.
  • Changed streaming /v1/responses behavior so SSE client disconnects only detach the subscriber and do not cancel the underlying agent run.
  • Added active response recovery through GET /v1/responses/{response_id}.
  • Required store=true for stream=true Responses API requests.
    • stream=true + store=false now returns 400 store_required.
  • Added streaming Idempotency-Key support.
    • Matching retries can reattach to active runs or replay stored responses.
    • Conflicting retries return 409 idempotency_key_conflict.
  • Added bounded queues and concurrent active response run limits.
  • Added response.snapshot SSE event for active-run reattach/idempotent retry.
  • Added SSE keepalive comments for response streams.
  • Added CORS headers for stored streaming response replay.
  • Updated previous_response_id and conversation chaining so only completed responses are accepted.
  • Reworked conversation tracking to separate:
    • latest_completed_response_id
    • active_response_id
  • Added migration support from the legacy conversations.response_id schema.
  • Ensured DELETE and LRU eviction clear conversation references and idempotency mappings.
  • Added shutdown/delete cancellation handling for active background response runs.
  • Removed the legacy private _write_sse_responses(...) path in favor of ResponseRun.

How to Test

1. Run the targeted test suite

~/.hermes/hermes-agent/venv/bin/python -m pytest -o addopts= tests/gateway/test_api_server.py tests/gateway/test_sse_agent_cancel.py -q

Expected result:

140 passed

2. Verify stream=true + store=false works as ephemeral streaming

Run:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "Reply with exactly: ephemeral-ok",
    "stream": true,
    "store": false
  }'

Expected result:

  • HTTP 200
  • SSE stream includes response.created
  • SSE stream includes response.completed
  • stream ends with:
data: [DONE]

Copy the response.id from the stream, then run:

curl -i http://127.0.0.1:8642/v1/responses/<response_id> \
  -H "Authorization: Bearer <API_KEY>"

Expected result:

  • HTTP 404
  • because store=false responses are ephemeral and not recoverable

3. Verify stream=true + store=false rejects Idempotency-Key

Run:

curl -i http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: ephemeral-test-key" \
  -d '{
    "model": "hermes-agent",
    "input": "hello",
    "stream": true,
    "store": false
  }'

Expected result:

  • HTTP 400
  • response error code:
"idempotency_requires_store"

This confirms that idempotent streaming replay/reattach is only available for durable store=true streams.


4. Verify stream=true + store=true survives client disconnect

Start a durable streaming response:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "Take a little time, then reply with exactly: background-recovered-ok",
    "stream": true,
    "store": true
  }'

After receiving the first response.created event, stop the client with Ctrl+C.

Expected server behavior:

  • SSE subscriber disconnects
  • agent run continues in the background
  • run is not interrupted just because the client disconnected

Then recover the response:

curl http://127.0.0.1:8642/v1/responses/<response_id> \
  -H "Authorization: Bearer <API_KEY>"

Expected result:

  • while still running, response may show status: "in_progress"
  • after completion, response shows:
"status": "completed"

5. Verify durable streaming idempotency retry

Start a durable streaming request with an idempotency key:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: durable-stream-test-1" \
  -d '{
    "model": "hermes-agent",
    "input": "Reply with exactly: durable-idempotency-ok",
    "stream": true,
    "store": true
  }'

Retry the exact same request with the same Idempotency-Key:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: durable-stream-test-1" \
  -d '{
    "model": "hermes-agent",
    "input": "Reply with exactly: durable-idempotency-ok",
    "stream": true,
    "store": true
  }'

Expected result:

  • retry returns the same response.id
  • if the original run is still active, retry starts with:
event: response.snapshot
  • if the original run already completed, retry streams/replays the stored response

6. Verify idempotency conflict handling

Reuse the same idempotency key with a different request body:

curl -i http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: durable-stream-test-1" \
  -d '{
    "model": "hermes-agent",
    "input": "This is a different request body.",
    "stream": true,
    "store": true
  }'

Expected result:

  • HTTP 409
  • response error code:
"idempotency_key_conflict"

7. Verify conversation behavior for store=false streaming

First create a stored checkpoint in a conversation:

curl http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "Remember the codeword pineapple. Reply exactly: checkpoint-ok",
    "conversation": "pr-test-conversation",
    "store": true
  }'

Then send an ephemeral streaming response in the same conversation:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "This is a temporary streamed turn. Reply exactly: ephemeral-conversation-ok",
    "conversation": "pr-test-conversation",
    "stream": true,
    "store": false
  }'

Expected result:

  • request succeeds
  • it may read existing conversation history
  • it does not update latest_completed_response_id
  • it does not set active_response_id
  • it is not recoverable via GET /v1/responses/{response_id}

Then send another stored request in the same conversation:

curl http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "What codeword did I ask you to remember? Reply with only the codeword.",
    "conversation": "pr-test-conversation",
    "store": true
  }'

Expected result:

  • request succeeds
  • conversation continues from the last stored checkpoint, not from the ephemeral streamed turn

8. Verify active conversation protection

Start a durable background stream in a conversation:

curl -N http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "Take a little time before answering.",
    "conversation": "active-conversation-test",
    "stream": true,
    "store": true
  }'

While it is still running, send another request to the same conversation:

curl -i http://127.0.0.1:8642/v1/responses \
  -H "Authorization: Bearer <API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "hermes-agent",
    "input": "Follow-up while previous response is active.",
    "conversation": "active-conversation-test",
    "store": true
  }'

Expected result:

  • HTTP 409
  • response error code:
"conversation_response_not_completed"

This confirms that conversation ordering is protected while a durable response is active.

Checklist

Code

  • I've read the Contributing Guide
  • My commit messages follow Conventional Commits (fix(scope):, feat(scope):, etc.)
  • I searched for existing PRs to make sure this isn't a duplicate
  • My PR contains only changes related to this fix/feature (no unrelated commits)
  • I've run pytest tests/ -q and all tests pass
  • I've added tests for my changes (required for bug fixes, strongly encouraged for features)
  • I've tested on my platform: Ubuntu 26.04

Documentation & Housekeeping

  • I've updated relevant documentation (README, docs/, docstrings) — or N/A
  • I've updated cli-config.yaml.example if I added/changed config keys — or N/A
  • I've updated CONTRIBUTING.md or AGENTS.md if I changed architecture or workflows — or N/A
  • I've considered cross-platform impact (Windows, macOS) per the compatibility guide — or N/A
  • I've updated tool descriptions/schemas if I changed tool behavior — or N/A

@alt-glitch alt-glitch added type/bug Something isn't working P2 Medium — degraded but workaround exists comp/gateway Gateway runner, session dispatch, delivery labels Apr 25, 2026
@zhboner

zhboner commented Apr 29, 2026

Copy link
Copy Markdown
Author

Hi, just want to follow up on this PR. I know it is not a small one, so I totally understand if it's not easy to review quickly. I didn't break it down because all changes are made for one goal: the compelete background tasks running.

Following are changes that this PR made and are delibrated in the rest of this comment:

  1. ResponseStore schema/conversation changes
  2. ResponseRun lifecycle
  3. ResponseRunManager
  4. SSE writer / lifecycle policy
  5. /v1/responses handler changes
  6. GET / DELETE recovery and cleanup
  7. Tests

1. ResponseStore: durable response state + safer conversation pointers

ResponseStore already stored completed responses for previous_response_id chaining. This PR extends its conversation bookkeeping.

Previously, a conversation mapped to a single response_id.

That was ambiguous because the mapped response could be:

  • completed
  • active/in-progress
  • failed
  • incomplete
  • deleted/evicted

This PR changes the conversation schema to separate:

latest_completed_response_id
active_response_id

Design intent:

  • latest_completed_response_id is the checkpoint used for future chaining.
  • active_response_id is only a guard that prevents concurrent writes to the same named conversation.
  • failed/incomplete responses should not become the latest conversation checkpoint.
  • deleting or evicting a response should remove any conversation references to it.

Important methods:

  • _migrate_conversations_schema()
    • migrates old conversations(name, response_id TEXT NOT NULL) tables
    • rebuilds the table if legacy response_id exists
    • this is important because SQLite cannot simply drop the old NOT NULL column
  • get_conversation_latest_completed(...)
  • get_conversation_active(...)
  • set_conversation_completed(...)
  • set_conversation_active(...)
  • clear_conversation_active(...)
  • clear_conversation_references(...)
  • delete(...)
  • LRU eviction path in put(...)

2. ResponseRun: the core execution abstraction

ResponseRun is the main new abstraction.

It represents one /v1/responses streaming execution and owns:

  • response id / model / created_at
  • agent task
  • streaming callbacks
  • emitted output items
  • current response snapshot
  • final status
  • persistence when store=True
  • conversation active/completed updates when applicable
  • subscriber queues
  • cancellation / interruption

The important design point is:

ResponseRun is independent of any single SSE client connection.

So for background runs, if an SSE client disconnects, the ResponseRun can continue.

Important methods:

start()

Starts the background asyncio task for the run.

subscribe(...)

Creates a per-subscriber queue and yields SSE events.

This is where a new subscriber can receive:

  • response.created
  • response.snapshot
  • response.completed
  • response.failed
  • response.incomplete
  • keepalive comments via __keepalive__

Design intent:

  • active reattach uses response.snapshot, not fake replay
  • completed/failed/incomplete runs emit final event immediately
  • subscribers are removed cleanly in finally

_run()

Main lifecycle:

  1. mark status in_progress
  2. optionally persist an initial snapshot
  3. optionally mark conversation active
  4. broadcast response.created
  5. run the agent task
  6. drain tool/text callback events
  7. finalize as completed/failed/incomplete
  8. notify subscribers
  9. unregister from manager

_callbacks()

Converts agent callbacks into internal run events:

  • text deltas
  • tool starts
  • tool completions

These are queued into the event loop with _put_event_threadsafe(...).

_dispatch_event(...), _emit_text_delta(...), _emit_tool_started(...), _emit_tool_completed(...)

These convert internal callback events into Responses API-style SSE events.

_finalize_completed(...)

Builds final response object, persists it when store=True, and updates conversation checkpoint.

Key rule:

Only completed responses call set_conversation_completed(...)

_finalize_failed(...) and _finalize_incomplete(...)

Build failed/incomplete response objects and clear active conversation state, but do not advance latest_completed_response_id.

cancel(...)

Used by DELETE, shutdown, and ephemeral client disconnect.

Important detail:

  • waits briefly for agent_ref to become available before cancelling the task
  • this avoids a race where cancellation arrives before the agent object has been installed

3. ResponseRunManager: active durable run registry

ResponseRunManager tracks only active durable/recoverable runs.

It provides:

  • register(run)
  • unregister(response_id)
  • get(response_id)
  • can_start()
  • shutdown()

Design intent:

  • store=True streaming runs are registered
  • store=False ephemeral runs are not registered
  • GET can recover active durable runs
  • shutdown can cancel all active durable runs
  • concurrency limit applies to durable background runs

4. Unified SSE writer: _stream_response_run(...)

_stream_response_run(..., cancel_on_disconnect: bool)

The lifecycle policy is controlled by cancel_on_disconnect.

Durable background streaming

cancel_on_disconnect=False

Used for:

stream=true, store=true

Behavior:

  • client disconnect only detaches that subscriber
  • ResponseRun continues
  • response can be recovered via GET

Ephemeral foreground streaming

cancel_on_disconnect=True

Used for:

stream=true, store=false

Behavior:

  • same ResponseRun event generation
  • same SSE formatting
  • same keepalive handling
  • but client disconnect cancels/interupts the run
  • no persistence because ResponseRun(store=False)
  • no recovery because it is not registered in ResponseRunManager

This was intentionally refactored this way to avoid two duplicated streaming implementations.

5. Streaming idempotency: _ResponseRunIdempotencyIndex

Streaming idempotency is separate from the existing non-streaming idempotency cache.

The new index maps:

Idempotency-Key + request fingerprint -> response_id

Used only for:

stream=true, store=true

Design intent:

  • if same key/body is retried while run is active, attach to active run
  • if same key/body is retried after completion, replay stored final response as SSE
  • if same key/different body is retried, return conflict
  • if response is deleted, remove idempotency mapping

Important methods:

  • lookup(...)
  • put(...)
  • remove_response(...)

6. /v1/responses handler flow

The main request handler now does roughly this:

Step A: parse request and resolve conversation

If conversation is provided:

  1. check active_response_id
  2. reject if there is an active durable response
  3. resolve latest_completed_response_id
  4. use that as previous_response_id

Design intent:

  • conversations cannot be concurrently advanced by multiple durable runs
  • failed/incomplete responses do not poison future history
  • stale deleted/evicted references are cleared

Step B: build conversation_history

Uses precedence:

explicit conversation_history > previous_response_id/conversation latest completed

For previous responses:

  • only completed is valid
  • legacy stored responses without status are treated as completed for compatibility

Step C: branch by stream

Non-streaming

Existing behavior mostly remains:

  • run agent synchronously
  • return JSON response
  • persist if store=True
  • update conversation completed if stored

Streaming + store=True

Creates a durable ResponseRun:

run = ResponseRun(..., store=True, conversation=conversation)
manager.register(run)
run.start()
return _stream_response_run(..., cancel_on_disconnect=False)

Also handles streaming idempotency before creating a new run.

Streaming + store=False

Creates an ephemeral ResponseRun:

run = ResponseRun(..., store=False, conversation=None)
run.start()
return _stream_response_run(..., cancel_on_disconnect=True)

Important detail:

  • conversation_history may still be read from the named conversation before this
  • but conversation=None is passed into ResponseRun
  • therefore ephemeral streams cannot mutate conversation state

7. GET / DELETE behavior

GET /v1/responses/{response_id}

Lookup order:

  1. ResponseStore
  2. active ResponseRunManager
  3. 404

Design intent:

  • completed durable responses come from store
  • active durable responses return live snapshot
  • ephemeral responses return 404 because they are not stored or registered

DELETE /v1/responses/{response_id}

If response is active:

  1. cancel run with delete=True
  2. wait for completion
  3. delete store entry
  4. remove idempotency mapping

If response is stored:

  1. delete store entry
  2. remove idempotency mapping

8. Shutdown behavior

APIServerAdapter.disconnect() now shuts down active response runs.

Design intent:

  • server shutdown should not leave durable background tasks running
  • active runs are interrupted/cancelled cleanly

9. Test map

The tests in tests/gateway/test_api_server.py are intended to cover the main contracts.

Useful tests to inspect:

Background durable streaming

  • client disconnect does not cancel durable run
  • active GET snapshot works
  • DELETE active run does not re-persist incomplete state
  • shutdown cancels active response runs

Ephemeral streaming

  • stream=true, store=false succeeds
  • ephemeral response is not stored / GET returns 404
  • ephemeral disconnect interrupts agent
  • ephemeral stream with Idempotency-Key returns idempotency_requires_store

Idempotency

  • same key/body reuses active run
  • same key/body replays stored response
  • same key/different body returns conflict
  • delete removes mapping

Conversation safety

  • non-completed previous_response_id rejected
  • non-completed conversation checkpoint rejected
  • active conversation blocks concurrent request
  • failed durable stream does not poison latest completed checkpoint
  • store=false conversation stream reads history but does not update checkpoint/active marker
  • delete and LRU eviction clear conversation references

Migration

  • legacy conversations.response_id TEXT NOT NULL schema is rebuilt
  • legacy mapping is preserved as latest completed
  • new active writes no longer fail with NOT NULL constraint

SSE details

  • active reattach emits response.snapshot
  • initial stream still emits normal events
  • stored replay includes CORS
  • keepalive is emitted
  • cancellation race waits briefly for agent_ref

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

comp/gateway Gateway runner, session dispatch, delivery P2 Medium — degraded but workaround exists type/bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Streaming /v1/responses cannot be recovered after client disconnect because partial/in-progress responses are not persisted

2 participants