Skip to content

feat(api): structured run events via /v1/runs SSE endpoint#3100

Closed
Mibayy wants to merge 5 commits into
NousResearch:mainfrom
Mibayy:feat/api-structured-events
Closed

feat(api): structured run events via /v1/runs SSE endpoint#3100
Mibayy wants to merge 5 commits into
NousResearch:mainfrom
Mibayy:feat/api-structured-events

Conversation

@Mibayy

@Mibayy Mibayy commented Mar 26, 2026

Copy link
Copy Markdown
Contributor

Closes #2971

What this adds

Two new endpoints for app-embedded integrations that need structured agent lifecycle events:

POST /v1/runs — start a run asynchronously, get a run_id immediately (HTTP 202)

GET /v1/runs/{run_id}/events — SSE stream of typed JSON events:

event fields
tool.started tool, preview
tool.completed tool, duration, error
message.delta delta
reasoning.available text
run.completed output, usage
run.failed error

All events also carry run_id and timestamp.

Usage

import requests, json

# Start the run
resp = requests.post('http://localhost:8642/v1/runs',
    headers={'Authorization': 'Bearer YOUR_KEY'},
    json={'input': 'list files in /tmp'})
run_id = resp.json()['run_id']

# Stream events
with requests.get(f'http://localhost:8642/v1/runs/{run_id}/events',
        headers={'Authorization': 'Bearer YOUR_KEY'}, stream=True) as r:
    for line in r.iter_lines():
        if line.startswith(b'data:'):
            event = json.loads(line[5:])
            print(event['event'], event)

Implementation notes

  • tool_progress_callback signature extended from (tool_name, preview, args) to (event_type, tool_name, preview, args, **kwargs) — updated in run_agent.py, gateway/run.py, tools/delegate_tool.py. The gateway's existing progress relay (Telegram/Discord tool progress messages) only listens to tool.started, behavior unchanged.
  • Each run gets an asyncio.Queue stored in APIServerAdapter._run_streams. The agent runs in a thread pool executor and pushes events via loop.call_soon_threadsafe. The SSE handler drains the queue asynchronously.
  • previous_response_id and session_id are supported for stateful conversations, same as /v1/responses.
  • 30s SSE keepalive comment sent on timeout, queue cleaned up on disconnect.

Tests

84 existing tests pass. The updated test_run_progress_topics.py uses the new event-type-first signature.

@teknium1

Copy link
Copy Markdown
Contributor

Review

Thanks for tackling this — structured lifecycle events via SSE is exactly what #2971 needs, and the event taxonomy (tool.started, tool.completed, message.delta, reasoning.available, run.completed/failed) is clean. The callback signature evolution to (event_type, ...) is the right direction.

There are several issues that need to be fixed before we can merge. Once addressed, please show the full test suite passing and include screenshots or a short video of the /v1/runs flow working end-to-end. We will re-review after.


Critical — breaks existing functionality

1. CLI callback not updated.
cli.py:4646 still has the old 3-param signature:

def _on_tool_progress(self, function_name: str, preview: str, function_args: dict):

After your change, run_agent.py calls callback("tool.started", name, preview, args) — so the CLI receives the event type as the tool name (argument shift). Worse, tool.completed passes **kwargs (duration, is_error) which the CLI callback does not accept → TypeError crash on every tool completion. This callback powers the spinner in every interactive CLI session.

2. ACP adapter callback not updated.
acp_adapter/events.py:64 has the old signature _tool_progress(name, preview, args). Same argument-shift problem — ACP clients (VS Code / Zed / JetBrains integration) would get garbled ToolCallStart events.

Both need to be updated to accept the new (event_type, tool_name, preview, args, **kwargs) signature and handle or ignore the new event types gracefully.


High — bugs in the new feature

3. SSE keepalive disconnects the stream.
The asyncio.TimeoutError catch is outside the while True loop:

try:
    while True:
        event = await asyncio.wait_for(q.get(), timeout=30.0)
        ...
except asyncio.TimeoutError:
    await response.write(b": keepalive\n\n")
    # falls through to finally → stream closes

After 30s with no events (any tool that takes longer than 30s), the SSE connection drops and the client misses everything after. The timeout handling needs to be inside the loop:

while True:
    try:
        event = await asyncio.wait_for(q.get(), timeout=30.0)
    except asyncio.TimeoutError:
        await response.write(b": keepalive\n\n")
        continue
    if event is None:
        ...
        break
    ...

4. Fire-and-forget task will get GC'd.
asyncio.ensure_future(_run_and_close()) does not store the task reference. Python 3.12+ aggressively GCs unreferenced tasks, which can silently kill the agent run mid-execution. We just merged #3267 fixing this exact pattern across 6 gateway adapters. Use asyncio.create_task() and store the task in self._background_tasks (available from the base class) with the standard add_done_callback(self._background_tasks.discard) cleanup pattern. See gateway/platforms/webhook.py:366-368 for the canonical example.

