Add remote orchestration and smart parallelism heuristics (#258)
Conversation
Feature 1: Remote Orchestration (P1)
- Add orchestration message types for start/pause/resume/stop/get_state
- Add orchestration session tracking in RemoteServer
- Add setParallelConfig() for enabling remote orchestration
- Add 5 orchestration handlers and parallel event broadcasting
- Add orchestration methods to RemoteClient
- Wire up parallel config in run.tsx when --listen is enabled
- Add cleanup logic to stop orchestration on client disconnect

Feature 3: Smart Parallelism Heuristics (P3)
- Add recommendParallelism() function in task-graph.ts
- Detect test-heavy workloads (safe for high parallelism)
- Detect refactor-heavy workloads (reduce parallelism)
- Detect file overlap from metadata.affects field
- Add ParallelismRecommendation type
- Integrate heuristics in run.tsx parallel execution path

Bug fixes from CodeRabbit review:
- Fix worker ceiling: recommendedWorkers can't exceed defaultMax
- Fix resource leak: cleanup orchestration when client disconnects
- Fix NaN validation: validate parseInt results in task range parsing
- Fix edge case: use !== undefined checks for range string generation
Walkthrough: Adds CLI task-range filtering and threads filtered task IDs through the engine, parallel executor, and remote orchestration; introduces recommendParallelism heuristics and new parallelism types; extends the remote client/server with orchestration RPCs, session lifecycle, and parallel-event broadcasting.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    rect rgba(0,128,255,0.5)
    actor User
    participant Client as Remote Client
    participant Server as Remote Server
    participant TaskGraph as Task Graph
    participant Executor as Parallel Executor
    end
    User->>Client: startOrchestration(options)
    Client->>Server: OrchestrateStartMessage
    Server->>TaskGraph: analyze tasks & compute groups
    TaskGraph-->>Server: analysis + groups
    Server->>Executor: create ParallelExecutor (baseConfig + filteredTaskIds)
    Executor->>Executor: run groups in parallel
    Executor-->>Server: emit ParallelEvent
    Server->>Client: ParallelEventMessage (broadcast)
    Client-->>User: parallel_event callback
```

```mermaid
sequenceDiagram
    rect rgba(0,128,0,0.5)
    participant CLI as CLI (run.tsx)
    participant Filter as filterTasksByRange
    participant Analysis as Task Graph / recommendParallelism
    participant Executor as Parallel Executor
    end
    CLI->>CLI: parseRunArgs (--task-range)
    CLI->>Filter: filterTasksByRange(tasks, range)
    Filter-->>CLI: filtered tasks + message
    CLI->>Analysis: analyse(filtered tasks)
    Analysis-->>CLI: recommendation (workers, confidence)
    CLI->>Executor: execute(filtered tasks, maxWorkers)
    Executor-->>CLI: completion
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed
Codecov Report: ❌ Patch coverage is
Additional details and impacted files

```
@@ Coverage Diff @@
##             main     #258      +/-   ##
==========================================
- Coverage   45.70%   44.95%   -0.75%
==========================================
  Files          91       91
  Lines       27442    28124     +682
==========================================
+ Hits        12542    12643     +101
- Misses      14900    15481     +581
```
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/commands/run.tsx`:
- Around line 2430-2435: filterTasksByRange currently only updates the local
tasks array but ExecutionEngine and ParallelExecutor call the tracker directly
and ignore that filter; propagate the filtered IDs into the execution selection
by obtaining the filtered task IDs from filterTasksByRange and passing them into
the engine/executor creation or run methods (e.g. pass filteredTaskIds into
ExecutionEngine.initialize/run or ParallelExecutor.run) or alter the tracker
query used by ExecutionEngine/ParallelExecutor to include the same range filter
(use options.taskRange or filteredTaskIds) so both engines only fetch and
execute the narrowed set of tasks; update references in the selection logic that
construct ExecutionEngine and ParallelExecutor to accept and forward the
filtered IDs.
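A minimal sketch of the propagation this comment asks for, assuming a hypothetical `ParallelExecutorConfig` shape and a `withFilteredTaskIds` helper (neither is the PR's real API):

```typescript
// Hypothetical config shape; the PR's actual ParallelExecutorConfig may differ.
interface ParallelExecutorConfig {
  maxWorkers: number;
  filteredTaskIds?: string[];
}

// Thread the IDs produced by the range filter into the executor config, so the
// engine/executor fetch only the narrowed set instead of re-querying the
// tracker unfiltered.
function withFilteredTaskIds(
  base: ParallelExecutorConfig,
  filteredTasks: Array<{ id: string }> | undefined
): ParallelExecutorConfig {
  // No range applied: leave the config untouched.
  if (!filteredTasks) return base;
  return { ...base, filteredTaskIds: filteredTasks.map((t) => t.id) };
}
```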
In `@src/remote/client.ts`:
- Around line 450-458: The reconnect path drops 'parallel_event' messages
because attemptReconnect()'s onmessage handler doesn't route them; update
attemptReconnect's onmessage to mirror the main dispatcher by handling the
'parallel_event' case (treat message as ParallelEventMessage and call
this.eventHandler with type:'parallel_event', orchestrationId and event) or
refactor so attemptReconnect delegates incoming messages to the shared
dispatcher method (e.g., call the same handler used in the primary onmessage) to
ensure orchestration updates are processed after re-auth.
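The "shared dispatcher" refactor the comment suggests could look like the sketch below; the class, method names, and message shapes are illustrative, not the client's real interface:

```typescript
// Sketch: route both the primary and reconnect sockets through one dispatcher
// so 'parallel_event' messages survive re-auth. Shapes/names are assumptions.
interface ParallelEventMessage {
  type: "parallel_event";
  orchestrationId: string;
  event: unknown;
}
type Incoming = ParallelEventMessage | { type: string };

class ReconnectingClientSketch {
  public delivered: string[] = [];

  // Single dispatcher: the one place that knows every message type.
  private dispatch(message: Incoming): void {
    switch (message.type) {
      case "parallel_event": {
        const m = message as ParallelEventMessage;
        this.delivered.push(`parallel_event:${m.orchestrationId}`);
        break;
      }
      default:
        this.delivered.push(message.type);
    }
  }

  // Both onmessage handlers delegate instead of duplicating the switch, so a
  // case added for the primary path cannot be missed on the reconnect path.
  handlePrimaryMessage(raw: string): void {
    this.dispatch(JSON.parse(raw) as Incoming);
  }
  handleReconnectMessage(raw: string): void {
    this.dispatch(JSON.parse(raw) as Incoming);
  }
}
```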
In `@src/remote/server.ts`:
- Around line 1364-1373: Validate and reject non-positive maxWorkers values
before creating ParallelExecutor: check message.maxWorkers (and the computed
maxWorkers) is an integer > 0, and if not either throw/return a clear error or
fall back to a safe default; update the code that sets maxWorkers (the variable
used to construct the ParallelExecutor instance) and ensure
ParallelExecutor.batchTasks can assume maxWorkers > 0. Reference: the maxWorkers
local variable, message.maxWorkers, and the ParallelExecutor class/batchTasks
method.
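The suggested guard can be sketched as a small validator; the function name and result shape here are assumptions, not the server's real code:

```typescript
// Sketch of validating maxWorkers before constructing the executor, so
// downstream batching logic can assume maxWorkers > 0.
type MaxWorkersResult =
  | { ok: true; maxWorkers: number }
  | { ok: false; error: string };

function validateMaxWorkers(value: unknown, fallback: number): MaxWorkersResult {
  // Absent value: fall back to a safe default.
  if (value === undefined) return { ok: true, maxWorkers: fallback };
  // Reject non-integers and non-positive values with a clear error.
  if (!Number.isInteger(value) || (value as number) <= 0) {
    return { ok: false, error: `maxWorkers must be a positive integer, got ${String(value)}` };
  }
  return { ok: true, maxWorkers: value as number };
}
```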
…alidation

Fix 1: Propagate task range filter to execution engines
- Add filteredTaskIds to ParallelExecutorConfig and RalphConfig
- Update ParallelExecutor.execute() to filter tasks by allowed IDs
- Update ExecutionEngine.getNextAvailableTask() to respect filteredTaskIds
- Set config.filteredTaskIds in run.tsx when --task-range is specified
- Pass filteredTaskIds to ParallelExecutor constructor

Fix 2: Handle parallel_event in reconnect handler
- Add parallel_event case to attemptReconnect's onmessage handler
- Mirrors the primary dispatcher to ensure orchestration updates are processed after re-auth

Fix 3: Validate maxWorkers before creating ParallelExecutor
- Check that maxWorkers is a positive integer
- Return a clear error message for invalid values
- Prevents issues in batchTasks assuming maxWorkers > 0
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/commands/run.tsx`:
- Around line 381-414: The parser currently rejects "-10" because the case
'--task-range' branch checks !nextArg.startsWith('-'); update that guard to also
accept negative-number values (e.g. allow nextArg when it matches a
negative-integer pattern like /^-\d+$/) so end-only ranges like "-10" are
treated as a range string; keep the existing parsing logic that computes
dashIndex, parsedStart/parsedEnd and sets options.taskRange, and still advance
i++ after consuming the value.
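A hedged sketch of the guard and parser this comment describes; `looksLikeRangeValue`, `parseTaskRange`, and `TaskRange` are illustrative names, not the PR's actual exports:

```typescript
interface TaskRange {
  start?: number; // undefined => open start, e.g. "-10"
  end?: number;   // undefined => open end, e.g. "5-"
}

// Accept values that start with '-' only when they match a negative-integer
// pattern like "-10", so real flags such as "--parallel" are still rejected.
function looksLikeRangeValue(arg: string): boolean {
  return !arg.startsWith("-") || /^-\d+$/.test(arg);
}

function parseTaskRange(raw: string): TaskRange | null {
  const dashIndex = raw.indexOf("-");
  if (dashIndex === -1) return null;
  const startStr = raw.slice(0, dashIndex);
  const endStr = raw.slice(dashIndex + 1);
  const start = startStr === "" ? undefined : parseInt(startStr, 10);
  const end = endStr === "" ? undefined : parseInt(endStr, 10);
  // Validate parseInt results so malformed input never yields NaN bounds.
  if (start !== undefined && (Number.isNaN(start) || start < 1)) return null;
  if (end !== undefined && (Number.isNaN(end) || end < 1)) return null;
  if (start !== undefined && end !== undefined && end < start) return null;
  return { start, end };
}
```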
In `@src/engine/index.ts`:
- Around line 590-596: The current early return stops the engine when the
tracker yields a task outside the allowed set; change the logic so that if
this.config.filteredTaskIds is defined and length === 0 you immediately return
null (explicit empty allowed list => yield no tasks), otherwise when a task
exists but its id is not in this.config.filteredTaskIds simply skip it and
continue the containing task-yielding loop until you find an allowed task or run
out; update the block referencing task and this.config.filteredTaskIds (inside
the method that obtains/yields tasks) to check for the empty-array case first,
and replace the `return null` on mismatch with a continue/skip so other allowed
tasks can still be found.
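The skip-and-continue strategy this comment asks for can be sketched as follows; the `TrackerLike` interface and the free-function shape are assumptions, not the engine's real signatures:

```typescript
// Tracker abstraction assumed for illustration: returns the next open task
// that is not in the excluded set, or null when none remain.
interface TrackerLike {
  getNextTask(excludeIds: string[]): { id: string } | null;
}

function getNextAvailableTask(
  tracker: TrackerLike,
  filteredTaskIds: string[] | undefined
): { id: string } | null {
  // An explicitly empty allow-list means "yield no tasks".
  if (filteredTaskIds !== undefined && filteredTaskIds.length === 0) return null;
  const allowed = filteredTaskIds ? new Set(filteredTaskIds) : null;
  const skipped: string[] = [];
  // Skip disallowed tasks instead of treating the first mismatch as
  // end-of-queue, so later allowed tasks can still be found.
  for (;;) {
    const task = tracker.getNextTask(skipped);
    if (task === null) return null;
    if (!allowed || allowed.has(task.id)) return task;
    skipped.push(task.id);
  }
}
```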
In `@src/remote/server.ts`:
- Around line 377-384: The disconnect handler currently calls
self.orchestrationSession.executor.stop() without awaiting it before calling
self.orchestrationSession.unsubscribe() and nulling self.orchestrationSession;
change this to await the stop() (e.g., await
self.orchestrationSession.executor.stop().catch(() => {})) before calling
self.orchestrationSession.unsubscribe() and setting self.orchestrationSession =
null so the stop completes (matching the handleOrchestrateStop pattern) and
avoids racing file I/O cleanup in ParallelExecutor.stop().
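The awaited-cleanup ordering described above, as a small sketch; `ExecutorLike`, `SessionLike`, and the holder object are assumed shapes standing in for the server's session field:

```typescript
interface ExecutorLike { stop(): Promise<void> }
interface SessionLike { executor: ExecutorLike; unsubscribe(): void }

async function cleanupOnDisconnect(holder: { session: SessionLike | null }): Promise<void> {
  const session = holder.session;
  if (!session) return;
  // Await stop() before unsubscribing and nulling the session so its cleanup
  // cannot race; swallow errors so disconnect cleanup always completes.
  await session.executor.stop().catch(() => {});
  session.unsubscribe();
  holder.session = null;
}
```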
🧹 Nitpick comments (1)
src/remote/server.ts (1)
1455-1496: Consider adding error handling for pause/resume. The `executor.pause()` and `executor.resume()` calls have no error handling. Whilst these are typically synchronous state changes, if they throw (e.g., due to invalid state), the error would propagate unhandled. For consistency with `handleOrchestrateStop`, consider wrapping in try/catch:

🛠️ Optional: Add error handling

```diff
 private handleOrchestratePause(
   ws: ServerWebSocket<WebSocketData>,
   message: OrchestratePauseMessage
 ): void {
   if (!this.orchestrationSession || this.orchestrationSession.id !== message.orchestrationId) {
     this.sendOperationError(ws, message.id, 'orchestrate:pause', 'No matching orchestration session');
     return;
   }
+  try {
   this.orchestrationSession.executor.pause();
   this.orchestrationSession.status = 'paused';
   const response = createMessage<OperationResultMessage>('operation_result', {
     operation: 'orchestrate:pause',
     success: true,
   });
   response.id = message.id;
   this.send(ws, response);
+  } catch (error) {
+    this.sendOperationError(
+      ws,
+      message.id,
+      'orchestrate:pause',
+      error instanceof Error ? error.message : 'Failed to pause orchestration'
+    );
+  }
 }
```
Tests cover:
- No specific patterns (low confidence, full parallelism)
- Test-heavy workloads by title and labels (high confidence)
- Refactor-heavy workloads by title and labels (reduced parallelism)
- Moderate refactor presence (25-50%, medium confidence)
- File overlap detection via metadata.affects
- Empty task list edge case
- DefaultMax ceiling enforcement
- Priority of refactor detection over test detection
- Case-insensitive keyword matching

Test coverage for src/parallel/task-graph.ts: 99.6%
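The keyword heuristics these tests describe might be sketched as below. Keywords, thresholds, and the `ParallelismRecommendation` shape are assumptions, and the file-overlap branch (`metadata.affects`) is omitted; this is not the PR's exact implementation:

```typescript
interface HeurTask { title: string; labels?: string[] }
interface ParallelismRecommendation {
  recommendedWorkers: number;
  confidence: "low" | "medium" | "high";
  reason: string;
}

function recommendParallelism(tasks: HeurTask[], defaultMax: number): ParallelismRecommendation {
  if (tasks.length === 0) {
    return { recommendedWorkers: 1, confidence: "low", reason: "no tasks" };
  }
  // Case-insensitive match against title and labels.
  const matches = (t: HeurTask, kw: string) =>
    t.title.toLowerCase().includes(kw) ||
    (t.labels ?? []).some((l) => l.toLowerCase().includes(kw));
  const refactorRatio = tasks.filter((t) => matches(t, "refactor")).length / tasks.length;
  const testRatio = tasks.filter((t) => matches(t, "test")).length / tasks.length;

  // Refactor detection takes priority over test detection; recommended
  // workers never exceed defaultMax (the ceiling the bug fix enforces).
  if (refactorRatio > 0.5) {
    return { recommendedWorkers: Math.min(2, defaultMax), confidence: "high", reason: "refactor-heavy workload" };
  }
  if (refactorRatio > 0.25) {
    return { recommendedWorkers: Math.min(3, defaultMax), confidence: "medium", reason: "moderate refactor presence" };
  }
  if (testRatio > 0.5) {
    return { recommendedWorkers: defaultMax, confidence: "high", reason: "test-heavy workload" };
  }
  return { recommendedWorkers: defaultMax, confidence: "low", reason: "no specific patterns" };
}
```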
Tests cover:
- Filtering with explicit range (start-end)
- Filtering with open-ended start (n-)
- Filtering with open-ended end (-n)
- Single task filtering (n-n)
- Empty task list handling
- Invalid range handling (start < 1, end < start)
- Range exceeding task count
- Message formatting for all range types

Also exports filterTasksByRange and TaskRangeFilter for testability.
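A helper matching the behaviours listed above might look like the sketch below; the signature, result shape, and message wording are illustrative, not the PR's actual exports:

```typescript
interface Task { id: string; title: string }
interface TaskRangeFilter { tasks: Task[]; message: string }

// 1-based, inclusive positions; an open start or end defaults to the bounds
// of the task list, and a range past the end is clamped to the list length.
function filterTasksByRange(
  tasks: Task[],
  range: { start?: number; end?: number }
): TaskRangeFilter {
  const total = tasks.length;
  if (total === 0) return { tasks: [], message: "No tasks to filter" };
  const start = range.start ?? 1;
  const end = range.end ?? total;
  if (start < 1 || end < start) {
    return { tasks: [], message: `Invalid task range ${start}-${end}` };
  }
  const clampedEnd = Math.min(end, total);
  return {
    tasks: tasks.slice(start - 1, clampedEnd),
    message: `Running tasks ${start}-${clampedEnd} of ${total}`,
  };
}
```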
- Add 8 tests to engine/index.test.ts for filteredTaskIds filtering behavior
- Add 4 tests to parallel-executor.test.ts for filteredTaskIds config
- Tests cover: config acceptance, empty/undefined arrays, filter behavior
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/engine/index.test.ts`:
- Around line 265-306: The test is failing because
ExecutionEngine.getNextAvailableTask treats a tracker-provided task outside
config.filteredTaskIds as end-of-queue; change getNextAvailableTask so that if
tracker.getNextTask() returns a task whose id is not in config.filteredTaskIds
it does not immediately return null but instead skips that task and finds the
next allowed one (either by calling tracker.getTasks() and selecting the first
open task whose id is in config.filteredTaskIds, or by iterating calls to
getNextTask until an allowed id is returned), and update the test expectation to
assert the engine returns a permitted task (e.g., task-1 or task-3) or null only
if no allowed tasks exist; refer to ExecutionEngine.getNextAvailableTask,
TrackerPlugin.getNextTask, TrackerPlugin.getTasks, and config.filteredTaskIds
when making the change.
🧹 Nitpick comments (1)
src/parallel/parallel-executor.test.ts (1)
365-377: Test name doesn't match assertions. The test claims to preserve `filteredTaskIds` through `getState`, but it only asserts idle state and empty workers. Rename it for clarity, or assert preservation if the state exposes it.

Suggested rename

```diff
- test('preserves filteredTaskIds through getState', () => {
+ test('returns idle state with filteredTaskIds configured', () => {
```
- run.tsx: Allow --task-range to accept "-10" format (negative-integer pattern)
- engine/index.ts: Change filteredTaskIds filtering to loop and skip disallowed tasks until finding an allowed one or running out; an empty array returns null
- remote/server.ts: Make the close handler async and await stop() before cleanup
- engine/index.test.ts: Update tests with smarter mocks that respect excludeIds
- Add 20 tests covering --task-range parsing (all formats)
- Add tests for --parallel, --direct-merge, --serial, --listen flags
- Add tests for combined options
- Increases coverage of run.tsx CLI parsing code
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/remote/server.ts`:
- Around line 1336-1387: handleOrchestrateStart currently analyzes the full task
list and constructs ParallelExecutor without honoring
baseConfig.filteredTaskIds, so filter the tasks returned from
this.options.tracker.getTasks by this.options.baseConfig.filteredTaskIds (or by
message-provided range) before calling analyzeTaskGraph; if the filter yields
zero tasks, send the 'orchestrate:start_response' failure as you do for no
actionable tasks; when creating the ParallelExecutor, include the filtered IDs
in its options (e.g. pass filteredTaskIds) so the executor only schedules those
tasks and reports correct totals (update the ParallelExecutor instantiation to
accept and forward filteredTaskIds).
- Add 20 tests for parseRunArgs CLI parsing (task-range, parallel, listen, etc.)
- Add 5 tests for RemoteServer class (constructor, setTracker, setParallelConfig)
- Total: 120 tests passing across 5 test files
- Filter tasks by baseConfig.filteredTaskIds before analyzing the task graph
- Pass filteredTaskIds to ParallelExecutor so it only schedules allowed tasks
- Return an appropriate error message when the filter yields zero tasks
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/remote/server.ts`:
- Around line 1398-1421: Add a clarifying comment near the orchestrationSession
assignment explaining that session persistence after completion is intentional:
note that the executor subscription (executor.on -> unsubscribe) forwards events
via broadcastParallelEvent and only updates orchestrationSession.status, and
that actual cleanup happens on client disconnect (the existing disconnect
handler stops and clears orchestrationSession), so keeping the session object
allows clients to call "orchestrate:get_state" to inspect final state until the
session is explicitly stopped or the originating client disconnects.
🧹 Nitpick comments (1)
src/remote/server.ts (1)
1587-1597: Consider adding explicit parallel event subscription. Currently, only the client that starts orchestration is automatically subscribed to parallel events (line 1424); other authenticated clients cannot subscribe to observe parallel execution progress. If this is intentional for security/simplicity, consider documenting it. Otherwise, you could extend the existing `subscribe` message to support parallel events via the `eventTypes` array.
- Test that the printRunHelp function outputs expected content
- Verify help text includes key options like --task-range, --parallel
Explains why the session object is intentionally kept after execution completes rather than cleaned up immediately. The design allows clients to query final state via "orchestrate:get_state" until the originating client disconnects or explicitly stops the session.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/remote/server.ts`:
- Around line 1340-1351: The current check treats an empty array as "no filter"
and runs all tasks; instead, in the block using
this.options.baseConfig.filteredTaskIds (variable filteredTaskIds) explicitly
treat filteredTaskIds !== undefined as a provided filter: if filteredTaskIds ===
[] (length === 0) immediately return the failure response via
createMessage<OrchestrateStartResponseMessage>('orchestrate:start_response',
...) with an appropriate error like 'No tasks match the specified filter';
otherwise apply the Set filter to tasks as you already do. Update the logic
around tasks.length check so the empty-filter case is handled before filtering
and does not fall through to run all tasks.
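The undefined-vs-empty distinction this comment calls for can be sketched as a small helper; the function name, result shape, and error string placement are assumptions:

```typescript
type FilterResult =
  | { ok: true; tasks: Array<{ id: string }> }
  | { ok: false; error: string };

function applyTaskFilter(
  tasks: Array<{ id: string }>,
  filteredTaskIds: string[] | undefined
): FilterResult {
  // undefined means "no filter provided": run everything.
  if (filteredTaskIds === undefined) return { ok: true, tasks };
  // A defined-but-empty filter is an explicit filter that matches nothing;
  // fail instead of falling through to run all tasks.
  if (filteredTaskIds.length === 0) {
    return { ok: false, error: "No tasks match the specified filter" };
  }
  const allowed = new Set(filteredTaskIds);
  const filtered = tasks.filter((t) => allowed.has(t.id));
  if (filtered.length === 0) {
    return { ok: false, error: "No tasks match the specified filter" };
  }
  return { ok: true, tasks: filtered };
}
```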
- Around line 1314-1335: The orchestrate:start handler can race and create two
executors because this.orchestrationSession is only set after async work; add a
short-lived "starting" guard (e.g., this.orchestrationStarting) checked
alongside this.orchestrationSession at the top of the handler to immediately
reject concurrent starts, set this.orchestrationStarting = true before any
awaits/async calls that lead to creating the session, and clear it (set to
false) on all failure paths and after the session is successfully assigned to
this.orchestrationSession; update the response logic to return the same error
when orchestration is in progress or starting so the start becomes atomic.
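The atomic-start pattern described above, sketched with illustrative names (the real handler's fields, message types, and error plumbing differ):

```typescript
// Sketch: a short-lived "starting" flag set synchronously before any await,
// so a second concurrent start is rejected before either creates a session.
class OrchestratorSketch {
  private session: { id: string } | null = null;
  private starting = false;

  async start(createSession: () => Promise<{ id: string }>): Promise<string | Error> {
    // Same error whether orchestration is in progress or still starting.
    if (this.session || this.starting) {
      return new Error("Orchestration already in progress");
    }
    this.starting = true; // guard set before any async work
    try {
      const session = await createSession();
      this.session = session;
      return session.id;
    } catch (err) {
      return err instanceof Error ? err : new Error(String(err));
    } finally {
      this.starting = false; // cleared on success and every failure path
    }
  }
}
```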
1. Race condition fix: Add an `orchestrationStarting` guard flag to prevent concurrent orchestrate:start requests from both creating executors. The flag is set before any async operations and cleared on all failure paths and after the session is successfully established.

2. Empty filter handling: When `filteredTaskIds` is defined but empty (length === 0), immediately return an error response with "No tasks match the specified filter" instead of falling through to run all tasks.

Both issues were identified during PR code review.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/remote/server.ts`:
- Around line 1460-1461: The comment above the assignment to
this.orchestrationStarting should be reworded to remove temporal phrasing;
replace "Clear the starting guard now that session is established" with an
evergreen, state-focused comment such as "Clear the orchestration starting guard
after establishing the session" or "Reset orchestrationStarting to indicate
session is established"; update the comment immediately preceding the
this.orchestrationStarting = false; statement in server.ts (the line that sets
orchestrationStarting) accordingly.
…-smart-heuristics Add remote orchestration and smart parallelism heuristics
Summary
Implements two features from the parallel execution follow-on plan (#252):
Feature 1: Remote Orchestration (P1)
- New RPCs: `orchestrate:start`, `orchestrate:pause`, `orchestrate:resume`, `orchestrate:stop`, `orchestrate:get_state`

Feature 3: Smart Parallelism Heuristics (P3)
- `recommendParallelism()` analyzes task characteristics to recommend an optimal worker count
- Detects file overlap from the `metadata.affects` field
- Validates `maxWorkers` before execution

Bug Fixes (from CodeRabbit review)
- Worker ceiling: `recommendedWorkers` can't exceed `defaultMax`
- NaN validation: validate `parseInt` results in task range parsing
- Edge case: use `!== undefined` checks for range string generation (handles `0` correctly)

Test Plan
- Tested the `--task-range` flag with various formats

Summary by CodeRabbit
New Features
Documentation
Tests