Skip to content

feat: parallel task execution across multiple hosts#89

Merged
rileyhilliard merged 6 commits intomainfrom
feature/parallel-task-execution
Jan 13, 2026
Merged

feat: parallel task execution across multiple hosts#89
rileyhilliard merged 6 commits intomainfrom
feature/parallel-task-execution

Conversation

@rileyhilliard
Copy link
Copy Markdown
Owner

@rileyhilliard rileyhilliard commented Jan 13, 2026

Summary

Implements parallel task execution across multiple hosts as defined in #81.

  • Config-first design: Define parallel task groups in .rr.yaml with parallel: [task1, task2, task3]
  • Work-stealing queue: Tasks distributed across available hosts with natural load balancing
  • Multiple output modes: progress (default), stream, verbose, quiet - auto-detects TTY
  • Automatic log management: Logs saved to ~/.rr/logs/ with configurable retention (keep N runs, by age, or by size)
  • Full CLI integration: Run rr <parallel-task> just like any other task

New Features

Parallel Task Config:

tasks:
  test-all:
    description: Run all tests in parallel
    parallel:
      - test-cli
      - test-exec
      - test-host
    fail_fast: false
    timeout: 10m

CLI Flags:

  • --stream - real-time interleaved output
  • --verbose - full output per task on completion
  • --quiet - summary only
  • --fail-fast - stop on first failure
  • --max-parallel N - limit concurrency
  • --no-logs - skip log file creation
  • --dry-run - show execution plan without running

Log Management:

  • rr logs - list recent log directories
  • rr logs clean - run retention-based cleanup
  • rr logs clean --all - delete all logs
  • rr logs clean --older 7d - delete by age

Implementation Details

  • internal/parallel/ - New package with orchestrator, output manager, summary renderer
  • internal/parallel/logs/ - Log writing and cleanup
  • internal/cli/parallel.go - CLI integration for parallel tasks
  • internal/cli/logs.go - Log management commands
  • internal/config/ - Extended TaskConfig with parallel fields, validation

Test Plan

  • Unit tests for orchestrator, output manager, log cleanup
  • Config validation tests (mutual exclusivity, circular refs, nested parallel)
  • All existing tests pass
  • Lint clean (0 issues)
  • Manual testing with actual parallel tasks on multiple hosts

Closes #81

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Parallel task execution with fail-fast, max-workers, timeouts, output modes, and per-run log saving.
    • rr logs command for listing and cleaning run logs with retention policies and human-friendly display.
    • CLI flags for parallel control: --stream, --verbose, --quiet, --fail-fast, --max-parallel, --no-logs, --dry-run.
  • Documentation

    • Added "Parallel Tasks" section with examples and log details.
  • Tests

    • Extensive unit tests added for parallel execution and log cleanup.

✏️ Tip: You can customize this high-level summary in your review settings.

rileyhilliard and others added 4 commits January 13, 2026 14:55
Extends TaskConfig with fields for parallel task orchestration:
- Parallel: list of task names to run concurrently
- FailFast: stop on first failure
- MaxParallel: limit concurrent execution (0 = unlimited)
- Timeout: per-task timeout duration
- Output: task-specific output mode override

Adds LogsConfig struct for log retention settings:
- Dir: log directory (default ~/.rr/logs)
- KeepRuns: number of recent runs to keep (default 10)
- KeepDays: days to retain logs
- MaxSizeMB: max total log size

Also adds IsParallelTask() helper function and includes Logs
field in GlobalConfig with sensible defaults.

Fixes rangeValCopy lint warning in task.go triggered by the
increased struct size.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add the core parallel execution components:

- types.go: Core types for parallel execution (OutputMode, Config, Result,
  TaskResult, TaskStatus, TaskInfo)
- orchestrator.go: Coordinates parallel task execution across multiple hosts
  using work-stealing queue pattern with channel-based task distribution
- worker.go: Host-specific worker that handles connections, file sync, and
  command execution for individual tasks
- output.go: OutputManager with support for progress, stream, verbose, and
  quiet output modes with TTY detection
- summary.go: Renders formatted execution summaries with per-task results,
  aggregate stats, and actionable retry suggestions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add CLI integration for parallel task execution:
- RunParallelTask function orchestrates parallel task runs
- Parallel tasks auto-detected and use specialized command flags
  (--stream, --verbose, --quiet, --fail-fast, --max-parallel, --no-logs, --dry-run)
- Log storage with LogWriter for task outputs and summary.json
- Log cleanup based on retention policy (keep_runs, keep_days, max_size_mb)
- New `rr logs` command to view and clean log directories
- Parallel task validation (no circular refs, no nested parallel)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add comprehensive unit tests for the new parallel execution code:

- validate_test.go: Parallel task validation (mutual exclusivity,
  reference validation, nested parallel rejection)
- orchestrator_test.go: Work queue behavior, max-parallel limiting,
  fail-fast vs continue, empty task handling, context cancellation
- output_test.go: Output mode initialization, stream mode prefixing,
  TTY detection fallback
- cleanup_test.go: CleanByRuns, CleanByAge, CleanBySize, policy
  priority order (size > days > runs)

Add inline code comments explaining:
- Work-stealing queue design in orchestrator.go
- Output mode selection logic in output.go
- Log cleanup policy precedence in cleanup.go