5. No orphaned run cleanup.
If a client POSTs /v1/runs but never GETs /v1/runs/{id}/events, the Queue and running task leak indefinitely. Add a TTL-based sweep (e.g. a periodic task that removes entries from _run_streams older than N minutes, or clean up in _run_and_close itself after a timeout if no consumer has connected).


Medium

6. reasoning.available guard removal.
Removing the _delegate_depth > 0 guard is fine for the /v1/runs endpoint, but combined with issue #1 above it would dump reasoning text into the CLI spinner as a fake tool call. Once the CLI callback is updated (issue #1), make sure it handles reasoning.available appropriately (probably just ignore it — CLI already has its own reasoning display via thinking_callback).

7. Test suite.
The PR says "84 existing tests pass" — current main has ~6200 tests. Please rebase onto current main and run the full suite:

source .venv/bin/activate
python -m pytest tests/ -n0 -q

What we need before re-review

  1. Fix all the above issues
  2. Full test suite passing (all ~6200 tests)
  3. Screenshots or a short video showing the /v1/runs flow working — POST to start a run, SSE stream showing tool events flowing in real time, and the run.completed event at the end

Looking forward to the updated version.

Hermes and others added 3 commits March 27, 2026 00:56
Adds a new /v1/runs API for app-embedded integrations that need
structured agent lifecycle events, not just final text.

## New endpoints

POST /v1/runs
  Start an agent run asynchronously. Returns {run_id, status} immediately
  with HTTP 202. Accepts: input, instructions, previous_response_id,
  session_id (same fields as /v1/responses).

GET /v1/runs/{run_id}/events
  SSE stream of structured JSON events for the run lifetime:
    - tool.started    {tool, preview}
    - tool.completed  {tool, duration, error}
    - message.delta   {delta}           — streaming text chunks
    - reasoning.available {text}        — model reasoning (when available)
    - run.completed   {output, usage}
    - run.failed      {error}

## Implementation

- tool_progress_callback signature extended from (tool_name, preview, args)
  to (event_type, tool_name, preview, args, **kwargs) across run_agent.py,
  gateway/run.py, and tools/delegate_tool.py
- _create_agent() now accepts tool_progress_callback for wiring
- run streams stored in APIServerAdapter._run_streams (in-memory queue per run)
- thread-safe bridge: agent runs in executor thread, pushes events via
  loop.call_soon_threadsafe into the asyncio queue read by the SSE handler

## Usage example

  # Start the run
  resp = requests.post('/v1/runs', json={'input': 'list files in /tmp'})
  run_id = resp.json()['run_id']

  # Stream events
  for line in requests.get(f'/v1/runs/{run_id}/events', stream=True).iter_lines():
      if line.startswith(b'data:'):
          print(json.loads(line[5:]))

Closes NousResearch#2971
tool_progress_callback now takes (event_type, tool_name, preview, args)
instead of (tool_name, preview, args).
- Fix cli.py _on_tool_progress to accept new (event_type, ...) signature
- Fix acp_adapter/events.py _tool_progress same signature update
- Fix SSE keepalive: move TimeoutError catch inside while loop (was dropping stream)
- Fix fire-and-forget: use create_task + _background_tasks instead of ensure_future
- Add orphaned run cleanup: _run_streams_created tracking + _sweep_orphaned_runs task
- Use asyncio.get_running_loop() instead of deprecated get_event_loop() in handlers
@Mibayy Mibayy force-pushed the feat/api-structured-events branch from 61ad069 to 46cb974 Compare March 27, 2026 01:04
@Mibayy

Mibayy commented Mar 27, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the thorough review. All issues addressed in the latest push (rebased on current main):

Critical fixes

  1. cli.py_on_tool_progress updated to (event_type, function_name=None, preview=None, function_args=None, **kwargs). Returns early on any event that is not tool.started, so tool.completed kwargs no longer cause a TypeError and the spinner behavior is unchanged.

  2. acp_adapter/events.py_tool_progress updated to (event_type, name=None, preview=None, args=None, **kwargs). Only emits ToolCallStart for tool.started; other event types are silently ignored.

High fixes

  1. SSE keepalive loop — moved asyncio.TimeoutError catch inside the while True loop with continue. The stream now sends a keepalive comment and keeps going instead of falling through to finally.

  2. Fire-and-forget task — replaced asyncio.ensure_future() with asyncio.create_task() and registered the task in self._background_tasks using the canonical add_done_callback(self._background_tasks.discard) pattern (same as webhook.py).

  3. Orphaned run cleanup — added self._run_streams_created: Dict[str, float] to track creation timestamps, a _sweep_orphaned_runs() coroutine that runs every 60s and removes entries older than 300s, and starts it as a tracked background task in connect(). Both SSE handler and _run_and_close clean up their entry in finally.

