Event-driven architecture: rebase onto main + green the suite (continues #115)#273
Conversation
Bring the event-driven architecture branch up to date with main (98 commits) and reconcile the rewrite with features that landed after it forked: eager inbox delivery (awslabs#251), the OpenCode poller, env-var forwarding (awslabs#259), memory curation (awslabs#254/awslabs#262), CORS auto-derive (awslabs#261), DNS host validation (awslabs#124), and the self-send guard (awslabs#24). Highlights: - Providers adopt the async initialize() + get_status(buffer) contract; copilot_cli/opencode_cli converted; kiro keeps colour-only ANSI stripping so carriage-return-redraw permission prompts aren't misread as idle. - Event-driven InboxService.deliver_pending with the awslabs#251 eager gate and message-sender attribution; OpenCode poller retained as a status-driven method; the watchdog (PollingObserver/LogFileHandler) is removed. - terminal_service.create_terminal is async (FIFO + StatusMonitor wiring); session_service.create_session, flow_service.execute_flow, the API endpoints, and `cao flow run` updated to await. - memory_service curated path and the flow CLI fixed to the new contract. Full unit suite green (1908 passed); black + isort clean.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #273 +/- ##
=======================================
Coverage ? 89.49%
=======================================
Files ? 84
Lines ? 8425
Branches ? 0
=======================================
Hits ? 7540
Misses ? 885
Partials ? 0
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR rebases and completes the event-driven architecture migration by replacing watchdog/log-file polling with an in-process EventBus pipeline that streams tmux output through FIFOs, derives terminal status from a rolling buffer, and triggers inbox delivery off status-change events. It also converts provider initialization and terminal/session creation flows to async and updates the unit/integration tests accordingly.
Changes:
- Introduces
EventBus+FifoManager+StatusMonitor+LogWriter+InboxServicepipeline, and removes watchdog-based log watching (and related deps). - Converts terminal/session/flow execution and provider
initialize()to async; updates status detection to consume a buffer string instead of querying tmux directly. - Updates API/CLI wiring and large portions of the test suite to match the new async + buffer-driven contracts.
Reviewed changes
Copilot reviewed 57 out of 58 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| uv.lock | Removes watchdog/aiofiles from the locked dependency set. |
| pyproject.toml | Drops watchdog/aiofiles runtime dependencies. |
| CODEBASE.md | Updates architecture diagrams/flow docs for FIFO + event bus pipeline. |
| docs/api.md | Updates inbox delivery description to event-driven status detection. |
| docs/event-driven-architecture.md | Adds detailed documentation for the new event-driven pipeline. |
| src/cli_agent_orchestrator/api/main.py | Lifespan now starts event-bus consumers and awaits async services. |
| src/cli_agent_orchestrator/cli/commands/flow.py | Drives async execute_flow() from sync Click command via asyncio.run(). |
| src/cli_agent_orchestrator/constants.py | Adds FIFO/event-bus/state-buffer constants; repurposes polling interval. |
| src/cli_agent_orchestrator/models/terminal.py | Adds TerminalStatus.UNKNOWN. |
| src/cli_agent_orchestrator/providers/base.py | Makes initialize() async and changes get_status() signature to get_status(buffer). |
| src/cli_agent_orchestrator/providers/claude_code.py | Async init; status detection consumes buffer instead of tmux reads. |
| src/cli_agent_orchestrator/providers/codex.py | Async init and buffer-based status detection; async trust prompt handling. |
| src/cli_agent_orchestrator/providers/copilot_cli.py | Async init; status detection consumes buffer string. |
| src/cli_agent_orchestrator/providers/gemini_cli.py | Async init and buffer-based status detection; keeps warmup logic. |
| src/cli_agent_orchestrator/providers/kimi_cli.py | Async init and buffer-based status detection. |
| src/cli_agent_orchestrator/providers/kiro_cli.py | Async init; buffer-based status detection; improved escape stripping for extraction. |
| src/cli_agent_orchestrator/providers/opencode_cli.py | Async init; buffer-based status detection; ERROR fallback becomes UNKNOWN. |
| src/cli_agent_orchestrator/providers/q_cli.py | Async init; buffer-based status detection; removes log-idle helper. |
| src/cli_agent_orchestrator/services/cleanup_service.py | Stops FIFO readers and clears status buffers during retention cleanup. |
| src/cli_agent_orchestrator/services/event_bus.py | Adds thread-safe in-process pub/sub with wildcard subscriptions. |
| src/cli_agent_orchestrator/services/fifo_reader.py | Adds per-terminal FIFO reader threads publishing output events. |
| src/cli_agent_orchestrator/services/flow_service.py | Makes execute_flow() async; busy-check now uses status_monitor. |
| src/cli_agent_orchestrator/services/inbox_service.py | Replaces watchdog delivery with status-event-driven delivery logic. |
| src/cli_agent_orchestrator/services/log_writer.py | Persists streamed output chunks to per-terminal logs. |
| src/cli_agent_orchestrator/services/memory_service.py | Curator liveness checks now use status_monitor. |
| src/cli_agent_orchestrator/services/session_service.py | Makes create_session() async; delete path delegates terminal teardown. |
| src/cli_agent_orchestrator/services/status_monitor.py | Adds rolling buffer + provider-driven status derivation + status events. |
| src/cli_agent_orchestrator/services/terminal_service.py | Makes create_terminal() async; wires FIFO + pipe-pane; status/output sources updated. |
| src/cli_agent_orchestrator/utils/event.py | Adds helper to extract terminal id from topic names. |
| src/cli_agent_orchestrator/utils/terminal.py | Makes wait_for_shell/wait_until_status async and status-monitor-based; switches polling client to requests. |
| src/cli_agent_orchestrator/utils/text.py | Adds robust terminal escape/control stripping utility. |
| test/api/test_api_endpoints.py | Updates async service mocks for endpoints; updates lifespan expectations. |
| test/api/test_flow_api.py | Updates flow API tests for awaited async execution. |
| test/api/test_plugin_lifespan.py | Updates lifespan test stubs for new consumer tasks and OpenCode poller. |
| test/api/test_terminals.py | Updates endpoint tests for async service methods and new inbox delivery method. |
| test/cli/commands/test_flow.py | Updates CLI flow tests for async execute_flow() mocking. |
| test/providers/conftest.py | Adds shared integration fixtures bootstrapping event pipeline + DB mocks. |
| test/providers/test_base_provider.py | Updates base provider tests for async init + buffer-based status contract. |
| test/providers/test_copilot_cli_unit.py | Updates status tests for buffer contract; adjusts init tests. |
| test/providers/test_kimi_cli_unit.py | Updates init/status tests for async + buffer contract. |
| test/providers/test_kiro_cli_integration.py | Refactors integration tests to use real create_terminal FIFO pipeline. |
| test/providers/test_opencode_cli_unit.py | Updates fixtures/tests for buffer contract and async init. |
| test/providers/test_permission_prompt_detection.py | Refactors Kiro permission prompt tests to pass fixture output into get_status(output). |
| test/services/test_cleanup_service.py | Updates cleanup tests for new FIFO/status monitor interactions. |
| test/services/test_context_manager_agent.py | Updates memory curator tests to use status_monitor status gating. |
| test/services/test_flow_service.py | Updates flow service tests for async execute path and status_monitor busy check. |
| test/services/test_plugin_event_emission.py | Updates event emission tests for async create_* and inbox delivery refactor. |
| test/services/test_session_service.py | Updates session service tests for async create_session and new teardown delegation. |
| test/services/test_terminal_service_coverage.py | Updates terminal service cleanup-path coverage for async init + FIFO/status cleanup. |
| test/utils/test_terminal.py | Updates terminal util tests for async waiters + requests patching. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # Start event bus consumers as background tasks | ||
| status_monitor_task = asyncio.create_task(status_monitor.run()) | ||
| log_writer_task = asyncio.create_task(log_writer.run()) | ||
| inbox_service_task = asyncio.create_task(inbox_service.run()) | ||
| logger.info("Event bus consumers started (StatusMonitor, LogWriter, InboxService)") |
| ### Status Monitor (`services/status_monitor.py`) — Publisher + Consumer | ||
|
|
||
| Subscribes to `terminal.*.output`. Accumulates output into a rolling buffer (8KB) per terminal, detects status via the registered provider (or a generic shell prompt pattern before init), and publishes `terminal.{id}.status` on change. Also the source of truth for current terminal status. | ||
|
|
|
@tuankn22 @tuanknguyen @patricka3125 @anilkmr-a2z can you please help to review this PR ? @anilkmr-a2z i saw you have another PR #271 , are there any duplication ? |
Resolve the codex test conflict introduced by awslabs#274 (codex v0.136 TUI footer + MCP tool-call marker filtering). awslabs#274 added its get_status regressions against the old tmux-reading contract (mock_tmux.get_history + get_status()); adapt them to the event-driven get_status(buffer) contract this branch introduces. codex.py auto-merged cleanly — the awslabs#274 regex/extraction fixes layer onto buffer-based get_status unchanged. Full unit suite green: 1948 passed, 1 skipped.
Resolve four review comments on PR awslabs#273: - InboxService.run() ran deliver_pending() synchronously inside the asyncio consumer loop, blocking it on DB + tmux I/O and starving the StatusMonitor/LogWriter consumers. Offload delivery via asyncio.to_thread, matching the threading discipline documented for the event bus and the existing OpenCode poller. - Event-driven deliveries were started without a PluginRegistry, so they skipped PostSendMessageEvent hooks. Thread the registry through run() -> deliver_pending so status-driven deliveries get the same plugin attribution as the immediate and OpenCode-poller paths. - FifoManager.stop_reader() early-returned when no in-memory reader was tracked, never unlinking the FIFO. Retention cleanup after a restart (empty _readers) would leak *.fifo files unbounded. Always best-effort unlink the FIFO; only log when a reader was actually stopped. - Docs claimed StatusMonitor falls back to a generic shell-prompt pattern before init; _detect_status returns UNKNOWN until a provider registers. Correct the docs to match. Adds regression tests for the registry threading, the to_thread offload, and the stale-FIFO unlink.
Reconcile the event-driven inbox rewrite with awslabs#265 and awslabs#266, which just landed on main and edit the same files: - inbox_service.py: keep the event-driven InboxService, but apply awslabs#265's ordering — mark messages DELIVERED before send_input so output echoed back through the FIFO/StatusMonitor pipeline can't re-enter deliver_pending and double-deliver a still-PENDING message (awslabs#164). Add awslabs#266's reconcile_orphaned_messages as a method that routes stale PENDING receivers back through deliver_pending (awslabs#131). - api/main.py: keep the event-bus consumers + OpenCode poller; add awslabs#266's inbox_reconciliation_daemon task (start + cancel). Drop awslabs#266's watchdog PollingObserver wiring — the event bus replaced it here. - constants.py: keep both INBOX_POLLING_INTERVAL and awslabs#266's INBOX_RECONCILE_INTERVAL/GRACE; reword the comments off the watchdog. - tests: adapt awslabs#265's ordering regression and awslabs#266's reconcile + lifespan tests to the event-driven class API (deliver_pending, status_monitor, the singleton's bound method); drop the watchdog LogFileHandler tests. Full unit suite green: 1958 passed, 1 skipped.
|
@haofeif That PR actually doesn't change tmux integ but add support for Herdr which provide these notification already. But yeah, I think it would be good to change tmux to event driven too, if we can. I have noticed the machine gets slow down as hell when there are lot of terminals. So, trying to find solution to make it efficient. |
…r ' no duration) The newest TUI sometimes writes the completion summary's duration via a separate cursor-positioned write that the raw stream splits off, leaving '✻ Crunched for ' with no 'Ns'. The strict COMPLETION_SUMMARY_PATTERN (requires 'for Ns') then failed to recognize completion, so the spinner-before-separator walk reported PROCESSING off a stale spinner and a finished handoff intermittently stuck at PROCESSING/IDLE until timeout. Add GET_STATUS_COMPLETION_PATTERN (glyph + 'for', no ellipsis, no digits required) for get_status detection + the boxless-tail guard; extraction keeps the strict pattern. Verified: six real handoff captures all settle COMPLETED; live-processing and compaction still PROCESSING.
@haofeif — thank you for the exceptionally detailed review. Pushed a fix (commits Root cause confirmedYou were exactly right: What changed
Verified ✅
Needs your check
|
…awslabs#273) Reconcile the two concurrent refactors of the same providers/services: PR awslabs#271 added a terminal backend abstraction (tmux/herdr) with polling get_status; this branch (awslabs#273) replaced polling with an event-driven FIFO -> EventBus -> StatusMonitor/LogWriter/InboxService pipeline and async, buffer-based provider status detection. Resolution keeps the event-driven design as primary and layers in the backend abstraction: - Route terminal I/O through get_backend() (TmuxBackend delegates to tmux_client, so the default path is behaviorally unchanged) while keeping the FIFO pipe-pane stream as the tmux event source. - Guard FIFO setup in create_terminal with `not get_backend().supports_event_inbox()`; herdr uses its socket-event inbox. Keep herdr inbox registration additively (no-op for tmux). - Providers keep async initialize() and buffer-based get_status(output); claude_code keeps its newest-TUI parser plus herdr's native-status short-circuit (the get_history poll inside get_status is removed). - utils.terminal wait_for_shell/wait_until_status stay async + terminal_id based (StatusMonitor-driven). - API lifespan starts the EventBus consumers unconditionally and the herdr inbox service only for HerdrBackend; the removed watchdog PollingObserver branch is dropped. - inbox_service.deliver_pending keeps the event-driven readiness gate and resets herdr TerminalNotFoundError deliveries to PENDING for retry. - get_output(FULL) returns the StatusMonitor buffer; get_output(LAST) keeps awslabs#271's escalating fetch via get_backend(). Also: add submit_delay to the backend send_keys interface (Claude paste timing); update awslabs#271 net-new tests for async create_terminal / the rewired lifespan / the send_keys signature; fix pre-existing herdr mypy issues. Full unit suite green (2198 passed, 17 skipped); mypy, black and isort clean.
The newest Kimi CLI (the "Kimi Code" rebuild) replaced the ✨/💫 emoji
prompt with a boxed input area ("── input ──"), an "agent (<model> ●)"
status bar, a "context: N%" usage footer, and a braille-spinner working
indicator ("⠧ Thinking… Ns · N tokens"). The legacy detector keyed on the
emoji prompt, so it never observed IDLE and every Kimi terminal timed out
at init — this was the failure flagged on awslabs#273 (now confirmed to be a
Kimi-side TUI redesign, not a regression from the event-driven refactor).
get_status now detects the new TUI (gated on the new status/footer markers
so legacy emoji builds are unchanged):
- READY = status bar / context footer present with no live spinner.
- PROCESSING vs READY is decided by POSITION (last braille spinner line vs
last ready-chrome line), so stale spinner frames lingering in the 8KB
rolling buffer don't pin a finished turn at PROCESSING.
- COMPLETED vs IDLE latches on a "•" bullet (a turn produced output); the
welcome banner / update nag have none, so a fresh terminal reads IDLE and
avoids a premature-completion race when the first task is sent.
Adds regression tests driven by real captured raw pipe-pane buffers
(idle / processing / completed) of the new TUI.
Also enables Claude <-> Kimi cross-provider e2e:
- examples/cross-provider/data_analyst_kimi_cli.md (provider: kimi_cli).
- TestCrossProviderClaudeToKimi / TestCrossProviderKimiToClaude.
- Fix the cross-provider test helper to resolve the worker's provider from
its profile (the add-terminal endpoint treats an explicit provider param
as authoritative, which silently suppressed the profile override).
Verified on real CLIs (kimi 1.47.0 + claude 2.1.x) against a cao-server
built from this branch:
- TestKimiCliHandoff (2) + TestKimiCliAssign (3): pass.
- Claude->Kimi and Kimi->Claude cross-provider assign: pass.
Full unit suite green (2202 passed); mypy / black / isort clean.
| async def wait_for_shell( | ||
| terminal_id: str, | ||
| timeout: float = 10.0, | ||
| polling_interval: float = 0.5, | ||
| stable_duration: float = 2.0, | ||
| polling_interval: float = 0.3, | ||
| ) -> bool: | ||
| """Wait for shell to be ready by checking if output is stable (2 consecutive reads are the same and non-empty).""" | ||
| logger.info(f"Waiting for shell to be ready in {session_name}:{window_name}...") | ||
| start_time = time.time() | ||
| previous_output = None | ||
| """Wait for shell to be ready by checking if the output buffer is stable and non-empty. | ||
|
|
||
| while time.time() - start_time < timeout: | ||
| output = backend.get_history(session_name, window_name) | ||
| Reads the StatusMonitor's in-memory buffer (populated by the FIFO reader | ||
| → event bus → StatusMonitor pipeline). Returns True when the buffer is | ||
| non-empty and has not changed for *stable_duration* seconds. | ||
|
|
||
| This does NOT use provider-specific status detection because the provider | ||
| is already registered before initialize() runs, and provider patterns | ||
| don't match raw shell output. | ||
| """ | ||
| from cli_agent_orchestrator.services.status_monitor import status_monitor | ||
|
|
||
| logger.info(f"Waiting for shell to be ready for terminal {terminal_id}...") | ||
|
|
||
| deadline = time.time() + timeout |
| async def wait_until_status( | ||
| terminal_id: str, | ||
| target_status: "TerminalStatus | set[TerminalStatus]", | ||
| timeout: float = 30.0, | ||
| polling_interval: float = 1.0, | ||
| ) -> bool: | ||
| """Wait until provider reaches target status or timeout.""" | ||
| targets = target_status if isinstance(target_status, set) else {target_status} | ||
| start_time = time.time() | ||
| """Wait until terminal reaches target status by polling status_monitor.""" | ||
| from cli_agent_orchestrator.services.status_monitor import status_monitor | ||
|
|
||
| while time.time() - start_time < timeout: | ||
| status = provider_instance.get_status() | ||
| target_str = ", ".join(s.value for s in targets) | ||
| logger.info(f"Waiting for {{{target_str}}}, current status: {status}") | ||
| if status in targets: | ||
| targets = target_status if isinstance(target_status, set) else {target_status} | ||
| target_str = ", ".join(s.value for s in targets) | ||
| logger.info( | ||
| f"wait_until_status [{terminal_id}]: waiting for {{{target_str}}}, timeout={timeout}s" | ||
| ) | ||
| start = time.time() | ||
| while time.time() - start < timeout: | ||
| current = status_monitor.get_status(terminal_id) | ||
| if current in targets: | ||
| logger.info(f"wait_until_status [{terminal_id}]: reached {current.value}") | ||
| return True | ||
| time.sleep(polling_interval) | ||
|
|
||
| await asyncio.sleep(polling_interval) | ||
| logger.warning(f"wait_until_status [{terminal_id}]: timeout waiting for {{{target_str}}}") | ||
| return False |
| old_terminals = ( | ||
| db.query(TerminalModel).filter(TerminalModel.last_active < cutoff_date).all() | ||
| ) | ||
| for terminal in old_terminals: | ||
| fifo_manager.stop_reader(terminal.id) | ||
| status_monitor.clear_terminal(terminal.id) |
| def set_loop(self, loop: asyncio.AbstractEventLoop) -> None: | ||
| """Register the asyncio event loop (required for thread-safe publishing).""" | ||
| self._loop = loop | ||
|
|
||
| def publish(self, topic: str, data: dict) -> None: | ||
| """Publish event to all matching subscribers. Safe to call from any thread.""" | ||
| if self._loop: | ||
| self._loop.call_soon_threadsafe(self._dispatch, topic, data) | ||
|
|
| import logging | ||
| from typing import Dict | ||
|
|
||
| from cli_agent_orchestrator.constants import STATE_BUFFER_MAX | ||
| from cli_agent_orchestrator.models.terminal import TerminalStatus | ||
| from cli_agent_orchestrator.providers.manager import provider_manager | ||
| from cli_agent_orchestrator.services.event_bus import bus | ||
| from cli_agent_orchestrator.utils.event import terminal_id_from_topic | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class StatusMonitor: | ||
| """Accumulates terminal output into rolling buffers and detects status changes.""" | ||
|
|
||
| def __init__(self): | ||
| self._buffers: Dict[str, str] = {} | ||
| self._last_status: Dict[str, TerminalStatus] = {} | ||
|
|
| def _process_chunk(self, terminal_id: str, chunk: str) -> None: | ||
| """Append chunk to rolling buffer and check for status changes.""" | ||
| if terminal_id not in self._buffers: | ||
| self._buffers[terminal_id] = "" | ||
| self._buffers[terminal_id] += chunk | ||
|
|
||
| if len(self._buffers[terminal_id]) > STATE_BUFFER_MAX: | ||
| self._buffers[terminal_id] = self._buffers[terminal_id][-STATE_BUFFER_MAX:] | ||
|
|
||
| new_status = self._detect_status(terminal_id, self._buffers[terminal_id]) | ||
|
|
||
| if new_status != self._last_status.get(terminal_id): | ||
| bus.publish(f"terminal.{terminal_id}.status", {"status": new_status.value}) | ||
| logger.info(f"Terminal {terminal_id} status changed: {new_status.value}") | ||
| self._last_status[terminal_id] = new_status | ||
|
|
| def clear_terminal(self, terminal_id: str) -> None: | ||
| """Free buffer and status for a deleted terminal.""" | ||
| self._buffers.pop(terminal_id, None) | ||
| self._last_status.pop(terminal_id, None) | ||
|
|
||
| def reset_buffer(self, terminal_id: str) -> None: | ||
| """Clear the rolling buffer + last-known status WITHOUT forgetting the | ||
| terminal. | ||
|
|
||
| Used when a provider relaunches a different CLI mode on the SAME | ||
| ``terminal_id`` (e.g. Kiro's TUI -> ``--legacy-ui`` fallback). Without | ||
| this, the retry re-derives status from a buffer still full of stale bytes | ||
| from the failed first attempt and can spuriously time out. | ||
| """ | ||
| self._buffers[terminal_id] = "" | ||
| self._last_status.pop(terminal_id, None) | ||
|
|
||
| def get_status(self, terminal_id: str) -> TerminalStatus: | ||
| """Get current terminal status. Source of truth — derived from streaming output.""" | ||
| return self._last_status.get(terminal_id, TerminalStatus.UNKNOWN) | ||
|
|
||
| def get_buffer(self, terminal_id: str) -> str: | ||
| """Get accumulated output buffer for a terminal.""" | ||
| return self._buffers.get(terminal_id, "") |
| @staticmethod | ||
| def _write(path, data: str) -> None: | ||
| with open(path, "a") as f: | ||
| f.write(data) |
…n herdr
The event-driven rewrite made the FIFO -> EventBus -> StatusMonitor pipeline the
sole source of shell-readiness and terminal status. The herdr backend deliberately
skips that pipeline (its pipe_pane is a no-op; no FIFO reader is started for it,
since it delivers via socket events), so for a herdr terminal the StatusMonitor
buffer stayed "" and status stayed UNKNOWN forever. As a result provider.initialize()
timed out in wait_for_shell()/wait_until_status(), and every status read (API
GET /terminals/{id}, busy checks, provider init loops) returned UNKNOWN -- terminal
creation was broken for essentially every provider on herdr.
Fix at the single chokepoint instead of per-call-site:
- status_monitor.get_status() is now backend-aware: for event-inbox backends it
derives status on demand from the provider's get_status() (which consults
backend.get_native_status()). This fixes all read sites at once -- API status,
wait_until_status(), flow busy checks, curator liveness, and the copilot/gemini
init loops -- without each having to special-case the backend.
- wait_for_shell() reads backend.get_history() directly for event-inbox backends
rather than the never-populated StatusMonitor buffer.
- wait_until_status() reverts to simply polling the now backend-aware get_status(),
removing the duplicated backend logic.
The tmux path is unchanged (supports_event_inbox() is False -> same pushed-status
read as before). Verified end-to-end against a real herdr 0.6.8 server: a live
Claude Code agent launched in a herdr pane, reached idle, and completed a task.
Adds test/services/test_status_monitor.py and herdr cases to test/utils/test_terminal.py.
Addressed: herdr backend terminal-init hang (Copilot review —
|
There was a problem hiding this comment.
Review — Event-Driven Architecture + Herdr Integration Check
Mostly lgtm. Reviewed the full diff with focus on herdr backend compatibility and past issues (pane resolution, stale events, inbox delivery). Herdr integration is solid — the supports_event_inbox() gate correctly routes herdr through native status derivation, skips FIFO/pipe-pane, and preserves the socket-event inbox wiring.
Summary of findings (details in inline comments)
- API contract break:
TerminalStatus.UNKNOWNreplacesERRORas the fallback — downstream clients may not handle it - Herdr delivery callback: synchronous
deliver_pendingin async context — low priority but worth noting - BaseProvider docstring: says implementations "should" call
strip_terminal_escapesbut kiro intentionally doesn't — "MAY" is more accurate - EventBus queue overflow: acceptable with reconciliation sweep as safety net
- Missing test: no unit test for herdr branch in
status_monitor.get_status() - Dead code:
IDLE_PROMPT_PATTERN_LOGconstants remain in kiro_cli and claude_code afterget_idle_pattern_for_log()removal
| class TerminalStatus(str, Enum): | ||
| """Terminal status enumeration with provider-aware states.""" | ||
|
|
||
| UNKNOWN = "unknown" |
There was a problem hiding this comment.
API contract break: ERROR -> UNKNOWN
This changes the public contract: GET /terminals/{id} previously returned "error" when no output or no pattern matched; now it returns "unknown". This is arguably more correct (no output != error), but it is a breaking change for any downstream client (web UI, flow scripts, external integrations) that checks status == "error" as their catch-all.
Worth calling out explicitly in the PR description that this is an intentional API change. Downstream consumers need to handle "unknown" -- or at minimum confirm the web UI and flow scripts do not branch on "error" for the "not yet producing output" case.
| if get_backend().supports_event_inbox(): | ||
| try: | ||
| provider = provider_manager.get_provider(terminal_id) | ||
| except Exception: |
There was a problem hiding this comment.
Missing test for herdr branch
This if get_backend().supports_event_inbox() branch is the critical path for herdr status queries. I do not see a dedicated unit test that exercises it (terminal_service tests mock the backend but status_monitor tests do not). A targeted test with supports_event_inbox=True + a mock provider returning a known status would pin this contract.
| stream (cursor-positioning escapes, in-place ``\\r`` redraws, OSC titles), | ||
| NOT a tmux-rendered pane snapshot. Implementations that do structural / | ||
| line-oriented matching should run it through | ||
| ``cli_agent_orchestrator.utils.text.strip_terminal_escapes`` first |
There was a problem hiding this comment.
Docstring says should but kiro intentionally does not
The input contract says implementations that do structural/line-oriented matching should run strip_terminal_escapes. Kiro's get_status intentionally preserves
for permission prompt detection (and has a comment saying Do not switch this to strip_terminal_escapes). Consider softening to MAY so a future contributor does not fix kiro to comply with this docstring and break permission detection.
| cleanup_expired_memories, | ||
| cleanup_old_data, | ||
| ) | ||
| from cli_agent_orchestrator.services.event_bus import bus |
There was a problem hiding this comment.
Herdr delivery callback runs synchronous subprocess in async context
The deliver_inbox callback (used by HerdrInboxService) calls inbox_service.deliver_pending() synchronously. That method now also calls status_monitor.get_status() which for herdr invokes provider.get_status() -> get_native_status() (subprocess). Low priority since the reconciliation sweep catches misses, but for parity with the InboxService event-bus consumer (which wraps in asyncio.to_thread), this could benefit from the same treatment.
The event-driven status pipeline strips terminal escapes from the raw
8KB FIFO buffer to feed provider.get_status(). _LINE_START_CSI in
strip_terminal_escapes already turned CHA (\x1b[1G) and CNL (\x1b[E)
into \n so per-line patterns work, but missed CUP — Cursor Position
to column 1 (\x1b[<row>;1H).
Codex's TUI lays out its bottom prompt + status bar via CUP rather
than CHA: \x1b[46;1H› places the idle ``›`` at column 1 of row 46.
Without normalising this, the ``›`` glyph stayed glued mid-stream,
e.g.
›Improve documentation in @filenameopenai.gpt-5.5 medium · /tmp/...
The per-line check at codex.py:get_status:370 only inspects the bottom
five lines for ``\s*(?:❯|›|codex>)``, so the prompt was never detected
and codex sessions reported PROCESSING forever — every codex e2e test
hit "Codex initialization timed out after 60 seconds" (0/12 passing).
Extend _LINE_START_CSI to also match \x1b[\d+;1H. After the fix the
›-prefixed idle prompt sits on its own line, the existing
IDLE_PROMPT_PATTERN check matches, and codex idle detection works:
in replay against captured FIFO logs, 14/15 codex sessions reach idle
correctly. Remaining e2e failures are MCP startup >60s and OpenAI
``stream disconnected before completion`` API errors — outside the
scope of status detection.
Add a regression test covering both \x1b[46;1H› (codex's bottom
prompt) and the bare \x1b[1;1Hb form.
…on event-driven pipeline
Under tmux capture-pane, a TUI that redraws over its boot screen hides
"Initializing..." and the MCP-init line — Kiro's TUI -> --legacy-ui
fallback path also calls StatusMonitor.reset_buffer when retrying so
the rolling buffer stays clean. Yolo mode does NOT take that fallback
path: it forces --legacy-ui directly at launch, so its rolling FIFO
buffer keeps the boot bytes forever even after kiro has redrawn over
them and the actual interactive prompt is showing below.
Until now get_status returned PROCESSING unconditionally whenever
TUI_INITIALIZING_PATTERN matched anywhere in the buffer. Yolo
sessions therefore reported PROCESSING for the entire session and
wait_until_status({IDLE, COMPLETED}) timed out — kiro yolo e2e
went from passing to 0/11 under the new event-driven pipeline (awslabs#273).
Make Check 0 position-aware: only return PROCESSING from a matching
TUI_INITIALIZING_PATTERN when no real ``[agent] >`` idle prompt
appears AFTER the last init match. The new-TUI placeholder
("Ask a question or describe a task") is intentionally NOT counted
as a real idle prompt because the new TUI renders it during boot;
that pre-existing test
(test_tui_initializing_yields_processing_despite_idle_placeholder,
issue awslabs#211) still passes.
Update test_mcp_server_init_yields_processing — its old fixture
asserted boot-line + "[developer] !>" should yield PROCESSING, but
real captured Kiro logs show the [developer] prompt only renders
AFTER init completes, so the assumption was incorrect. Replace the
fixture with the actual placeholder text Kiro shows during boot,
and add test_mcp_server_init_with_post_init_prompt_yields_idle to
lock in the post-init redrawn-stale case (the yolo failure mode).
Verified end-to-end: kiro e2e (yolo, --legacy-ui) 0/11 -> 11/11.
… flap PR awslabs#273's event-driven pipeline derives status from a rolling 8KB FIFO buffer fed to per-provider get_status(). TUI redraws (status-bar refreshes, cursor positioning, footer repaints) keep emitting bytes for seconds AFTER the agent settles, eventually evicting the idle/response markers from the 8KB window. Without latching, status flaps rapidly between IDLE/COMPLETED and PROCESSING/UNKNOWN, and both wait_until_status (server-side) and the e2e tests' HTTP polling miss the brief ready windows — causing codex 60s init timeouts, gemini 240s init timeouts, and completion-timeout failures. StatusMonitor now refuses two downgrades once a ready status latches: - ready -> PROCESSING/UNKNOWN (typical buffer-eviction flap) - COMPLETED -> IDLE (codex's user-marker evicts before assistant's, so last_user is None and provider falls back to IDLE, silently overwriting COMPLETED) notify_input_sent() arms a one-shot revert gate so the latch releases when external input legitimately starts a new processing cycle: - terminal_service.send_input / send_special_key (runtime input) - each tested provider's initialize() before its launch keystrokes and bypass/trust prompt acknowledgements codex get_status now also handles the "long response evicted the user marker" case directly: when last_user is None, scan above the TUI footer for an assistant bullet and return COMPLETED instead of IDLE (was returning IDLE forever, which the COMPLETED->IDLE block above would otherwise cement as an off-by-one defence). gemini_cli now declares extraction_tail_lines = 5000 so its existing extraction_retries (3 x 10s) actually fires. The escalating-fetch path (200/500/1000/5000) ran each step once with no inter-step waits, so Ink-TUI redraws still rendering at extraction time fell through to [PARTIAL RESPONSE], leaving the TUI footer in the output and tripping the "? for shortcuts not in output" assertion. E2E results on PR awslabs#273 with these changes: codex 10/12 passing (1 pre-existing extraction issue, 1 xfail) claude 12/12 gemini 12/12 kiro 11/11
…e thread-safe Two hardening fixes to the sticky ready-status latch (PR #1): 1. Arm semantics: notify_input_sent()'s one-shot arm was consumed by ANY new ready latch, including a ready→ready downgrade flap (COMPLETED→IDLE when a large paste evicts the response markers, WAITING_USER_ANSWER→IDLE after a permission keystroke). Consuming the arm there blocks the genuine PROCESSING that follows, so the terminal reads ready while the agent is busy — and InboxService delivers on IDLE/COMPLETED, so a queued message could be pasted mid-response. The arm is now consumed only by a PROCESSING transition or a genuine non-ready→ready (init-style) latch. 2. Thread safety: the latch decision is a read-modify-write sequence (read armed → decide transition → consume arm) executed on the asyncio consumer, while notify_input_sent()/get_status()/clear_terminal() run on FastAPI threadpool, inbox delivery workers, and the cleanup thread. Guard StatusMonitor state with a lock (provider regex analysis and bus.publish stay outside it). Also covers the pre-existing Copilot review comments about unsynchronized _buffers/_last_status access. Adds a latching state-machine test suite (12 cases) pinning blocked downgrades, arm consumption, flap survival, and event publication.
Real failure from the supervisor-handoff e2e: mid-turn, Claude shows an interim completion summary from an earlier thinking phase, then keeps working — '● Calling cao-mcp-server… (ctrl+o to expand)' with a live '✢ Misting… (33s · ↑ 332 tokens)' spinner and a '⎿ Tip: …' hint line between the spinner and the input box. Two gaps combined into a false COMPLETED, which the StatusMonitor ready-latch then pinned until the next input, so the e2e test extracted mid-flight output: - _boxless_completion_tail treated ANY completion summary after the last separator as a finished turn. It now requires that no live spinner (glyph + gerund + ellipsis) renders after the summary — an interim summary followed by a fresh spinner keeps the turn PROCESSING. - The new-TUI box-spinner check looked only at the single line directly above the input box; '⎿ Tip:' hint lines and blanks render between the live spinner and the box, hiding it. The check now walks up to 4 lines, skipping ⎿ continuation lines and blanks. Regression tests reproduce both shapes (fail pre-fix) plus a guard that a summary with no following spinner still reads COMPLETED.
The newest 'Kimi Code' TUI renders user messages as ✨-prefixed prompt lines (no ╭─ input box), responses as • bullets, and a '── input ──' rule plus status bar as footer. extract_last_message_from_script() preferred the last ╰─ box-end as the response anchor — which in the new TUI matches decorative boot banners (Kimi welcome box, FastMCP server banner) ABOVE the conversation — and only stopped at a bare idle prompt, which the new TUI never renders. Result: the 'response' was sliced from the boot screen and ran to end-of-capture (raw spinner frames, status bar), failing the supervisor-assign e2e. - Anchor on the LATEST marker (box-end vs prompt-with-input) instead of box-first: the response always follows the last user input, whichever marker style rendered it. - Stop the response at the new-TUI footer: the '── input ──' rule or the status-bar/context-footer line, in addition to the bare idle prompt. Fixtures are ground truth from a live Kimi Code session.
- LogWriter: open log files with explicit encoding='utf-8', errors='replace' — a non-UTF-8 platform locale (POSIX/C) would raise UnicodeEncodeError on the first unencodable chunk and stop log persistence for that terminal. - EventBus.set_loop: accept Optional[loop] (test fixtures already call set_loop(None)); publish() now guards against the loop being closed during shutdown instead of raising RuntimeError from a FIFO reader thread draining its last chunks. - BaseProvider.get_status docstring: soften the strip_terminal_escapes 'should' to MAY with an explicit note that kiro_cli intentionally preserves raw \r for permission-prompt detection and must not be 'fixed' to comply. The 'except Exception swallows CancelledError' comments are intentionally NOT addressed: since Python 3.8, asyncio.CancelledError derives from BaseException, so those loops do not swallow cancellation.
Kimi (v1.20+) launches via 'cd <temp_dir> && kimi --agent-file <temp_dir>/agent.yaml', writing the system prompt — including the injected skill catalog — to <temp_dir>/system.md. The catalog is no longer in the CLI command string, and the full-screen Kimi Code TUI clears the visible screen on startup, so asserting against raw tmux scrollback always failed. Parse the temp dir from the launch command (capture-pane -J so soft-wrapped lines can't split the path mid-token) and assert against system.md on disk — same pattern as the existing gemini GEMINI.md branch.
…de TUI
The newest TUI renders the live spinner BETWEEN the '── input ──' rule and
the status bar, and repaints the status bar with every frame — so the
spinner-vs-ready-chrome position compare read 'ready' mid-turn whenever a
full footer repaint was the freshest chunk (measured: one supervisor turn
flapped completed↔processing 29 times in a 57KB stream). The StatusMonitor
ready-latch then pinned the first false COMPLETED ~130ms after dispatch,
so supervisor flows extracted mid-flight output (supervisor-assign e2e).
Detection now uses signals validated by replaying captured live streams:
- Live-spinner lines: braille/moon glyphs (incl. tool-call lines like
'⠹ Using handoff({…})'), EXCLUDING boot chrome ('⠧ MCP Servers: 0/1',
'⠋ Loading configuration...') which legitimately shows braille while
idle at the welcome screen.
- In flight when a live spinner is within the freshest 15 lines OR renders
after the last '•' bullet (covers chunk boundaries where streamed
thinking text temporarily pushed the spinner out of the tail).
- Dispatch grace: 5s after send_input() (mark_input_received now stamps
the dispatch), bridging the paste→first-spinner-frame gap.
- Rendered-pane confirmation: a ready-looking chunk boundary mid-turn is
byte-identical to one at real completion (stale spinner ~21 lines back,
bullets 2-3 from the end in BOTH), so when the stream says ready
post-dispatch, confirm against the rendered pane — tmux's compositor has
resolved every in-place redraw, so a visible spinner is live, not stale.
Also raises the e2e SUPERVISOR_COMPLETION_TIMEOUT 300→600s: a kimi worker
alone takes ~3m20s on the report-template handoff; 300s only looked
sufficient while the false early COMPLETED was masking the real duration.
E2E (live agents): kimi supervisor handoff + assign_and_handoff now pass;
kimi handoff x2, send_message, skills unaffected and green.
…or/NotebookEdit The allowed-tools e2e caught restricted agents escaping their restrictions live: a reviewer (no Bash/Write) created the forbidden file anyway — first 'via a delegated subagent that ran the write through a shell command' (Task), then on retry via 'the Monitor tool' (background shell scripts). CAO computes --disallowedTools as (tool universe - allowed natives), and claude_code's universe only contained Bash/Read/Edit/Write/Glob/Grep — so the execution-capable tools outside it were never disallowed. Gate them by privilege equivalence: - execute_bash now maps Bash, BashOutput, KillShell, Task (subagents run with their own full toolset), and Monitor (runs arbitrary shell scripts). Anything these can do, Bash can too, so profiles allowed execute_bash lose nothing. - fs_write now maps NotebookEdit alongside Edit/Write (.ipynb writes). Known limitation: this is enumeration against an evolving toolset — each new execution-capable Claude Code tool needs adding here. A deny-by-default mechanism upstream would be the durable fix. E2E (live, herdr backend): all 4 TestClaudeCodeAllowedTools pass, including the two that previously demonstrated the escape.
anilkmr-a2z
left a comment
There was a problem hiding this comment.
All review concerns addressed in the latest commits:
- BaseProvider docstring: changed "should" to "MAY" with explicit kiro carve-out
- StatusMonitor tests: TestStickyLatching class covers the herdr and tmux state machine paths
- Thread safety: RLock added to StatusMonitor
- EventBus: set_loop accepts None, publish handles closed-loop RuntimeError gracefully
- LogWriter: explicit UTF-8 encoding
The UNKNOWN vs ERROR API contract change is the only item not explicitly called out in PR description -- worth a one-liner there for changelog purposes but not blocking.
Herdr integration verified intact across all new commits. Ship it.
Summary
Continues @tuanknguyen's event-driven architecture work from #115, per @haofeif's suggestion there. That branch had fallen ~98 commits behind
mainand was failing CI (a staleINBOX_POLLING_INTERVALimport left after the constant was removed). This merges currentmaininto it, resolves the conflicts, and gets the full unit suite passing. It's also the "unified delivery engine" I referenced in #266.What it does
Replaces the watchdog/polling model for terminal output, status detection, and inbox delivery with an in-process event bus:
pipe-pane) rather than being polled from log files.StatusMonitoris the single source of truth for status; providers'get_status()now parse a buffer string instead of reading tmux themselves.InboxServicedelivers on status events; thewatchdogPollingObserver/LogFileHandleris removed.initialize()andterminal_service.create_terminal()are now async.Reconciling with
mainAll features that landed after the branch forked are preserved on top of the event-driven base:
InboxService.deliver_pending.poll_opencode_pending_messages(OpenCode's output goes quiet once idle, so the event bus alone may not wake delivery).copilot_cli/opencode_clito the asyncinitialize+ bufferget_statuscontract.Testing
pytest test/ --ignore=*_integration.py --ignore=test/e2e -m "not e2e"), Python 3.11.black --checkandisort --check-onlyclean.Supersedes #115. Related: #266.