Update .rr.yaml with parallel task examples (test-all, quick-check).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Jan 13, 2026

Caution

Review failed

The pull request is closed.

Walkthrough

Implements config-driven parallel task execution: new orchestrator, workers, output manager, per-run log writer and retention/cleanup, CLI integration (parallel task flags, rr logs), and comprehensive validation and tests.

Changes

Cohort / File(s) Summary
Config types & validation
internal/config/types.go, internal/config/validate.go, internal/config/validate_test.go
Add LogsConfig and parallel fields on TaskConfig (Parallel, FailFast, MaxParallel, Timeout, Output), IsParallelTask, and ValidateParallelTasks with tests for non-circular, non-nested, and mutual-exclusion rules.
CLI: parallel task flow
internal/cli/parallel.go, internal/cli/task.go
Add ParallelTaskOptions, RunParallelTask, flag helpers, dry-run plan rendering, host filtering, output mode resolution, logging integration, and wiring to task command creation for parallel tasks.
CLI: logs management
internal/cli/logs.go, .rr.yaml
Add rr logs (list/clean) commands and helpers (duration parsing, formatting); add two parallel task examples in .rr.yaml.
Parallel core types
internal/parallel/types.go
Introduce OutputMode, Config, Result/TaskResult, TaskStatus, and TaskInfo public types and helpers (defaults, Success checks).
Orchestrator & workers
internal/parallel/orchestrator.go, internal/parallel/worker.go, internal/parallel/orchestrator_test.go
New Orchestrator with work-stealing queue, per-host workers (SSH/local), sync semantics, timeouts, fail-fast, cancellation, result aggregation, and tests for concurrency, fail-fast, cancellation, and worker behavior.
Output manager
internal/parallel/output.go, internal/parallel/output_test.go
Add OutputManager supporting progress/stream/verbose/quiet modes, per-task buffering/truncation, TTY fallback, and comprehensive mode tests.
Summary renderer
internal/parallel/summary.go
Add styled summary rendering API (RenderSummary, SummaryConfig, brief summary helper) for final run output.
Logging subsystem
internal/parallel/logs/writer.go, internal/parallel/logs/cleanup.go, internal/parallel/logs/cleanup_test.go
Add LogWriter (per-run dir, per-task logs, summary.json), cleanup utilities (CleanBySize, CleanByAge, CleanByRuns, CleanAll), ListLogDirs, LogDirInfo, and extensive tests covering policies and edge cases.
Misc / tests / metadata
internal/cli/logs.go (helpers), internal/config/... tests, .claude-plugin/*, plugins/rr/skills/rr/SKILL.md
CLI helpers and formatting functions; bumped plugin metadata versions; added documentation for parallel tasks (duplicated insertion noted).

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI / RunParallelTask
    participant Config as Config Loader
    participant Orch as Orchestrator
    participant Worker as Host Worker (N)
    participant SSH as SSH / Executor
    participant OutMgr as OutputManager
    participant LogWriter as LogWriter
    participant Summary as Summary Renderer

    CLI->>Config: Load & validate config
    Config-->>CLI: TaskConfig + resolved hosts
    CLI->>Orch: NewOrchestrator(tasks, hosts)
    CLI->>OutMgr: NewOutputManager(mode, isTTY)
    alt SaveLogs enabled
        CLI->>LogWriter: NewLogWriter(baseDir, taskName)
    end
    CLI->>Orch: Run(ctx)
    Orch->>Orch: Build task queue
    Orch->>Worker: Spawn workers (per host)
    par Parallel Task Execution
        loop Worker(s)
            Worker->>Orch: Pull task
            Worker->>SSH: ensureConnection
            SSH-->>Worker: SSH handle
            Worker->>SSH: ensureSync (once per host)
            Worker->>SSH: execCommand(task, timeout)
            SSH-->>Worker: stdout/stderr (streamed)
            Worker->>OutMgr: TaskOutput lines
            Worker->>Orch: Send TaskResult
        end
    end
    Orch->>Orch: Collect results (honor fail-fast)
    Orch->>Orch: Build final Result
    alt SaveLogs enabled
        Orch->>LogWriter: WriteTask(...)
        Orch->>LogWriter: WriteSummary(...)
    end
    Orch-->>CLI: Return Result
    CLI->>Summary: RenderSummary(result, logDir)
    Summary-->>CLI: Formatted output
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 52.88% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly summarizes the main change: implementing parallel task execution across multiple hosts, which is the primary objective of the PR.
Linked Issues check ✅ Passed The PR implements all major objectives from issue #81: config-first parallel groups, concurrency with work-stealing orchestrator, multiple output modes, log management with retention, exit code semantics, config validation, and CLI integration.
Out of Scope Changes check ✅ Passed All changes are directly tied to issue #81 objectives. The .rr.yaml additions, config extensions, orchestrator, output manager, logs subsystem, CLI integration, and validation are all in scope.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7a08e41 and ef02875.

📒 Files selected for processing (11)
  • .claude-plugin/marketplace.json
  • .claude-plugin/plugin.json
  • .rr.yaml
  • internal/cli/parallel.go
  • internal/parallel/logs/writer.go
  • internal/parallel/orchestrator.go
  • internal/parallel/orchestrator_test.go
  • internal/parallel/output.go
  • internal/parallel/summary.go
  • internal/parallel/worker.go
  • plugins/rr/skills/rr/SKILL.md

Comment @coderabbitai help to get the list of available commands and usage tips.

@rileyhilliard
Copy link
Copy Markdown
Owner Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Jan 13, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🤖 Fix all issues with AI agents
In @.rr.yaml:
- Around line 119-124: Update the quick-check task description to accurately
reflect the subtasks it runs: change the description string from mentioning "vet
+ fmt check" to "vet + lint check" (referencing the quick-check task and its
parallel entries 'vet' and 'lint') so the human-readable description matches the
actual parallel tasks.

In @internal/cli/parallel.go:
- Around line 127-132: The current block silently ignores parse errors for
task.Timeout (when opts.Timeout == 0) by calling time.ParseDuration and only
using the duration on success; change it to log or warn when parsing fails so
configuration mistakes aren't hidden: in the same conditional around
task.Timeout and opts.Timeout, capture the error from
time.ParseDuration(task.Timeout) and call the project logger or a warning helper
(e.g., use the existing logger used in this package) to emit a clear message
including the invalid value (task.Timeout) and the parse error, while still
falling back to the default behavior if parsing fails; update references to
task.Timeout, opts.Timeout, parallelCfg.Timeout and the time.ParseDuration call
accordingly.
- Around line 284-290: Remove the unused dead function AddParallelFlags (which
takes *ParallelTaskOptions and returns the flag binder) from
internal/cli/parallel.go; delete its definition and any related unused type
references if they become orphaned (e.g., ParallelTaskOptions) and ensure
parallel flags remain configured via createParallelTaskCommand in task.go as
noted in the review comment.

In @internal/parallel/logs/writer.go:
- Around line 112-123: The StartTime in the summary is computed at write time
(time.Now().Add(-result.Duration)) which is inaccurate; update the code to use
the actual run start time by first adding a StartTime field to the Result struct
(set when the run begins) and use result.StartTime when populating
SummaryJSON.StartTime, and as a fallback compute the earliest StartTime from
result.TaskResults (iterate TaskResults and pick the minimum StartTime) if
result.StartTime is zero-valued; ensure SummaryJSON, the Result type, and any
creators of Result are updated to set that StartTime.

In @internal/parallel/output.go:
- Around line 104-123: TaskOutput in OutputManager buffers unbounded per-task
output (taskOutput) causing memory growth in verbose/progress modes; add a
per-task cap (e.g., const maxBufferSize) and only append to the buffer when
buf.Len() < maxBufferSize, optionally set a truncated flag or counter in the
OutputManager (e.g., taskTruncated or taskTruncationCount) so callers can
indicate output was truncated, and ensure this logic is applied inside
TaskOutput for modes OutputVerbose/OutputProgress while keeping streaming
behavior for OutputStream.

In @internal/parallel/summary.go:
- Around line 141-148: The code currently calls sort.Strings(result.HostsUsed)
which mutates the Result in place; instead, make a copy of result.HostsUsed
(e.g., allocate a new []string and copy the values), sort the copy with
sort.Strings, and pass the sorted copy to strings.Join (while keeping
mutedStyle.Render and the existing fmt.Fprintf call unchanged) so rendering does
not modify result.HostsUsed.

In @internal/parallel/worker.go:
- Around line 176-182: The function currently hides connection failures by
returning (1, nil) when w.conn or w.conn.Client is nil; update the branch in the
method that checks w.conn and w.conn.Client to return a non-nil error describing
the failed SSH connection (including identifying info from the worker such as
w.Host or w.addr if available) instead of nil, so callers receive an explanatory
error rather than just exit code 1; keep the normal path that calls
w.conn.Client.ExecStream(fullCmd, stdout, stderr) unchanged.
- Around line 217-221: The shellQuote function currently returns "\"" + s + "\""
which does not escape dangerous characters and allows command injection; update
shellQuote to wrap the value in single quotes and escape any existing single
quotes in s by replacing each ' with the sequence '\'' (i.e. close single quote,
insert escaped single quote, reopen single quote) so the resulting string is
safe for the shell, and add the "strings" import used for the replace operation;
keep the function name shellQuote and ensure all callers continue to receive the
safely quoted string.
🧹 Nitpick comments (18)
internal/config/types.go (1)

298-301: Consider nil-safety for the helper.

If task is nil, this will panic. Depending on how it's called, a nil check might be prudent:

 func IsParallelTask(task *TaskConfig) bool {
+	if task == nil {
+		return false
+	}
 	return len(task.Parallel) > 0
 }
internal/parallel/logs/cleanup.go (1)

40-49: Consider extracting tilde expansion to a helper.

The ~ expansion logic is duplicated three times (in Cleanup, CleanAll, and ListLogDirs). A small helper would reduce duplication:

func expandTilde(path string) (string, error) {
	if len(path) > 0 && path[0] == '~' {
		home, err := os.UserHomeDir()
		if err != nil {
			return "", errors.WrapWithCode(err, errors.ErrConfig,
				"Can't determine home directory",
				"Check your environment configuration.")
		}
		return filepath.Join(home, path[1:]), nil
	}
	return path, nil
}

Also applies to: 194-203, 309-318

internal/config/validate.go (1)

422-461: The comment mentions cycle detection but only self-reference is checked.

The function comment says "No circular references (task A -> task B -> task A)" but the implementation only checks direct self-reference (line 453-456). The nested-parallel check (line 447-451) prevents A → parallel-B, but doesn't catch indirect cycles like A → B → C → A if B and C were simple tasks (which isn't possible currently).

Since nested parallel is disallowed, indirect cycles through parallel tasks are impossible. However, if this constraint were ever relaxed, you'd need full cycle detection. Consider clarifying the comment:

 // ValidateParallelTasks validates parallel task references after individual tasks are validated.
 // This checks:
 // - All referenced tasks exist
-// - No circular references (task A -> task B -> task A)
 // - No nested parallel (parallel task can't reference another parallel task)
+// - No self-reference (indirect cycles are prevented by the no-nesting rule)
internal/parallel/output_test.go (2)

165-180: Potential race condition when accessing internal state.

The test accesses mgr.taskOutput map while holding the lock, which is correct. However, calling mgr.TaskOutput() before the lock acquisition could race with the internal map access. This is likely fine in a single-threaded test, but consider using t.Parallel() carefully if you ever parallelize these tests.


240-261: Good boundary testing for duration formatting.

The test cases cover sub-second, second, and minute boundaries well. Consider adding a test case for zero duration (0 * time.Millisecond) to verify edge behavior.

internal/parallel/worker.go (2)

107-128: Context parameter is ignored in ensureConnection.

The context is accepted but not used. If the caller cancels the context during a slow SSH connection attempt, the connection will still proceed. Consider threading the context through to the selector or adding a cancellation check.


184-215: buildFullCommand assembles commands correctly but could use strings.Builder.

The logic is sound for constructing the compound shell command. Minor efficiency improvement: use strings.Builder instead of string concatenation in a loop.

internal/cli/task.go (2)

339-353: Redundant TaskName in runParallelTaskCommand call.

name is passed as the first parameter to runParallelTaskCommand but it's unused (parameter is _). The name is also set in opts.TaskName. Consider removing the unused parameter.

Proposed cleanup
-func runParallelTaskCommand(_ string, opts ParallelTaskOptions) error {
+func runParallelTaskCommand(opts ParallelTaskOptions) error {

And update the call site:

-			return runParallelTaskCommand(name, ParallelTaskOptions{
+			return runParallelTaskCommand(ParallelTaskOptions{

423-453: Consider letting Cobra auto-generate flag documentation.

The manual flag documentation in the long description duplicates what Cobra's --help would show. This creates a maintenance burden if flags change. You could simplify by removing lines 443-450 and letting users run rr <task> --help for flag details.

internal/cli/logs.go (1)

176-184: Errors silently ignored when counting deleted directories.

If ListLogDirs fails here, the deleted count shown to the user will be inaccurate. While not critical, logging a warning would help debug issues.

♻️ Suggested improvement
-	beforeDirs, _ := logs.ListLogDirs(baseDir)
+	beforeDirs, err := logs.ListLogDirs(baseDir)
+	if err != nil {
+		// Continue with cleanup, but we won't be able to report count
+		fmt.Println("Running cleanup based on retention settings...")
+		return logs.Cleanup(logsCfg)
+	}
 	beforeCount := len(beforeDirs)
 
 	if err := logs.Cleanup(logsCfg); err != nil {
 		return err
 	}
 
-	afterDirs, _ := logs.ListLogDirs(baseDir)
+	afterDirs, err := logs.ListLogDirs(baseDir)
+	if err != nil {
+		fmt.Println("Cleanup completed.")
+		return nil
+	}
 	afterCount := len(afterDirs)
internal/parallel/logs/cleanup_test.go (2)

76-82: Potential panic on short directory names.

e.Name()[:6] will panic if a directory name is shorter than 6 characters. This is a test file, so it's low risk, but could cause confusing failures if test data changes.

♻️ Suggested fix
 	for _, e := range entries {
-		if e.Name()[:6] == "task-a" {
+		if len(e.Name()) >= 6 && e.Name()[:6] == "task-a" {
 			taskACount++
-		} else if e.Name()[:6] == "task-b" {
+		} else if len(e.Name()) >= 6 && e.Name()[:6] == "task-b" {
 			taskBCount++
 		}
 	}

Or use strings.HasPrefix:

if strings.HasPrefix(e.Name(), "task-a") {
    taskACount++
} else if strings.HasPrefix(e.Name(), "task-b") {
    taskBCount++
}

351-364: Consider removing trivial struct test.

This test only verifies struct field assignment, which is guaranteed by the compiler. It doesn't test any behavior.

internal/parallel/orchestrator.go (1)

36-52: Non-deterministic host ordering.

Map iteration order in Go is intentionally randomized. This means hostList order varies between runs, which affects host-to-worker assignment on line 118. This is fine for the work-stealing model but could make debugging/reproduction harder.

Consider sorting hostList if deterministic behavior is desired:

♻️ Optional: Sort hosts for determinism
+import "sort"
+
 func NewOrchestrator(tasks []TaskInfo, hosts map[string]config.Host, resolved *config.ResolvedConfig, cfg Config) *Orchestrator {
 	hostList := make([]string, 0, len(hosts))
 	for name := range hosts {
 		hostList = append(hostList, name)
 	}
+	sort.Strings(hostList)
internal/parallel/output.go (2)

169-173: Close() should clear buffers to release memory.

The output buffers in taskOutput can hold significant data. Clear them on Close() to help the GC.

Suggested fix
 func (m *OutputManager) Close() {
-	// Nothing to clean up for now, but this method exists for
-	// future cleanup needs (e.g., flushing buffers, closing log files).
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	// Clear buffers to release memory
+	for k := range m.taskOutput {
+		delete(m.taskOutput, k)
+	}
 }

241-244: Unused isTerminal() function.

This helper is defined but never called—isTTY is passed into NewOutputManager instead. Either use it internally or remove it.

Option 1: Remove unused function
-// isTerminal checks if stdout is a terminal.
-func isTerminal() bool {
-	return term.IsTerminal(int(os.Stdout.Fd()))
-}
Option 2: Export for callers
-// isTerminal checks if stdout is a terminal.
-func isTerminal() bool {
+// IsTerminal checks if stdout is a terminal.
+func IsTerminal() bool {
 	return term.IsTerminal(int(os.Stdout.Fd()))
 }
internal/cli/parallel.go (3)

166-173: Multiple silently ignored errors in log writing.

While non-critical, losing task logs without any indication could frustrate debugging. Consider at least logging to stderr on failure.

 	if logWriter != nil {
 		for i := range result.TaskResults {
-			_ = logWriter.WriteTask(result.TaskResults[i].TaskName, result.TaskResults[i].Output)
+			if err := logWriter.WriteTask(result.TaskResults[i].TaskName, result.TaskResults[i].Output); err != nil {
+				fmt.Fprintf(os.Stderr, "Warning: failed to write log for %s: %v\n", result.TaskResults[i].TaskName, err)
+			}
 		}
-		_ = logWriter.WriteSummary(result, opts.TaskName)
-		_ = logWriter.Close()
+		if err := logWriter.WriteSummary(result, opts.TaskName); err != nil {
+			fmt.Fprintf(os.Stderr, "Warning: failed to write summary: %v\n", err)
+		}
+		_ = logWriter.Close() // Close errors are typically not actionable
 	}

305-312: Use centralized defaults from config package instead of hardcoding.

The fallback in GetGlobalLogsConfig() currently hardcodes Dir: "~/.rr/logs" and KeepRuns: 10, which matches DefaultGlobalConfig().Logs today. However, if config defaults change, this fallback could diverge. Access the centralized default instead:

func GetGlobalLogsConfig() config.LogsConfig {
	global, err := config.LoadGlobal()
	if err != nil {
		return config.DefaultGlobalConfig().Logs
	}
	return global.Logs
}

156-164: Wire up signal handling to enable graceful cancellation of in-flight tasks.

The orchestrator already supports context cancellation, but passing context.Background() means Ctrl+C won't actually cancel running tasks. Add signal handling to propagate cancellation:

Suggested implementation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Handle Ctrl+C
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
    <-sigCh
    cancel()
}()

This allows graceful shutdown when users press Ctrl+C, letting workers finish current tasks and exit cleanly.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fceb044 and 7a08e41.

📒 Files selected for processing (17)
  • .rr.yaml
  • internal/cli/logs.go
  • internal/cli/parallel.go
  • internal/cli/task.go
  • internal/config/types.go
  • internal/config/validate.go
  • internal/config/validate_test.go
  • internal/parallel/logs/cleanup.go
  • internal/parallel/logs/cleanup_test.go
  • internal/parallel/logs/writer.go
  • internal/parallel/orchestrator.go
  • internal/parallel/orchestrator_test.go
  • internal/parallel/output.go
  • internal/parallel/output_test.go
  • internal/parallel/summary.go
  • internal/parallel/types.go
  • internal/parallel/worker.go
🧰 Additional context used
📓 Path-based instructions (3)
**/*.go

