feat(api): structured run events via /v1/runs SSE endpoint#3100
Conversation
ReviewThanks for tackling this — structured lifecycle events via SSE is exactly what #2971 needs, and the event taxonomy ( There are several issues that need to be fixed before we can merge. Once addressed, please show the full test suite passing and include screenshots or a short video of the Critical — breaks existing functionality1. CLI callback not updated. def _on_tool_progress(self, function_name: str, preview: str, function_args: dict):After your change, 2. ACP adapter callback not updated. Both need to be updated to accept the new High — bugs in the new feature3. SSE keepalive disconnects the stream. try:
while True:
event = await asyncio.wait_for(q.get(), timeout=30.0)
...
except asyncio.TimeoutError:
await response.write(b": keepalive\n\n")
# falls through to finally → stream closesAfter 30s with no events (any tool that takes longer than 30s), the SSE connection drops and the client misses everything after. The timeout handling needs to be inside the loop: while True:
try:
event = await asyncio.wait_for(q.get(), timeout=30.0)
except asyncio.TimeoutError:
await response.write(b": keepalive\n\n")
continue
if event is None:
...
break
...4. Fire-and-forget task will get GC'd. 5. No orphaned run cleanup. Medium6. 7. Test suite. source .venv/bin/activate
python -m pytest tests/ -n0 -qWhat we need before re-review
Looking forward to the updated version. |
Adds a new /v1/runs API for app-embedded integrations that need
structured agent lifecycle events, not just final text.
## New endpoints
POST /v1/runs
Start an agent run asynchronously. Returns {run_id, status} immediately
with HTTP 202. Accepts: input, instructions, previous_response_id,
session_id (same fields as /v1/responses).
GET /v1/runs/{run_id}/events
SSE stream of structured JSON events for the run lifetime:
- tool.started {tool, preview}
- tool.completed {tool, duration, error}
- message.delta {delta} — streaming text chunks
- reasoning.available {text} — model reasoning (when available)
- run.completed {output, usage}
- run.failed {error}
## Implementation
- tool_progress_callback signature extended from (tool_name, preview, args)
to (event_type, tool_name, preview, args, **kwargs) across run_agent.py,
gateway/run.py, and tools/delegate_tool.py
- _create_agent() now accepts tool_progress_callback for wiring
- run streams stored in APIServerAdapter._run_streams (in-memory queue per run)
- thread-safe bridge: agent runs in executor thread, pushes events via
loop.call_soon_threadsafe into the asyncio queue read by the SSE handler
## Usage example
# Start the run
resp = requests.post('/v1/runs', json={'input': 'list files in /tmp'})
run_id = resp.json()['run_id']
# Stream events
for line in requests.get(f'/v1/runs/{run_id}/events', stream=True).iter_lines():
if line.startswith(b'data:'):
print(json.loads(line[5:]))
Closes NousResearch#2971
tool_progress_callback now takes (event_type, tool_name, preview, args) instead of (tool_name, preview, args).
- Fix cli.py _on_tool_progress to accept new (event_type, ...) signature - Fix acp_adapter/events.py _tool_progress same signature update - Fix SSE keepalive: move TimeoutError catch inside while loop (was dropping stream) - Fix fire-and-forget: use create_task + _background_tasks instead of ensure_future - Add orphaned run cleanup: _run_streams_created tracking + _sweep_orphaned_runs task - Use asyncio.get_running_loop() instead of deprecated get_event_loop() in handlers
61ad069 to
46cb974
Compare
|
Thanks for the thorough review. All issues addressed in the latest push (rebased on current main): Critical fixes
High fixes
Other
Tests Rebased onto current main (was 27 commits behind). The 4 pre-existing failures in The I'll add an end-to-end demo video of the |
|
Test suite results and end-to-end demo (follow-up to the previous comment) Test suiteFull run on rebased branch ( Compared against current The 2-test delta is entirely in End-to-end demoLive run against a local API server (claude-sonnet-4-6, tool: Events flow: |
|
Screenshot — Live run on claude-sonnet-4-6: |
|
Screenshot — Live run on claude-sonnet-4-6: |


Closes #2971
What this adds
Two new endpoints for app-embedded integrations that need structured agent lifecycle events:
POST /v1/runs— start a run asynchronously, get arun_idimmediately (HTTP 202)GET /v1/runs/{run_id}/events— SSE stream of typed JSON events:tool.startedtool,previewtool.completedtool,duration,errormessage.deltadeltareasoning.availabletextrun.completedoutput,usagerun.failederrorAll events also carry
run_idandtimestamp.Usage
Implementation notes
tool_progress_callbacksignature extended from(tool_name, preview, args)to(event_type, tool_name, preview, args, **kwargs)— updated inrun_agent.py,gateway/run.py,tools/delegate_tool.py. The gateway's existing progress relay (Telegram/Discord tool progress messages) only listens totool.started, behavior unchanged.asyncio.Queuestored inAPIServerAdapter._run_streams. The agent runs in a thread pool executor and pushes events vialoop.call_soon_threadsafe. The SSE handler drains the queue asynchronously.previous_response_idandsession_idare supported for stateful conversations, same as/v1/responses.Tests
84 existing tests pass. The updated
test_run_progress_topics.pyuses the new event-type-first signature.