feat: parallel task execution across multiple hosts#89
Conversation
Extends TaskConfig with fields for parallel task orchestration: - Parallel: list of task names to run concurrently - FailFast: stop on first failure - MaxParallel: limit concurrent execution (0 = unlimited) - Timeout: per-task timeout duration - Output: task-specific output mode override Adds LogsConfig struct for log retention settings: - Dir: log directory (default ~/.rr/logs) - KeepRuns: number of recent runs to keep (default 10) - KeepDays: days to retain logs - MaxSizeMB: max total log size Also adds IsParallelTask() helper function and includes Logs field in GlobalConfig with sensible defaults. Fixes rangeValCopy lint warning in task.go triggered by the increased struct size. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add the core parallel execution components: - types.go: Core types for parallel execution (OutputMode, Config, Result, TaskResult, TaskStatus, TaskInfo) - orchestrator.go: Coordinates parallel task execution across multiple hosts using work-stealing queue pattern with channel-based task distribution - worker.go: Host-specific worker that handles connections, file sync, and command execution for individual tasks - output.go: OutputManager with support for progress, stream, verbose, and quiet output modes with TTY detection - summary.go: Renders formatted execution summaries with per-task results, aggregate stats, and actionable retry suggestions Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add CLI integration for parallel task execution: - RunParallelTask function orchestrates parallel task runs - Parallel tasks auto-detected and use specialized command flags (--stream, --verbose, --quiet, --fail-fast, --max-parallel, --no-logs, --dry-run) - Log storage with LogWriter for task outputs and summary.json - Log cleanup based on retention policy (keep_runs, keep_days, max_size_mb) - New `rr logs` command to view and clean log directories - Parallel task validation (no circular refs, no nested parallel) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add comprehensive unit tests for the new parallel execution code: - validate_test.go: Parallel task validation (mutual exclusivity, reference validation, nested parallel rejection) - orchestrator_test.go: Work queue behavior, max-parallel limiting, fail-fast vs continue, empty task handling, context cancellation - output_test.go: Output mode initialization, stream mode prefixing, TTY detection fallback - cleanup_test.go: CleanByRuns, CleanByAge, CleanBySize, policy priority order (size > days > runs) Add inline code comments explaining: - Work-stealing queue design in orchestrator.go - Output mode selection logic in output.go - Log cleanup policy precedence in cleanup.go Update .rr.yaml with parallel task examples (test-all, quick-check). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
Caution Review failedThe pull request is closed. WalkthroughImplements config-driven parallel task execution: new orchestrator, workers, output manager, per-run log writer and retention/cleanup, CLI integration (parallel task flags, rr logs), and comprehensive validation and tests. Changes
Sequence Diagram(s)sequenceDiagram
participant CLI as CLI / RunParallelTask
participant Config as Config Loader
participant Orch as Orchestrator
participant Worker as Host Worker (N)
participant SSH as SSH / Executor
participant OutMgr as OutputManager
participant LogWriter as LogWriter
participant Summary as Summary Renderer
CLI->>Config: Load & validate config
Config-->>CLI: TaskConfig + resolved hosts
CLI->>Orch: NewOrchestrator(tasks, hosts)
CLI->>OutMgr: NewOutputManager(mode, isTTY)
alt SaveLogs enabled
CLI->>LogWriter: NewLogWriter(baseDir, taskName)
end
CLI->>Orch: Run(ctx)
Orch->>Orch: Build task queue
Orch->>Worker: Spawn workers (per host)
par Parallel Task Execution
loop Worker(s)
Worker->>Orch: Pull task
Worker->>SSH: ensureConnection
SSH-->>Worker: SSH handle
Worker->>SSH: ensureSync (once per host)
Worker->>SSH: execCommand(task, timeout)
SSH-->>Worker: stdout/stderr (streamed)
Worker->>OutMgr: TaskOutput lines
Worker->>Orch: Send TaskResult
end
end
Orch->>Orch: Collect results (honor fail-fast)
Orch->>Orch: Build final Result
alt SaveLogs enabled
Orch->>LogWriter: WriteTask(...)
Orch->>LogWriter: WriteSummary(...)
end
Orch-->>CLI: Return Result
CLI->>Summary: RenderSummary(result, logDir)
Summary-->>CLI: Formatted output
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro 📒 Files selected for processing (11)
Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In @.rr.yaml:
- Around line 119-124: Update the quick-check task description to accurately
reflect the subtasks it runs: change the description string from mentioning "vet
+ fmt check" to "vet + lint check" (referencing the quick-check task and its
parallel entries 'vet' and 'lint') so the human-readable description matches the
actual parallel tasks.
In @internal/cli/parallel.go:
- Around line 127-132: The current block silently ignores parse errors for
task.Timeout (when opts.Timeout == 0) by calling time.ParseDuration and only
using the duration on success; change it to log or warn when parsing fails so
configuration mistakes aren't hidden: in the same conditional around
task.Timeout and opts.Timeout, capture the error from
time.ParseDuration(task.Timeout) and call the project logger or a warning helper
(e.g., use the existing logger used in this package) to emit a clear message
including the invalid value (task.Timeout) and the parse error, while still
falling back to the default behavior if parsing fails; update references to
task.Timeout, opts.Timeout, parallelCfg.Timeout and the time.ParseDuration call
accordingly.
- Around line 284-290: Remove the unused dead function AddParallelFlags (which
takes *ParallelTaskOptions and returns the flag binder) from
internal/cli/parallel.go; delete its definition and any related unused type
references if they become orphaned (e.g., ParallelTaskOptions) and ensure
parallel flags remain configured via createParallelTaskCommand in task.go as
noted in the review comment.
In @internal/parallel/logs/writer.go:
- Around line 112-123: The StartTime in the summary is computed at write time
(time.Now().Add(-result.Duration)) which is inaccurate; update the code to use
the actual run start time by first adding a StartTime field to the Result struct
(set when the run begins) and use result.StartTime when populating
SummaryJSON.StartTime, and as a fallback compute the earliest StartTime from
result.TaskResults (iterate TaskResults and pick the minimum StartTime) if
result.StartTime is zero-valued; ensure SummaryJSON, the Result type, and any
creators of Result are updated to set that StartTime.
In @internal/parallel/output.go:
- Around line 104-123: TaskOutput in OutputManager buffers unbounded per-task
output (taskOutput) causing memory growth in verbose/progress modes; add a
per-task cap (e.g., const maxBufferSize) and only append to the buffer when
buf.Len() < maxBufferSize, optionally set a truncated flag or counter in the
OutputManager (e.g., taskTruncated or taskTruncationCount) so callers can
indicate output was truncated, and ensure this logic is applied inside
TaskOutput for modes OutputVerbose/OutputProgress while keeping streaming
behavior for OutputStream.
In @internal/parallel/summary.go:
- Around line 141-148: The code currently calls sort.Strings(result.HostsUsed)
which mutates the Result in place; instead, make a copy of result.HostsUsed
(e.g., allocate a new []string and copy the values), sort the copy with
sort.Strings, and pass the sorted copy to strings.Join (while keeping
mutedStyle.Render and the existing fmt.Fprintf call unchanged) so rendering does
not modify result.HostsUsed.
In @internal/parallel/worker.go:
- Around line 176-182: The function currently hides connection failures by
returning (1, nil) when w.conn or w.conn.Client is nil; update the branch in the
method that checks w.conn and w.conn.Client to return a non-nil error describing
the failed SSH connection (including identifying info from the worker such as
w.Host or w.addr if available) instead of nil, so callers receive an explanatory
error rather than just exit code 1; keep the normal path that calls
w.conn.Client.ExecStream(fullCmd, stdout, stderr) unchanged.
- Around line 217-221: The shellQuote function currently returns "\"" + s + "\""
which does not escape dangerous characters and allows command injection; update
shellQuote to wrap the value in single quotes and escape any existing single
quotes in s by replacing each ' with the sequence '\'' (i.e. close single quote,
insert escaped single quote, reopen single quote) so the resulting string is
safe for the shell, and add the "strings" import used for the replace operation;
keep the function name shellQuote and ensure all callers continue to receive the
safely quoted string.
🧹 Nitpick comments (18)
internal/config/types.go (1)
298-301: Consider nil-safety for the helper.If
taskis nil, this will panic. Depending on how it's called, a nil check might be prudent:func IsParallelTask(task *TaskConfig) bool { + if task == nil { + return false + } return len(task.Parallel) > 0 }internal/parallel/logs/cleanup.go (1)
40-49: Consider extracting tilde expansion to a helper.The
~expansion logic is duplicated three times (inCleanup,CleanAll, andListLogDirs). A small helper would reduce duplication:func expandTilde(path string) (string, error) { if len(path) > 0 && path[0] == '~' { home, err := os.UserHomeDir() if err != nil { return "", errors.WrapWithCode(err, errors.ErrConfig, "Can't determine home directory", "Check your environment configuration.") } return filepath.Join(home, path[1:]), nil } return path, nil }Also applies to: 194-203, 309-318
internal/config/validate.go (1)
422-461: The comment mentions cycle detection but only self-reference is checked.The function comment says "No circular references (task A -> task B -> task A)" but the implementation only checks direct self-reference (line 453-456). The nested-parallel check (line 447-451) prevents
A → parallel-B, but doesn't catch indirect cycles likeA → B → C → Aif B and C were simple tasks (which isn't possible currently).Since nested parallel is disallowed, indirect cycles through parallel tasks are impossible. However, if this constraint were ever relaxed, you'd need full cycle detection. Consider clarifying the comment:
// ValidateParallelTasks validates parallel task references after individual tasks are validated. // This checks: // - All referenced tasks exist -// - No circular references (task A -> task B -> task A) // - No nested parallel (parallel task can't reference another parallel task) +// - No self-reference (indirect cycles are prevented by the no-nesting rule)internal/parallel/output_test.go (2)
165-180: Potential race condition when accessing internal state.The test accesses
mgr.taskOutputmap while holding the lock, which is correct. However, callingmgr.TaskOutput()before the lock acquisition could race with the internal map access. This is likely fine in a single-threaded test, but consider usingt.Parallel()carefully if you ever parallelize these tests.
240-261: Good boundary testing for duration formatting.The test cases cover sub-second, second, and minute boundaries well. Consider adding a test case for zero duration (
0 * time.Millisecond) to verify edge behavior.internal/parallel/worker.go (2)
107-128: Context parameter is ignored in ensureConnection.The context is accepted but not used. If the caller cancels the context during a slow SSH connection attempt, the connection will still proceed. Consider threading the context through to the selector or adding a cancellation check.
184-215: buildFullCommand assembles commands correctly but could use strings.Builder.The logic is sound for constructing the compound shell command. Minor efficiency improvement: use
strings.Builderinstead of string concatenation in a loop.internal/cli/task.go (2)
339-353: Redundant TaskName in runParallelTaskCommand call.
nameis passed as the first parameter torunParallelTaskCommandbut it's unused (parameter is_). The name is also set inopts.TaskName. Consider removing the unused parameter.Proposed cleanup
-func runParallelTaskCommand(_ string, opts ParallelTaskOptions) error { +func runParallelTaskCommand(opts ParallelTaskOptions) error {And update the call site:
- return runParallelTaskCommand(name, ParallelTaskOptions{ + return runParallelTaskCommand(ParallelTaskOptions{
423-453: Consider letting Cobra auto-generate flag documentation.The manual flag documentation in the long description duplicates what Cobra's
--helpwould show. This creates a maintenance burden if flags change. You could simplify by removing lines 443-450 and letting users runrr <task> --helpfor flag details.internal/cli/logs.go (1)
176-184: Errors silently ignored when counting deleted directories.If
ListLogDirsfails here, the deleted count shown to the user will be inaccurate. While not critical, logging a warning would help debug issues.♻️ Suggested improvement
- beforeDirs, _ := logs.ListLogDirs(baseDir) + beforeDirs, err := logs.ListLogDirs(baseDir) + if err != nil { + // Continue with cleanup, but we won't be able to report count + fmt.Println("Running cleanup based on retention settings...") + return logs.Cleanup(logsCfg) + } beforeCount := len(beforeDirs) if err := logs.Cleanup(logsCfg); err != nil { return err } - afterDirs, _ := logs.ListLogDirs(baseDir) + afterDirs, err := logs.ListLogDirs(baseDir) + if err != nil { + fmt.Println("Cleanup completed.") + return nil + } afterCount := len(afterDirs)internal/parallel/logs/cleanup_test.go (2)
76-82: Potential panic on short directory names.
e.Name()[:6]will panic if a directory name is shorter than 6 characters. This is a test file, so it's low risk, but could cause confusing failures if test data changes.♻️ Suggested fix
for _, e := range entries { - if e.Name()[:6] == "task-a" { + if len(e.Name()) >= 6 && e.Name()[:6] == "task-a" { taskACount++ - } else if e.Name()[:6] == "task-b" { + } else if len(e.Name()) >= 6 && e.Name()[:6] == "task-b" { taskBCount++ } }Or use
strings.HasPrefix:if strings.HasPrefix(e.Name(), "task-a") { taskACount++ } else if strings.HasPrefix(e.Name(), "task-b") { taskBCount++ }
351-364: Consider removing trivial struct test.This test only verifies struct field assignment, which is guaranteed by the compiler. It doesn't test any behavior.
internal/parallel/orchestrator.go (1)
36-52: Non-deterministic host ordering.Map iteration order in Go is intentionally randomized. This means
hostListorder varies between runs, which affects host-to-worker assignment on line 118. This is fine for the work-stealing model but could make debugging/reproduction harder.Consider sorting
hostListif deterministic behavior is desired:♻️ Optional: Sort hosts for determinism
+import "sort" + func NewOrchestrator(tasks []TaskInfo, hosts map[string]config.Host, resolved *config.ResolvedConfig, cfg Config) *Orchestrator { hostList := make([]string, 0, len(hosts)) for name := range hosts { hostList = append(hostList, name) } + sort.Strings(hostList)internal/parallel/output.go (2)
169-173: Close() should clear buffers to release memory.The output buffers in
taskOutputcan hold significant data. Clear them on Close() to help the GC.Suggested fix
func (m *OutputManager) Close() { - // Nothing to clean up for now, but this method exists for - // future cleanup needs (e.g., flushing buffers, closing log files). + m.mu.Lock() + defer m.mu.Unlock() + // Clear buffers to release memory + for k := range m.taskOutput { + delete(m.taskOutput, k) + } }
241-244: UnusedisTerminal()function.This helper is defined but never called—
isTTYis passed intoNewOutputManagerinstead. Either use it internally or remove it.Option 1: Remove unused function
-// isTerminal checks if stdout is a terminal. -func isTerminal() bool { - return term.IsTerminal(int(os.Stdout.Fd())) -}Option 2: Export for callers
-// isTerminal checks if stdout is a terminal. -func isTerminal() bool { +// IsTerminal checks if stdout is a terminal. +func IsTerminal() bool { return term.IsTerminal(int(os.Stdout.Fd())) }internal/cli/parallel.go (3)
166-173: Multiple silently ignored errors in log writing.While non-critical, losing task logs without any indication could frustrate debugging. Consider at least logging to stderr on failure.
if logWriter != nil { for i := range result.TaskResults { - _ = logWriter.WriteTask(result.TaskResults[i].TaskName, result.TaskResults[i].Output) + if err := logWriter.WriteTask(result.TaskResults[i].TaskName, result.TaskResults[i].Output); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to write log for %s: %v\n", result.TaskResults[i].TaskName, err) + } } - _ = logWriter.WriteSummary(result, opts.TaskName) - _ = logWriter.Close() + if err := logWriter.WriteSummary(result, opts.TaskName); err != nil { + fmt.Fprintf(os.Stderr, "Warning: failed to write summary: %v\n", err) + } + _ = logWriter.Close() // Close errors are typically not actionable }
305-312: Use centralized defaults from config package instead of hardcoding.The fallback in
GetGlobalLogsConfig()currently hardcodesDir: "~/.rr/logs"andKeepRuns: 10, which matchesDefaultGlobalConfig().Logstoday. However, if config defaults change, this fallback could diverge. Access the centralized default instead:func GetGlobalLogsConfig() config.LogsConfig { global, err := config.LoadGlobal() if err != nil { return config.DefaultGlobalConfig().Logs } return global.Logs }
156-164: Wire up signal handling to enable graceful cancellation of in-flight tasks.The orchestrator already supports context cancellation, but passing
context.Background()means Ctrl+C won't actually cancel running tasks. Add signal handling to propagate cancellation:Suggested implementation
ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Handle Ctrl+C sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) go func() { <-sigCh cancel() }()This allows graceful shutdown when users press Ctrl+C, letting workers finish current tasks and exit cleanly.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
.rr.yamlinternal/cli/logs.gointernal/cli/parallel.gointernal/cli/task.gointernal/config/types.gointernal/config/validate.gointernal/config/validate_test.gointernal/parallel/logs/cleanup.gointernal/parallel/logs/cleanup_test.gointernal/parallel/logs/writer.gointernal/parallel/orchestrator.gointernal/parallel/orchestrator_test.gointernal/parallel/output.gointernal/parallel/output_test.gointernal/parallel/summary.gointernal/parallel/types.gointernal/parallel/worker.go
🧰 Additional context used
📓 Path-based instructions (3)
**/*.go
⚙️ CodeRabbit configuration file
**/*.go: - Check for idiomatic Go patterns and error handling
- Ensure errors are wrapped with context using the internal/errors package
- Verify proper use of testify assertions in tests
- Look for potential race conditions
- Check for proper resource cleanup (defer statements)
Files:
internal/parallel/output_test.gointernal/parallel/summary.gointernal/parallel/worker.gointernal/config/types.gointernal/parallel/logs/writer.gointernal/parallel/types.gointernal/parallel/orchestrator_test.gointernal/parallel/logs/cleanup.gointernal/config/validate_test.gointernal/parallel/output.gointernal/cli/parallel.gointernal/parallel/orchestrator.gointernal/config/validate.gointernal/cli/logs.gointernal/cli/task.gointernal/parallel/logs/cleanup_test.go
**/*_test.go
⚙️ CodeRabbit configuration file
**/*_test.go: - Prefer table-driven tests
- Use testify/assert and testify/require appropriately
- Ensure test names are descriptive
Files:
internal/parallel/output_test.gointernal/parallel/orchestrator_test.gointernal/config/validate_test.gointernal/parallel/logs/cleanup_test.go
internal/cli/**
⚙️ CodeRabbit configuration file
internal/cli/**: - Verify Cobra command patterns are followed consistently
- Check that commands have proper descriptions and examples
- Ensure flags are properly validated
Files:
internal/cli/parallel.gointernal/cli/logs.gointernal/cli/task.go
🧬 Code graph analysis (12)
internal/parallel/output_test.go (2)
internal/parallel/types.go (9)
OutputMode(6-6)OutputProgress(10-10)OutputQuiet(16-16)OutputStream(12-12)OutputVerbose(14-14)TaskStatus(73-73)TaskRunning(77-77)TaskPassed(78-78)TaskFailed(79-79)internal/parallel/output.go (1)
NewOutputManager(49-70)
internal/parallel/summary.go (5)
internal/parallel/types.go (2)
Result(42-48)TaskResult(56-65)internal/ui/colors.go (4)
ColorSuccess(37-37)ColorError(38-38)ColorMuted(47-47)ColorSecondary(46-46)internal/ui/symbols.go (3)
SymbolSuccess(5-5)SymbolFail(6-6)SymbolComplete(9-9)internal/errors/errors.go (1)
Error(26-31)internal/config/types.go (1)
Host(59-83)
internal/parallel/logs/writer.go (2)
internal/errors/errors.go (5)
Error(26-31)WrapWithCode(52-59)ErrConfig(11-11)New(34-40)ErrExec(15-15)internal/parallel/types.go (1)
Result(42-48)
internal/parallel/types.go (2)
internal/config/types.go (1)
Host(59-83)internal/errors/errors.go (1)
Error(26-31)
internal/parallel/orchestrator_test.go (3)
internal/parallel/types.go (9)
TaskInfo(99-104)Result(42-48)OutputProgress(10-10)OutputMode(6-6)TaskStatus(73-73)TaskPending(76-76)TaskRunning(77-77)TaskPassed(78-78)TaskFailed(79-79)internal/config/types.go (1)
Host(59-83)internal/parallel/orchestrator.go (2)
NewOrchestrator(37-52)Orchestrator(13-34)
internal/parallel/logs/cleanup.go (2)
internal/config/types.go (1)
LogsConfig(212-228)internal/errors/errors.go (3)
WrapWithCode(52-59)ErrConfig(11-11)ErrExec(15-15)
internal/config/validate_test.go (2)
internal/config/types.go (3)
TaskConfig(118-153)TaskStep(156-165)IsParallelTask(299-301)internal/config/validate.go (2)
ValidateParallelTasks(427-461)Validate(38-108)
internal/parallel/orchestrator.go (4)
internal/parallel/types.go (5)
TaskInfo(99-104)Config(20-27)TaskResult(56-65)Result(42-48)OutputMode(6-6)internal/config/types.go (1)
Host(59-83)internal/parallel/output.go (2)
OutputManager(19-36)NewOutputManager(49-70)internal/errors/errors.go (2)
New(34-40)ErrConfig(11-11)
internal/config/validate.go (2)
internal/errors/errors.go (3)
WrapWithCode(52-59)ErrConfig(11-11)Error(26-31)internal/config/types.go (1)
Config(45-56)
internal/cli/logs.go (2)
internal/cli/parallel.go (2)
GetGlobalLogsConfig(306-312)ExpandLogsDir(365-374)internal/parallel/logs/cleanup.go (4)
ListLogDirs(308-342)CleanAll(193-219)CleanByAge(123-146)Cleanup(34-79)
internal/cli/task.go (4)
internal/config/types.go (3)
IsParallelTask(299-301)TaskConfig(118-153)Host(59-83)internal/config/validate.go (1)
IsReservedTaskName(418-420)internal/cli/parallel.go (1)
ParallelTaskOptions(16-29)internal/errors/errors.go (1)
NewExitError(115-117)
internal/parallel/logs/cleanup_test.go (2)
internal/parallel/logs/cleanup.go (7)
CleanByRuns(83-120)CleanByAge(123-146)CleanBySize(149-190)Cleanup(34-79)CleanAll(193-219)ListLogDirs(308-342)LogDirInfo(345-351)internal/config/types.go (1)
LogsConfig(212-228)
🔇 Additional comments (73)
internal/config/types.go (1)
211-228: LGTM!The
LogsConfigstruct is well-documented with clear defaults. The three-tier retention policy (size, age, runs) provides flexible cleanup options.internal/parallel/logs/cleanup.go (6)
34-79: LGTM!The cleanup priority order (size → age → runs) is well-documented and the implementation correctly handles edge cases like empty config or non-existent directories.
81-120: LGTM!The per-task grouping and run-based cleanup logic is correct. Sorting by modification time and keeping the N most recent runs per task works as expected.
122-146: LGTM!The age-based cleanup is straightforward and correct.
148-190: LGTM!The size-based cleanup correctly prioritizes oldest directories for deletion and stops once under the limit.
271-304: LGTM!The task name extraction logic correctly handles task names containing hyphens by checking the timestamp format pattern. The fallback to return the full directory name is a safe default.
306-351: LGTM!The public
ListLogDirsfunction correctly transforms internal data to the publicLogDirInfotype and sorts by recency for display purposes.internal/parallel/orchestrator_test.go (7)
13-34: LGTM!The test correctly verifies orchestrator initialization. Consider using
require.NotNilfor the orchestrator check to fail fast if it's nil, preventing potential nil pointer panics in subsequent assertions.
36-70: LGTM!Good coverage of edge cases - empty tasks should succeed, while nil/empty hosts should error. The appropriate use of
require.NoError/require.Errorensures tests fail fast on unexpected error states.
72-139: Test validates worker calculation but not actual execution.This table-driven test correctly verifies the worker-limiting logic, but note that it replicates the calculation (lines 127-134) rather than calling
Run(). This tests the expected algorithm but not that the orchestrator actually spawns the correct number of workers.Consider adding an integration-style test that observes actual concurrent execution behavior if practical.
160-176: LGTM!The test correctly verifies the idempotent behavior of
markHostSynced- first call returns false, subsequent calls for the same host return true.
178-246: LGTM!Table-driven tests with good coverage of success semantics. The "empty result is success" behavior (line 235-238) is a reasonable design choice - no failures means success.
248-297: LGTM!Good coverage of default config values, status string mappings, and result aggregation. The test for unknown
TaskStatus(99)ensures robustness against unexpected enum values.
141-158: Test correctly validates graceful context cancellation handling.The test expectation is correct. The
Run()method is designed to treat context cancellation as a graceful shutdown—when the context is pre-cancelled, workers exit immediately without processing tasks, andRun()returnsnilerror (line 143 in orchestrator.go always returns success). The graceful behavior is already documented in the work-stealing queue design comment (orchestrator.go lines 54-64), so no additional comment is needed in the test itself.internal/config/validate.go (3)
102-106: LGTM!Parallel task reference validation is correctly placed after individual task validation, ensuring all tasks exist before checking cross-references.
270-307: LGTM!The mutual exclusivity checks are correct. Parallel tasks can't have
runorsteps, and the early return skips irrelevant step validation. Error messages clearly explain the conflict.
442-444: No action needed. ThegetTaskNamesfunction is defined ininternal/config/tasks.go(line 103) and is properly accessible fromvalidate.gosince both files are in the same package. The code will compile and run correctly.Likely an incorrect or invalid review comment.
internal/parallel/output_test.go (2)
12-81: LGTM! Well-structured table-driven test for OutputManager creation.Good coverage of the TTY fallback behavior where progress mode falls back to quiet for non-TTY. The test properly validates internal state initialization.
1-297: Test file follows guidelines and provides solid coverage.Tests are appropriately using table-driven patterns, testify assertions, and descriptive names. The coverage of OutputManager modes and state transitions is comprehensive.
internal/parallel/worker.go (2)
130-156: ensureSync has reasonable one-time sync logic.The sync-once-per-host pattern using
markHostSyncedis appropriate. Skipping sync for local connections makes sense.
230-241: Close properly cleans up connection with mutex protection.The method correctly handles the nil case and clears the connection reference after closing.
internal/parallel/summary.go (3)
65-70: Good practice: copying and sorting TaskResults for deterministic output.This correctly avoids mutating the input while ensuring consistent ordering.
160-174: Helpful UX: Retry suggestions for failed tasks.Providing actionable commands to retry individual failed tasks is a nice touch for developer experience.
179-193: FormatBriefSummary is clean and informative.Handles nil result gracefully and provides appropriate messaging for success vs partial failure cases.
internal/config/validate_test.go (4)
10-116: Comprehensive table-driven tests for ValidateParallelTasks.Good coverage of edge cases: nil config, empty tasks, valid configs, non-existent references, nested parallel detection, self-references, and step-based task compatibility. The error message assertions using
errContainsare well done.
118-165: Good mutual exclusivity tests for parallel with run/steps.Tests correctly verify that parallel cannot coexist with run or steps fields.
239-274: IsParallelTask tests cover key scenarios.The comment on line 245 about nil panic is helpful documentation. Consider adding a test that explicitly documents expected behavior if the team decides to handle nil defensively in the future.
167-237: Integration tests validate end-to-end config parsing.Good to see tests with Timeout and MaxParallel fields to ensure the full validation pipeline handles these correctly.
internal/cli/task.go (4)
245-252: Good addition: ListTasks now displays parallel task info.Clean integration showing task count for parallel tasks, maintaining consistent formatting with run and steps tasks.
322-376: createParallelTaskCommand follows Cobra patterns correctly.Flags are properly defined with clear descriptions. The command setup is consistent with the existing createTaskCommand pattern.
378-390: Exit code handling follows established pattern.Properly uses
errors.NewExitErrorto propagate non-zero exit codes, matching the pattern inrunTaskCommand. As per coding guidelines for CLI commands.
274-284: Task registration properly handles parallel task detection.The flow correctly retrieves the task value before passing to createTaskCommand, which then routes to the appropriate command builder.
internal/cli/logs.go (5)
14-59: LGTM!Command definitions follow Cobra patterns with clear descriptions and examples. Flag registration is correct.
61-133: LGTM!Clean implementation with good UX for empty state and helpful context about retention settings.
196-210: LGTM!Clean duration parsing with day suffix support and proper fallback to standard library.
212-236: LGTM!Clean human-readable age formatting with proper singular/plural handling.
238-256: LGTM!Straightforward size formatting with consistent decimal precision.
internal/parallel/logs/writer.go (5)
14-46: LGTM!Clean type definitions with appropriate JSON serialization tags.
48-77: LGTM!Proper error wrapping and sortable timestamp format for directory naming.
79-98: LGTM!Good defensive check for closed state and proper error wrapping.
162-176: LGTM!Simple accessors with clear semantics.
178-190: LGTM!Handles common filesystem-unsafe characters appropriately.
internal/parallel/logs/cleanup_test.go (6)
103-145: LGTM!Good coverage of age-based cleanup with proper time manipulation via
os.Chtimes.
147-213: LGTM!Good coverage of size-based cleanup scenarios with predictable test data.
215-264: LGTM!Good coverage of the orchestrated cleanup with priority ordering and edge cases.
266-307: LGTM!Clean tests for CleanAll and ListLogDirs with proper verification of sort order.
309-349: LGTM!Good table-driven tests with descriptive cases covering edge conditions.
366-403: LGTM!Good tests for tilde expansion (with proper HOME save/restore) and recursive size calculation.
internal/parallel/types.go (6)
5-17: LGTM!Clean string enum definition with clear documentation for each mode.
19-39: LGTM!Well-structured config with sensible defaults and clear inline documentation.
41-53: LGTM!Clean result aggregation structure. Note:
Success()returns true for empty results, which aligns with "no failures occurred" semantics.
55-70: LGTM!Comprehensive task result structure with correct success semantics checking both exit code and error.
72-96: LGTM!Idiomatic iota-based enum with proper String() implementation including unknown case handling.
98-104: LGTM!Clean task info structure with clear field documentation.
internal/parallel/orchestrator.go (7)
12-34: LGTM!Well-organized struct with clear separation of concerns and proper sync primitives for thread safety.
54-144: Well-designed work-stealing queue implementation.The channel-based work-stealing approach is clean and leverages Go's built-in channel synchronization. Good documentation explaining the design rationale.
146-199: LGTM!Clean worker implementation with proper cancellation checks and thread-safe fail-fast handling using sync.Once for cancel.
201-225: LGTM!Thread-safe result aggregation with correct mutex usage.
239-242: LGTM!Simple accessor. Note: returns nil until
Run()is called and initializes the output manager.
227-237:markHostSyncedis used correctly.Method is called in
worker.go:133for sync-once-per-host optimization, as expected. Implementation properly uses mutex locking with defer. No issues to address.
82-82: VerifyisTerminal()is defined and accessible.The function is called at line 82 but its definition cannot be verified due to repository access limitations. Confirm it is defined in the package (likely in output.go or elsewhere in internal/parallel/) and properly accessible from orchestrator.go.
internal/parallel/output.go (5)
1-15: LGTM on imports and package structure.Clean import organization with clear separation of standard library, external dependencies, and internal packages.
17-36: LGTM on OutputManager struct design.Good use of composition with mutex for thread safety. The separation of state tracking (status, hosts, output) from styling concerns is clean.
49-70: LGTM on constructor with sensible TTY fallback.The fallback from progress to quiet mode when stdout isn't a TTY is the right call—progress updates would just spam the pipe.
228-239: LGTM on formatDuration.Good handling of sub-second, seconds, and minute+ durations with appropriate precision.
246-263: LGTM on status accessors.Proper mutex usage and defensive copy in
GetAllStatusesprevents callers from mutating internal state.internal/cli/parallel.go (7)
1-13: LGTM on imports.Clean import structure following Go conventions.
15-29: LGTM on ParallelTaskOptions struct.Well-documented fields with clear purpose. Good that CLI flags map directly to these options.
189-218: LGTM on determineOutputMode.Clear precedence: CLI flags → task config → default. Switch statement handles all valid values.
240-252: LGTM on filterHostsByTag.Simple and correct. Creates a new map rather than mutating the input.
254-282: LGTM on renderDryRunPlan.Clear, useful output for dry-run mode. Good handling of local vs. remote execution display.
339-362: LGTM on FormatParallelTaskHelp.Good contextual help generation with subtask descriptions and config hints.
364-374: LGTM on ExpandLogsDir.Simple tilde expansion. Correctly returns original path on error.
- Add local execution fallback when no remote hosts configured - Add distributed lock acquisition before syncing to prevent conflicts - Add localWorker for executing tasks without SSH - Remove dead AddParallelFlags function - Fix ineffective break statement in context cancellation - Update tests to match new local execution behavior - Add parallel task documentation to skill file - Bump plugin version to 1.2.0 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Fix quick-check task description (vet+fmt -> vet+lint) - Add warning for invalid timeout parsing - Wrap multi-step commands in subshells to prevent shell injection - Calculate StartTime from earliest task result instead of approximation - Add 1MB buffer cap per task to prevent unbounded memory growth - Copy HostsUsed slice before sorting to avoid mutation - Return descriptive error when SSH connection is nil - Fix shell quoting using single quotes with proper escaping (critical) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Summary
Implements parallel task execution across multiple hosts as defined in #81.
.rr.yamlwithparallel: [task1, task2, task3]~/.rr/logs/with configurable retention (keep N runs, by age, or by size)rr <parallel-task>just like any other taskNew Features
Parallel Task Config:
CLI Flags:
--stream- real-time interleaved output--verbose- full output per task on completion--quiet- summary only--fail-fast- stop on first failure--max-parallel N- limit concurrency--no-logs- skip log file creation--dry-run- show execution plan without runningLog Management:
rr logs- list recent log directoriesrr logs clean- run retention-based cleanuprr logs clean --all- delete all logsrr logs clean --older 7d- delete by ageImplementation Details
internal/parallel/- New package with orchestrator, output manager, summary rendererinternal/parallel/logs/- Log writing and cleanupinternal/cli/parallel.go- CLI integration for parallel tasksinternal/cli/logs.go- Log management commandsinternal/config/- Extended TaskConfig with parallel fields, validationTest Plan
Closes #81
🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
rr logscommand for listing and cleaning run logs with retention policies and human-friendly display.Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.