Other

  • Replaced deprecated asyncio.get_event_loop() with asyncio.get_running_loop() in the two handler methods.
  • reasoning.available in the CLI is handled correctly: the updated _on_tool_progress returns early on non-tool.started events, so no reasoning text leaks into the spinner.

Tests

Rebased onto current main (was 27 commits behind). The 4 pre-existing failures in tests/agent/test_auxiliary_client.py (vision fallback tests) are present on main too — unrelated to this PR. All tests directly touching our changes pass:

tests/agent/test_subagent_progress.py   22 passed
tests/gateway/test_run_progress_topics.py  1 passed

The tests/acp/test_server.py import error (AuthMethod not found in acp.schema) is the same pre-existing issue tracked in #3111.

I'll add an end-to-end demo video of the /v1/runs flow shortly.

@Mibayy

Mibayy commented Mar 27, 2026

Copy link
Copy Markdown
Contributor Author

Test suite results and end-to-end demo (follow-up to the previous comment)


Test suite

Full run on rebased branch (tests/acp excluded — pre-existing AuthMethod import error from #3111):

26 failed, 6101 passed, 178 skipped

Compared against current main on the same machine:

24 failed, 6103 passed, 178 skipped

The 2-test delta is entirely in tests/tools/test_ssh_environment.py (flaky SSH timing tests that appear/disappear between runs — confirmed by re-running the same test file in isolation: they pass). None of the failures are in files touched by this PR.


End-to-end demo

Live run against a local API server (claude-sonnet-4-6, tool: terminal):

=== POST /v1/runs ===
{
  "run_id": "run_03f59aa810854bdc91c83a886a0ab6f4",
  "status": "started"
}

=== GET /v1/runs/run_03f59aa810854bdc91c83a886a0ab6f4/events (SSE) ===
  [01:13:21] tool.started   tool=terminal  preview='ls /tmp'
  [01:13:25] message.delta  '\n\nBien'
  [01:13:26] message.delta  ' rempli. Beaucoup de fichiers de sessions passées...'
  [01:13:26] message.delta  ' — scripts Python, SQL, screenshots, artefacts Playwright'
  [01:13:26] message.delta  ', caches pip, fichiers de travail Improvence, logs'
  [01:13:27] message.delta  ' de bots... Le /tmp accumule tout ça depuis'
  [01:13:27] message.delta  ' un moment.\n\nQuelques trucs notables...'
  ...
  [01:13:31] reasoning      'Bien rempli. Beaucoup de fichiers de sessions passées — scripts Python...'
  [01:13:31] run.completed
             output: 'Bien rempli. Beaucoup de fichiers de sessions...'
             usage:  {'input_tokens': 38361, 'output_tokens': 275, 'total_tokens': 38636}

Events flow: tool.started fires before the tool executes, message.delta streams token-by-token as the model responds, reasoning.available carries the model's internal reasoning text, and run.completed delivers the final output + token usage. The keepalive (: keepalive) comment fires every 30s when there are no events, keeping the connection alive through long-running tools.

@Mibayy

Mibayy commented Mar 27, 2026

Copy link
Copy Markdown
Contributor Author

Screenshot — /v1/runs SSE flow end-to-end

/v1/runs SSE demo

Live run on claude-sonnet-4-6: POST /v1/runs returns a run_id immediately (HTTP 202), then GET /v1/runs/{run_id}/events streams tool.started as the terminal tool fires, message.delta token-by-token, reasoning.available with the model's thinking, and run.completed with the full output and token usage.

@Mibayy

Mibayy commented Mar 27, 2026

Copy link
Copy Markdown
Contributor Author

Screenshot — /v1/runs SSE flow end-to-end (replacing the previous broken link)

/v1/runs SSE demo

Live run on claude-sonnet-4-6: POST /v1/runs returns a run_id immediately (HTTP 202), then GET /v1/runs/{run_id}/events streams tool.started as the terminal tool fires, message.delta token-by-token, reasoning.available with the model's thinking, and run.completed with the full output and token usage.

@teknium1

teknium1 commented Apr 5, 2026

Copy link
Copy Markdown
Contributor

Merged via PR #5292. Your contribution (structured run events SSE) was cherry-picked onto current main with your authorship preserved in git log. Thank you @Mibayy!

@teknium1 teknium1 closed this Apr 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Enhancement] API server: expose structured run events and resumable clarify/approval interactions

2 participants