⚙️ CodeRabbit configuration file

**/*.go: - Check for idiomatic Go patterns and error handling

  • Ensure errors are wrapped with context using the internal/errors package
  • Verify proper use of testify assertions in tests
  • Look for potential race conditions
  • Check for proper resource cleanup (defer statements)

Files:

  • internal/parallel/output_test.go
  • internal/parallel/summary.go
  • internal/parallel/worker.go
  • internal/config/types.go
  • internal/parallel/logs/writer.go
  • internal/parallel/types.go
  • internal/parallel/orchestrator_test.go
  • internal/parallel/logs/cleanup.go
  • internal/config/validate_test.go
  • internal/parallel/output.go
  • internal/cli/parallel.go
  • internal/parallel/orchestrator.go
  • internal/config/validate.go
  • internal/cli/logs.go
  • internal/cli/task.go
  • internal/parallel/logs/cleanup_test.go
**/*_test.go

⚙️ CodeRabbit configuration file

**/*_test.go: - Prefer table-driven tests

  • Use testify/assert and testify/require appropriately
  • Ensure test names are descriptive

Files:

  • internal/parallel/output_test.go
  • internal/parallel/orchestrator_test.go
  • internal/config/validate_test.go
  • internal/parallel/logs/cleanup_test.go
internal/cli/**

⚙️ CodeRabbit configuration file

internal/cli/**: - Verify Cobra command patterns are followed consistently

  • Check that commands have proper descriptions and examples
  • Ensure flags are properly validated

Files:

  • internal/cli/parallel.go
  • internal/cli/logs.go
  • internal/cli/task.go
🧬 Code graph analysis (12)
internal/parallel/output_test.go (2)
internal/parallel/types.go (9)
  • OutputMode (6-6)
  • OutputProgress (10-10)
  • OutputQuiet (16-16)
  • OutputStream (12-12)
  • OutputVerbose (14-14)
  • TaskStatus (73-73)
  • TaskRunning (77-77)
  • TaskPassed (78-78)
  • TaskFailed (79-79)
internal/parallel/output.go (1)
  • NewOutputManager (49-70)
internal/parallel/summary.go (5)
internal/parallel/types.go (2)
  • Result (42-48)
  • TaskResult (56-65)
internal/ui/colors.go (4)
  • ColorSuccess (37-37)
  • ColorError (38-38)
  • ColorMuted (47-47)
  • ColorSecondary (46-46)
internal/ui/symbols.go (3)
  • SymbolSuccess (5-5)
  • SymbolFail (6-6)
  • SymbolComplete (9-9)
internal/errors/errors.go (1)
  • Error (26-31)
internal/config/types.go (1)
  • Host (59-83)
internal/parallel/logs/writer.go (2)
internal/errors/errors.go (5)
  • Error (26-31)
  • WrapWithCode (52-59)
  • ErrConfig (11-11)
  • New (34-40)
  • ErrExec (15-15)
internal/parallel/types.go (1)
  • Result (42-48)
internal/parallel/types.go (2)
internal/config/types.go (1)
  • Host (59-83)
internal/errors/errors.go (1)
  • Error (26-31)
internal/parallel/orchestrator_test.go (3)
internal/parallel/types.go (9)
  • TaskInfo (99-104)
  • Result (42-48)
  • OutputProgress (10-10)
  • OutputMode (6-6)
  • TaskStatus (73-73)
  • TaskPending (76-76)
  • TaskRunning (77-77)
  • TaskPassed (78-78)
  • TaskFailed (79-79)
internal/config/types.go (1)
  • Host (59-83)
internal/parallel/orchestrator.go (2)
  • NewOrchestrator (37-52)
  • Orchestrator (13-34)
internal/parallel/logs/cleanup.go (2)
internal/config/types.go (1)
  • LogsConfig (212-228)
internal/errors/errors.go (3)
  • WrapWithCode (52-59)
  • ErrConfig (11-11)
  • ErrExec (15-15)
internal/config/validate_test.go (2)
internal/config/types.go (3)
  • TaskConfig (118-153)
  • TaskStep (156-165)
  • IsParallelTask (299-301)
internal/config/validate.go (2)
  • ValidateParallelTasks (427-461)
  • Validate (38-108)
internal/parallel/orchestrator.go (4)
internal/parallel/types.go (5)
  • TaskInfo (99-104)
  • Config (20-27)
  • TaskResult (56-65)
  • Result (42-48)
  • OutputMode (6-6)
internal/config/types.go (1)
  • Host (59-83)
internal/parallel/output.go (2)
  • OutputManager (19-36)
  • NewOutputManager (49-70)
internal/errors/errors.go (2)
  • New (34-40)
  • ErrConfig (11-11)
internal/config/validate.go (2)
internal/errors/errors.go (3)
  • WrapWithCode (52-59)
  • ErrConfig (11-11)
  • Error (26-31)
internal/config/types.go (1)
  • Config (45-56)
internal/cli/logs.go (2)
internal/cli/parallel.go (2)
  • GetGlobalLogsConfig (306-312)
  • ExpandLogsDir (365-374)
internal/parallel/logs/cleanup.go (4)
  • ListLogDirs (308-342)
  • CleanAll (193-219)
  • CleanByAge (123-146)
  • Cleanup (34-79)
internal/cli/task.go (4)
internal/config/types.go (3)
  • IsParallelTask (299-301)
  • TaskConfig (118-153)
  • Host (59-83)
internal/config/validate.go (1)
  • IsReservedTaskName (418-420)
internal/cli/parallel.go (1)
  • ParallelTaskOptions (16-29)
internal/errors/errors.go (1)
  • NewExitError (115-117)
internal/parallel/logs/cleanup_test.go (2)
internal/parallel/logs/cleanup.go (7)
  • CleanByRuns (83-120)
  • CleanByAge (123-146)
  • CleanBySize (149-190)
  • Cleanup (34-79)
  • CleanAll (193-219)
  • ListLogDirs (308-342)
  • LogDirInfo (345-351)
internal/config/types.go (1)
  • LogsConfig (212-228)
🔇 Additional comments (73)
internal/config/types.go (1)

211-228: LGTM!

The LogsConfig struct is well-documented with clear defaults. The three-tier retention policy (size, age, runs) provides flexible cleanup options.

internal/parallel/logs/cleanup.go (6)

34-79: LGTM!

The cleanup priority order (size → age → runs) is well-documented and the implementation correctly handles edge cases like empty config or non-existent directories.


81-120: LGTM!

The per-task grouping and run-based cleanup logic is correct. Sorting by modification time and keeping the N most recent runs per task works as expected.


122-146: LGTM!

The age-based cleanup is straightforward and correct.


148-190: LGTM!

The size-based cleanup correctly prioritizes oldest directories for deletion and stops once under the limit.


271-304: LGTM!

The task name extraction logic correctly handles task names containing hyphens by checking the timestamp format pattern. The fallback to return the full directory name is a safe default.


306-351: LGTM!

The public ListLogDirs function correctly transforms internal data to the public LogDirInfo type and sorts by recency for display purposes.

internal/parallel/orchestrator_test.go (7)

13-34: LGTM!

The test correctly verifies orchestrator initialization. Consider using require.NotNil for the orchestrator check to fail fast if it's nil, preventing potential nil pointer panics in subsequent assertions.


36-70: LGTM!

Good coverage of edge cases - empty tasks should succeed, while nil/empty hosts should error. The appropriate use of require.NoError/require.Error ensures tests fail fast on unexpected error states.


72-139: Test validates worker calculation but not actual execution.

This table-driven test correctly verifies the worker-limiting logic, but note that it replicates the calculation (lines 127-134) rather than calling Run(). This tests the expected algorithm but not that the orchestrator actually spawns the correct number of workers.

Consider adding an integration-style test that observes actual concurrent execution behavior if practical.


160-176: LGTM!

The test correctly verifies the idempotent behavior of markHostSynced - first call returns false, subsequent calls for the same host return true.


178-246: LGTM!

Table-driven tests with good coverage of success semantics. The "empty result is success" behavior (line 235-238) is a reasonable design choice - no failures means success.


248-297: LGTM!

Good coverage of default config values, status string mappings, and result aggregation. The test for unknown TaskStatus(99) ensures robustness against unexpected enum values.


141-158: Test correctly validates graceful context cancellation handling.

The test expectation is correct. The Run() method is designed to treat context cancellation as a graceful shutdown—when the context is pre-cancelled, workers exit immediately without processing tasks, and Run() returns nil error (line 143 in orchestrator.go always returns success). The graceful behavior is already documented in the work-stealing queue design comment (orchestrator.go lines 54-64), so no additional comment is needed in the test itself.

internal/config/validate.go (3)

102-106: LGTM!

Parallel task reference validation is correctly placed after individual task validation, ensuring all tasks exist before checking cross-references.


270-307: LGTM!

The mutual exclusivity checks are correct. Parallel tasks can't have run or steps, and the early return skips irrelevant step validation. Error messages clearly explain the conflict.


442-444: No action needed. The getTaskNames function is defined in internal/config/tasks.go (line 103) and is properly accessible from validate.go since both files are in the same package. The code will compile and run correctly.

Likely an incorrect or invalid review comment.

internal/parallel/output_test.go (2)

12-81: LGTM! Well-structured table-driven test for OutputManager creation.

Good coverage of the TTY fallback behavior where progress mode falls back to quiet for non-TTY. The test properly validates internal state initialization.


1-297: Test file follows guidelines and provides solid coverage.

Tests are appropriately using table-driven patterns, testify assertions, and descriptive names. The coverage of OutputManager modes and state transitions is comprehensive.

internal/parallel/worker.go (2)

130-156: ensureSync has reasonable one-time sync logic.

The sync-once-per-host pattern using markHostSynced is appropriate. Skipping sync for local connections makes sense.


230-241: Close properly cleans up connection with mutex protection.

The method correctly handles the nil case and clears the connection reference after closing.

internal/parallel/summary.go (3)

65-70: Good practice: copying and sorting TaskResults for deterministic output.

This correctly avoids mutating the input while ensuring consistent ordering.


160-174: Helpful UX: Retry suggestions for failed tasks.

Providing actionable commands to retry individual failed tasks is a nice touch for developer experience.


179-193: FormatBriefSummary is clean and informative.

Handles nil result gracefully and provides appropriate messaging for success vs partial failure cases.

internal/config/validate_test.go (4)

10-116: Comprehensive table-driven tests for ValidateParallelTasks.

Good coverage of edge cases: nil config, empty tasks, valid configs, non-existent references, nested parallel detection, self-references, and step-based task compatibility. The error message assertions using errContains are well done.


118-165: Good mutual exclusivity tests for parallel with run/steps.

Tests correctly verify that parallel cannot coexist with run or steps fields.


239-274: IsParallelTask tests cover key scenarios.

The comment on line 245 about nil panic is helpful documentation. Consider adding a test that explicitly documents expected behavior if the team decides to handle nil defensively in the future.


167-237: Integration tests validate end-to-end config parsing.

Good to see tests with Timeout and MaxParallel fields to ensure the full validation pipeline handles these correctly.

internal/cli/task.go (4)

245-252: Good addition: ListTasks now displays parallel task info.

Clean integration showing task count for parallel tasks, maintaining consistent formatting with run and steps tasks.


322-376: createParallelTaskCommand follows Cobra patterns correctly.

Flags are properly defined with clear descriptions. The command setup is consistent with the existing createTaskCommand pattern.


378-390: Exit code handling follows established pattern.

Properly uses errors.NewExitError to propagate non-zero exit codes, matching the pattern in runTaskCommand. As per coding guidelines for CLI commands.


274-284: Task registration properly handles parallel task detection.

The flow correctly retrieves the task value before passing to createTaskCommand, which then routes to the appropriate command builder.

internal/cli/logs.go (5)

14-59: LGTM!

Command definitions follow Cobra patterns with clear descriptions and examples. Flag registration is correct.


61-133: LGTM!

Clean implementation with good UX for empty state and helpful context about retention settings.


196-210: LGTM!

Clean duration parsing with day suffix support and proper fallback to standard library.


212-236: LGTM!

Clean human-readable age formatting with proper singular/plural handling.


238-256: LGTM!

Straightforward size formatting with consistent decimal precision.

internal/parallel/logs/writer.go (5)

14-46: LGTM!

Clean type definitions with appropriate JSON serialization tags.


48-77: LGTM!

Proper error wrapping and sortable timestamp format for directory naming.


79-98: LGTM!

Good defensive check for closed state and proper error wrapping.


162-176: LGTM!

Simple accessors with clear semantics.


178-190: LGTM!

Handles common filesystem-unsafe characters appropriately.

internal/parallel/logs/cleanup_test.go (6)

103-145: LGTM!

Good coverage of age-based cleanup with proper time manipulation via os.Chtimes.


147-213: LGTM!

Good coverage of size-based cleanup scenarios with predictable test data.


215-264: LGTM!

Good coverage of the orchestrated cleanup with priority ordering and edge cases.


266-307: LGTM!

Clean tests for CleanAll and ListLogDirs with proper verification of sort order.


309-349: LGTM!

Good table-driven tests with descriptive cases covering edge conditions.


366-403: LGTM!

Good tests for tilde expansion (with proper HOME save/restore) and recursive size calculation.

internal/parallel/types.go (6)

5-17: LGTM!

Clean string enum definition with clear documentation for each mode.


19-39: LGTM!

Well-structured config with sensible defaults and clear inline documentation.


41-53: LGTM!

Clean result aggregation structure. Note: Success() returns true for empty results, which aligns with "no failures occurred" semantics.


55-70: LGTM!

Comprehensive task result structure with correct success semantics checking both exit code and error.


72-96: LGTM!

Idiomatic iota-based enum with proper String() implementation including unknown case handling.


98-104: LGTM!

Clean task info structure with clear field documentation.

internal/parallel/orchestrator.go (7)

12-34: LGTM!

Well-organized struct with clear separation of concerns and proper sync primitives for thread safety.


54-144: Well-designed work-stealing queue implementation.

The channel-based work-stealing approach is clean and leverages Go's built-in channel synchronization. Good documentation explaining the design rationale.


146-199: LGTM!

Clean worker implementation with proper cancellation checks and thread-safe fail-fast handling using sync.Once for cancel.


201-225: LGTM!

Thread-safe result aggregation with correct mutex usage.


239-242: LGTM!

Simple accessor. Note: returns nil until Run() is called and initializes the output manager.


227-237: markHostSynced is used correctly.

Method is called in worker.go:133 for sync-once-per-host optimization, as expected. Implementation properly uses mutex locking with defer. No issues to address.


82-82: Verify isTerminal() is defined and accessible.

The function is called at line 82 but its definition cannot be verified due to repository access limitations. Confirm it is defined in the package (likely in output.go or elsewhere in internal/parallel/) and properly accessible from orchestrator.go.

internal/parallel/output.go (5)

1-15: LGTM on imports and package structure.

Clean import organization with clear separation of standard library, external dependencies, and internal packages.


17-36: LGTM on OutputManager struct design.

Good use of composition with mutex for thread safety. The separation of state tracking (status, hosts, output) from styling concerns is clean.


49-70: LGTM on constructor with sensible TTY fallback.

The fallback from progress to quiet mode when stdout isn't a TTY is the right call—progress updates would just spam the pipe.


228-239: LGTM on formatDuration.

Good handling of sub-second, seconds, and minute+ durations with appropriate precision.


246-263: LGTM on status accessors.

Proper mutex usage and defensive copy in GetAllStatuses prevents callers from mutating internal state.

internal/cli/parallel.go (7)

1-13: LGTM on imports.

Clean import structure following Go conventions.


15-29: LGTM on ParallelTaskOptions struct.

Well-documented fields with clear purpose. Good that CLI flags map directly to these options.


189-218: LGTM on determineOutputMode.

Clear precedence: CLI flags → task config → default. Switch statement handles all valid values.


240-252: LGTM on filterHostsByTag.

Simple and correct. Creates a new map rather than mutating the input.


254-282: LGTM on renderDryRunPlan.

Clear, useful output for dry-run mode. Good handling of local vs. remote execution display.


339-362: LGTM on FormatParallelTaskHelp.

Good contextual help generation with subtask descriptions and config hints.


364-374: LGTM on ExpandLogsDir.

Simple tilde expansion. Correctly returns original path on error.

rileyhilliard and others added 2 commits January 13, 2026 16:06
- Add local execution fallback when no remote hosts configured
- Add distributed lock acquisition before syncing to prevent conflicts
- Add localWorker for executing tasks without SSH
- Remove dead AddParallelFlags function
- Fix ineffective break statement in context cancellation
- Update tests to match new local execution behavior
- Add parallel task documentation to skill file
- Bump plugin version to 1.2.0

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Fix quick-check task description (vet+fmt -> vet+lint)
- Add warning for invalid timeout parsing
- Wrap multi-step commands in subshells to prevent shell injection
- Calculate StartTime from earliest task result instead of approximation
- Add 1MB buffer cap per task to prevent unbounded memory growth
- Copy HostsUsed slice before sorting to avoid mutation
- Return descriptive error when SSH connection is nil
- Fix shell quoting using single quotes with proper escaping (critical)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Parallel task execution across multiple hosts

1 participant