feat: shared-nothing worker #1564

Merged
yohamta0 merged 39 commits into main from experimental-worker-sync
Jan 11, 2026
@yohamta0 yohamta0 commented Jan 10, 2026

Collaborator

Summary by CodeRabbit

  • New Features

    • Distributed execution: remote status pushing, per-step log streaming, pluggable log writer factory, queued-run support.
    • Static coordinator discovery via new CLI/env flag and static registry; remote task handler for workers and temp-DAG helper.
    • Coordinator RPCs to report status, stream logs, and query sub-run status.
  • Bug Fixes

    • Improved zombie detection, graceful shutdown, and cancellation handling via heartbeat directives; safer resource cleanup.
  • Tests

    • Extensive new and updated unit/integration tests for remote workflows, logs, status, retries, and cancellations.

coderabbitai Bot commented Jan 10, 2026


Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

Adds shared-nothing distributed execution: workers can statically discover coordinators, push DAG run status, and stream per-step logs via new gRPC RPCs; coordinator gains persistence, zombie detection, and log handling; runtime and worker code wire status/log pushers and remote task handlers; extensive proto, conversion, and tests added.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Proto & RPC contract: proto/coordinator/v1/coordinator.proto | Added ReportStatus, StreamLogs, GetDAGRunStatus RPCs; DAGRunStatusProto, LogChunk, GetDAGRunStatus messages; added Task.previous_status and HeartbeatResponse.cancelled_runs. |
| Coordinator service core: internal/service/coordinator/handler.go, internal/service/coordinator/service.go, internal/service/coordinator/log_handler.go, internal/service/coordinator/log_handler_test.go | Handler gained DAGRunStore/logDir options, ReportStatus/GetDAGRunStatus/StreamLogs implementations, open-attempt caching, zombie detector, Start/Stop lifecycle changes, and filesystem-backed log handling plus tests. |
| Coordinator client & registry: internal/service/coordinator/client.go, internal/service/coordinator/client_test.go, internal/service/coordinator/static_registry.go, internal/service/coordinator/static_registry_test.go | Client API extended (Heartbeat now returns response; added ReportStatus, StreamLogs, GetDAGRunStatus); added StaticRegistry for static coordinator discovery and corresponding tests. |
| Worker remote execution: internal/service/worker/remote_handler.go, internal/service/worker/handler.go, internal/service/worker/worker.go, internal/service/worker/worker_test.go | New RemoteTaskHandler and config; workers support static coordinator addresses, remote status/log streaming, task-level cancel funcs and cancellation directives processing, and temp-DAG handling updates. |
| Runtime remote components: internal/runtime/remote/status_pusher.go, internal/runtime/remote/log_streamer.go, internal/proto/convert/status.go, internal/proto/convert/status_test.go | New StatusPusher and LogStreamer; protobuf↔internal status/node conversion utilities and tests for round-trip correctness. |
| Agent / execution plumbing: internal/runtime/agent/agent.go, internal/core/execution/context.go, internal/runtime/context.go | Agent options expose StatusPusher, LogWriterFactory, QueuedRun; LogWriterFactory interface added and propagated into execution Context; Dispatcher gained GetDAGRunStatus. |
| Executor / DAG run flows: internal/runtime/executor/dag_runner.go, internal/runtime/executor/task.go, internal/runtime/output.go, internal/runtime/executor/dag_runner_test.go | Sub-DAG status queries routed via coordinator, outputs extracted from node variables, CreateTempDAGFile used for composed DAGs, WithPreviousStatus task option added, remote vs local writer selection implemented. |
| File utilities & tests: internal/common/fileutil/fileutil.go, internal/common/fileutil/fileutil_test.go, internal/service/worker/handler_test.go | New CreateTempDAGFile helper (supports extra documents) and unit tests; worker test callsites updated. |
| Scheduler / queue changes: internal/service/scheduler/dag_executor.go, internal/service/scheduler/queue_processor.go | ExecuteDAG gained previousStatus parameter; queue monitoring startup reworked (backoff/heartbeat-based) and call sites updated. |
| Integration & test harness: internal/integration/remote_test.go, internal/integration/*, internal/test/*, many internal/service/coordinator/*_test.go | Large integration test suite for remote execution/status/logs added; test helpers/options extended to enable status/log persistence; many tests updated to use persistence option. |
| Command & config: internal/cmd/flags.go, internal/cmd/context.go, internal/cmd/coordinator.go, internal/cmd/startall.go, internal/cmd/worker.go, internal/common/config/* | Added --worker.coordinators flag and config.Worker.Coordinators; parseCoordinatorAddresses added; coordinator NewService/newCoordinator signatures and lifecycle updated; context treats "coordinator" as long-running. |

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler
    participant Worker
    participant RemoteHandler
    participant Coordinator
    participant DAGRunStore

    Scheduler->>Worker: Dispatch Task (may include Definition)
    Worker->>RemoteHandler: Handle(ctx, task)
    RemoteHandler->>RemoteHandler: loadDAG (Definition → temp file) / create handlers
    RemoteHandler->>Coordinator: ReportStatus(initial)
    RemoteHandler->>Coordinator: StreamLogs (Log chunks during steps)
    RemoteHandler->>DAGRunStore: Persist final status (if configured)
    RemoteHandler->>Coordinator: ReportStatus(final)
    RemoteHandler-->>Scheduler: Task completion
sequenceDiagram
    participant Worker
    participant Coordinator
    participant HandlerStore as DAGRunStore

    loop Heartbeat interval
        Worker->>Coordinator: Heartbeat(req)
        Coordinator->>HandlerStore: detect stale heartbeats / mark failures
        Coordinator-->>Worker: HeartbeatResponse{cancelled_runs}
        Worker->>Worker: processCancellations(cancelled_runs) -> cancel running tasks
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 41.51% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title 'feat: shared-nothing worker' accurately reflects the main objective: introducing shared-nothing worker execution capabilities with distributed coordination, remote status/log handling, and static coordinator discovery. |

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/runtime/executor/dag_runner_test.go (1)

18-72: Avoid potentially vacuous assertions after Cleanup (capture the path before cleanup).

If SubDAGExecutor.Cleanup clears executor.tempFile, assert.NoFileExists(t, executor.tempFile) can become a weak check (empty string). Capture the path first and (optionally) assert the field is cleared.

Proposed tweak
 	// Verify the temp file was created
 	assert.FileExists(t, executor.tempFile)
+	tempPath := executor.tempFile

 	// Read and verify the content
-	content, err := os.ReadFile(executor.tempFile)
+	content, err := os.ReadFile(tempPath)
 	require.NoError(t, err)
 	assert.Equal(t, parentDAG.LocalDAGs["local-child"].YamlData, content)

 	// Cleanup
 	err = executor.Cleanup(ctx)
 	assert.NoError(t, err)
-	assert.NoFileExists(t, executor.tempFile)
+	assert.NoFileExists(t, tempPath)
+	assert.Empty(t, executor.tempFile)
🤖 Fix all issues with AI agents
In @internal/common/fileutil/fileutil_test.go:
- Around line 229-296: The test misses verifying that YAML document separators
are on their own line, which hides a bug in CreateTempDAGFile where it appends
"---\n" directly after the primary doc; update CreateTempDAGFile to ensure it
writes a separator with a preceding newline (e.g., use "\n---\n" or ensure the
primary doc ends with '\n' before appending) and update the
TestCreateTempDAGFile cases to assert the separator appears as "\n---\n" (or
that the separator is on its own line) and add an assertion that the primary doc
terminates with a newline when extra docs are provided.
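
The separator rule described above can be sketched in isolation. This is a minimal illustration only: joinYAMLDocs is a hypothetical helper, not the PR's CreateTempDAGFile, and it works on strings rather than writing a temp file.

```go
package main

import (
	"fmt"
	"strings"
)

// joinYAMLDocs concatenates a primary YAML document with extra documents,
// making sure every "---" separator sits on its own line.
// Hypothetical helper for illustration only.
func joinYAMLDocs(primary string, extra ...string) string {
	out := primary
	for _, doc := range extra {
		// Terminate the preceding document before writing the separator.
		if out != "" && !strings.HasSuffix(out, "\n") {
			out += "\n"
		}
		out += "---\n" + doc
	}
	return out
}

func main() {
	fmt.Printf("%q\n", joinYAMLDocs("name: parent", "name: child\n"))
}
```

A test for the real helper could assert on the same property, namely that the output contains "\n---\n" whenever extra documents are supplied.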

In @internal/runtime/executor/dag_runner.go:
- Around line 493-511: extractOutputsFromNodes currently drops any
OutputVariables values without an "=" silently; fix this by normalizing or
validating at the proto conversion boundary: update ProtoToNode to ensure
OutputVariables values are written using the same stringutil.NewKeyValue(...)
format (the same normalization used by setVariable/setBoolVariable in data.go)
so proto-roundtripped nodes always store "key=value", or alternatively add a
defensive normalization in extractOutputsFromNodes to handle values missing "="
(e.g., treat the whole string as key with empty value or log/warn and skip
explicitly). Reference: extractOutputsFromNodes, ProtoToNode, setVariable,
setBoolVariable, stringutil.NewKeyValue.
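
The defensive-normalization option can be sketched as follows. This assumes output variables arrive as a slice of raw strings; extractOutputs is a hypothetical stand-in for extractOutputsFromNodes, and treating a value without "=" as a key with an empty value is just one of the behaviours suggested above.

```go
package main

import (
	"fmt"
	"strings"
)

// extractOutputs turns "key=value" strings into a map. Entries without "="
// are kept as keys with an empty value instead of being dropped silently.
// Entries with an empty key are skipped. Hypothetical helper for illustration.
func extractOutputs(vars []string) map[string]string {
	out := make(map[string]string, len(vars))
	for _, raw := range vars {
		// strings.Cut splits on the first "=" only, so values may contain "=".
		key, value, _ := strings.Cut(raw, "=")
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		out[key] = value
	}
	return out
}

func main() {
	fmt.Println(extractOutputs([]string{"RESULT=ok", "FLAG", "EXPR=a=b"}))
}
```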

In @internal/service/coordinator/handler.go:
- Around line 232-256: Heartbeat currently calls detectAndCleanupZombies on
every invocation which causes O(workers²) work under high scale; remove the
direct call to detectAndCleanupZombies from the Heartbeat method and rely on the
existing periodic detector started by StartZombieDetector, or implement simple
throttling inside Heartbeat by tracking a timestamp field (e.g.,
h.lastZombieCleanupAt) and only invoking detectAndCleanupZombies when
time.Since(h.lastZombieCleanupAt) >= h.zombieDetectionInterval, updating
h.lastZombieCleanupAt under the mutex; update references to
detectAndCleanupZombies, Heartbeat, StartZombieDetector, and add the new
timestamp/interval fields on the Handler struct if choosing the throttling
approach.

In @internal/service/coordinator/service.go:
- Around line 120-122: The Stop() method on the coordinator currently calls
srv.handler.Close(ctx) but does not wait for the zombie detector goroutine to
finish; add a call to the coordinator's WaitZombieDetector() (or equivalent
method) before or after srv.handler.Close(ctx) so the zombie detector is awaited
during shutdown. Locate the Stop() method in service.go and ensure it invokes
WaitZombieDetector() (the same coordination used in startall.go) to block until
the detector exits, preserving existing Close(ctx) behavior and ordering
consistent with startall.go.
- Around line 60-62: Replace the hardcoded 45-second interval passed to
srv.handler.StartZombieDetector with the configured value from the scheduler
configuration (e.g., use the Scheduler.ZombieDetectionInterval field from the
coordinator/service configuration accessible on the service instance), ensuring
you convert it to time.Duration if necessary; update the call site in service.go
where srv.handler.StartZombieDetector(ctx, 45*time.Second) is invoked to use the
config value instead (for example: srv.cfg.Scheduler.ZombieDetectionInterval or
the equivalent config field on srv) so the Coordinator respects the same
configuration as the Scheduler.
🧹 Nitpick comments (17)
internal/runtime/runner_helper_test.go (1)

316-338: Consider applying the same testify pattern for consistency.

The nodeByName function at line 336 still uses t.Fatalf for the same not-found check. For consistency with the change on line 312, consider replacing it with a failure helper from testify's require package.

♻️ Suggested consistency improvement
 	if rr.cfg.OnCancel != nil && rr.cfg.OnCancel.Name == stepName {
 		return rr.runner.HandlerNode(core.HandlerOnCancel)
 	}
 
-	t.Fatalf("step %s not found", stepName)
-	return nil
+	require.FailNowf(t, "step not found", "step %s not found", stepName)
+	return nil  // unreachable but required for compilation

Note: require.FailNowf is used here since the function must return a value; unlike require.FailNow, it formats the message with its arguments. Alternatively, restructure to eliminate the need for a return after the assertion.

internal/runtime/manager.go (1)

128-139: Consider logging socket stop failures for debugging.

The logic to guard socket communication behind the alive check is sound, and the fallback to the abort flag mechanism is a good approach for handling both local and distributed scenarios. However, when the socket request fails on line 134, the error is silently discarded. Logging this failure (at debug level) would help diagnose cases where socket stop is expected to work but doesn't.

📝 Suggested change
 	if alive {
 		addr := dag.SockAddr(dagRunID)
 		if fileutil.FileExists(addr) {
 			// In case the socket exists, we try to send a stop request
 			client := sock.NewClient(addr)
-			if _, err := client.Request("POST", "/stop"); err == nil {
+			if _, err := client.Request("POST", "/stop"); err != nil {
+				logger.Debug(ctx, "Socket stop failed, falling back to abort flag", tag.Error(err))
+			} else {
 				logger.Info(ctx, "Successfully stopped DAG via socket")
 				return nil
 			}
 		}
 	}
internal/runtime/executor/dag_runner_test.go (1)

269-299: Use t.TempDir() instead of a fixed os.TempDir() path (stronger parallel safety).

Even if it’s currently unique within the package, hardcoding {os.TempDir()}/dagu-test/test.yaml is easy to collide with in future additions.

Proposed tweak
-	// Create a temporary file
-	tempDir := filepath.Join(os.TempDir(), "dagu-test")
-	err := os.MkdirAll(tempDir, 0750)
-	require.NoError(t, err)
-	defer func() { _ = os.RemoveAll(tempDir) }()
-
-	tempFile := filepath.Join(tempDir, "test.yaml")
-	err = os.WriteFile(tempFile, []byte("test content"), 0600)
+	tempDir := t.TempDir()
+	tempFile := filepath.Join(tempDir, "test.yaml")
+	err := os.WriteFile(tempFile, []byte("test content"), 0600)
 	require.NoError(t, err)
internal/common/config/config.go (1)

278-284: Consider validating Worker.Coordinators format in Config.Validate() (fail fast).

Since this is user-provided via flags/env, validating entries (e.g., net.SplitHostPort + port range, and rejecting schemes if you truly require host:port) would make misconfigurations much easier to diagnose.
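
A fail-fast check along those lines could look like the sketch below. validateCoordinatorAddr is a hypothetical function name, and rejecting URL schemes is an assumption about the required format rather than something the PR states.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// validateCoordinatorAddr checks that addr is a host:port pair with a port in
// the range 1-65535. Bracketed IPv6 literals such as "[::1]:50055" are accepted.
// Hypothetical helper for illustration only.
func validateCoordinatorAddr(addr string) error {
	if strings.Contains(addr, "://") {
		return fmt.Errorf("address %q must be host:port without a scheme", addr)
	}
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return fmt.Errorf("address %q: %w", addr, err)
	}
	if host == "" {
		return fmt.Errorf("address %q has an empty host", addr)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port < 1 || port > 65535 {
		return fmt.Errorf("address %q has an invalid port %q", addr, portStr)
	}
	return nil
}

func main() {
	for _, addr := range []string{"host1:50055", "[::1]:50055", "host1", "http://host1:50055"} {
		fmt.Printf("%-22s valid=%v\n", addr, validateCoordinatorAddr(addr) == nil)
	}
}
```

Returning an error per address, rather than silently dropping bad entries, lets Config.Validate() report exactly which entry is wrong.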

internal/service/coordinator/client_test.go (2)

24-31: Use net.SplitHostPort instead of strings.Split for host/port parsing (more robust).

This avoids subtle bugs if the listener address format changes (e.g., IPv6).

Proposed tweak
 import (
 	"context"
+	"net"
 	"strconv"
 	"strings"
@@
 func parsePort(addr string) int {
-	parts := strings.Split(addr, ":")
-	if len(parts) < 2 {
-		return 0
-	}
-	port, _ := strconv.Atoi(parts[1])
-	return port
+	_, portStr, err := net.SplitHostPort(addr)
+	if err != nil {
+		return 0
+	}
+	port, _ := strconv.Atoi(portStr)
+	return port
 }

And when building execution.HostInfo, prefer:

host, portStr, _ := net.SplitHostPort(addr)
port, _ := strconv.Atoi(portStr)

Also applies to: 141-143, 188-190


204-245: Heartbeat test: consider asserting the response is non-nil.

Right now you only validate the request shape; also checking the returned *HeartbeatResponse would make the new signature change more meaningful.

internal/common/fileutil/fileutil_test.go (1)

12-41: InvalidPath test can be platform-dependent; parallelism may surface flakiness.

Using "/nonexistent/directory/test.log" can behave unexpectedly across OSes/containers. Consider generating a guaranteed-invalid path from t.TempDir() (e.g., create+remove a dir, then attempt to create a file inside it).
Based on learnings, nice job keeping assertions on the failure path.

internal/common/config/loader.go (1)

613-647: Consider adding address format validation.

The function correctly handles multiple input formats (comma-separated string, YAML list, string slice) and properly trims whitespace. However, it doesn't validate that addresses are in valid host:port format. Invalid addresses would cause failures later during connection attempts.

Optional: Add basic host:port validation
 func parseCoordinatorAddresses(input interface{}) []string {
 	var addresses []string

+	validateAddress := func(addr string) bool {
+		// Basic validation: must contain exactly one colon
+		parts := strings.Split(addr, ":")
+		if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
+			return false
+		}
+		return true
+	}
+
 	switch v := input.(type) {
 	case string:
 		// Parse comma-separated string: "host1:port1,host2:port2"
 		if v != "" {
 			parts := strings.Split(v, ",")
 			for _, part := range parts {
 				part = strings.TrimSpace(part)
-				if part != "" {
+				if part != "" && validateAddress(part) {
 					addresses = append(addresses, part)
 				}
 			}
 		}
 	// ... similar changes for other cases
internal/service/coordinator/log_handler.go (2)

123-131: Consider including AttemptId in the stream key to prevent collisions during retries.

The current streamKey uses DagName/DagRunId/StepName/StreamType, but logFilePath also uses AttemptId for the directory structure. If a DAG run is retried with the same run ID but different attempt ID, the stream key would collide while the file paths would differ, potentially causing incorrect writer reuse.

♻️ Suggested fix
 func (h *logHandler) streamKey(chunk *coordinatorv1.LogChunk) string {
-	return fmt.Sprintf("%s/%s/%s/%s",
+	return fmt.Sprintf("%s/%s/%s/%s/%s",
 		chunk.DagName,
 		chunk.DagRunId,
+		chunk.AttemptId,
 		chunk.StepName,
 		chunk.StreamType.String(),
 	)
 }

171-182: Consider propagating context from the stream for logging.

Using context.Background() loses any trace context or log correlation that might be available from the stream handling context.

♻️ Suggested improvement
-func (h *logHandler) closeWriter(chunk *coordinatorv1.LogChunk) {
+func (h *logHandler) closeWriter(ctx context.Context, chunk *coordinatorv1.LogChunk) {
 	key := h.streamKey(chunk)
 
 	h.writersMu.Lock()
 	defer h.writersMu.Unlock()
 
 	if w, ok := h.writers[key]; ok {
-		w.close(context.Background())
+		w.close(ctx)
 		delete(h.writers, key)
 	}
 }

Then update the call site in handleStream:

 		if chunk.IsFinal {
-			h.closeWriter(chunk)
+			h.closeWriter(stream.Context(), chunk)
 			continue
 		}
internal/runtime/remote/log_streamer.go (2)

106-140: Consider handling stream initialization failure more gracefully.

If StreamLogs fails, the error is returned but subsequent Write calls will attempt to initialize the stream again since w.stream remains nil. This is actually reasonable behavior (retry on next write), but consider whether you want to track a "permanently failed" state to avoid repeated connection attempts for a broken stream.


160-178: Sequence number is missing on the final chunk.

The final chunk doesn't include a sequence number (defaults to 0). While IsFinal=true marks it as the last chunk, including the correct sequence number would help with ordering verification on the receiver side.

♻️ Suggested fix
 	// Send final marker
 	if w.stream != nil {
+		w.sequence++
 		finalChunk := &coordinatorv1.LogChunk{
 			WorkerId:       w.streamer.workerID,
 			DagRunId:       w.streamer.dagRunID,
 			DagName:        w.streamer.dagName,
 			StepName:       w.stepName,
 			StreamType:     toProtoStreamType(w.streamType),
 			IsFinal:        true,
+			Sequence:       w.sequence,
 			RootDagRunName: w.streamer.rootRef.Name,
 			RootDagRunId:   w.streamer.rootRef.ID,
 			AttemptId:      w.streamer.getAttemptID(),
 		}
internal/service/coordinator/static_registry.go (1)

22-49: Coordinator ID uses original index, not filtered index.

When empty addresses are skipped, the coord-<i> ID still uses the original loop index i. For example, if addresses are ["", "host1:50055", "host2:50055"], the IDs will be coord-1 and coord-2 (skipping coord-0). This may be intentional for debugging/correlation with the input array, but could be confusing. Consider using len(hosts) for sequential IDs.

♻️ Optional fix for sequential IDs
 	for i, addr := range addresses {
 		if addr == "" {
 			continue
 		}

 		host, port, err := parseAddress(addr)
 		if err != nil {
 			return nil, fmt.Errorf("invalid coordinator address %q: %w", addr, err)
 		}

 		hosts = append(hosts, execution.HostInfo{
-			ID:        fmt.Sprintf("coord-%d", i),
+			ID:        fmt.Sprintf("coord-%d", len(hosts)),
 			Host:      host,
 			Port:      port,
 			Status:    execution.ServiceStatusActive,
 			StartedAt: time.Now(),
 		})
 	}
internal/service/worker/remote_handler.go (1)

232-251: Temp log directory uses fixed path pattern.

The log directory is created at /tmp/dagu/worker-logs/<dagRunID>. If multiple workers on the same host execute the same DAG run (unlikely but possible in edge cases), they would conflict. Consider including worker ID in the path.

♻️ Optional: Include worker ID in temp path
 func (h *remoteTaskHandler) createAgentEnv(ctx context.Context, dagRunID string) (*agentEnv, error) {
-	logDir := filepath.Join(os.TempDir(), "dagu", "worker-logs", dagRunID)
+	logDir := filepath.Join(os.TempDir(), "dagu", "worker-logs", h.workerID, dagRunID)
 	if err := os.MkdirAll(logDir, 0750); err != nil {
 		return nil, fmt.Errorf("failed to create log directory: %w", err)
 	}
internal/service/coordinator/client.go (1)

505-531: Missing health check before establishing stream connection.

StreamLogs bypasses the health check that attemptCall performs before making calls. This could result in attempting to stream logs to an unhealthy coordinator, whereas other methods verify coordinator health first.

Consider adding a health check or documenting why it's intentionally skipped for streaming connections.

♻️ Suggested improvement
 func (cli *clientImpl) StreamLogs(ctx context.Context) (coordinatorv1.CoordinatorService_StreamLogsClient, error) {
 	members, err := cli.getCoordinatorMembers(ctx)
 	if err != nil {
 		return nil, err
 	}

 	// Try each coordinator until one works
 	var lastErr error
 	for _, member := range members {
 		client, err := cli.getOrCreateClient(member)
 		if err != nil {
 			lastErr = err
 			continue
 		}

+		// Check coordinator health before establishing stream
+		if err := cli.isHealthy(ctx, member); err != nil {
+			lastErr = err
+			continue
+		}
+
 		stream, err := client.client.StreamLogs(ctx)
 		if err != nil {
 			lastErr = err
 			continue
 		}

 		return stream, nil
 	}

 	return nil, fmt.Errorf("failed to create log stream: %w", lastErr)
 }
internal/service/coordinator/handler.go (2)

87-97: Consider logging errors during Close instead of silently ignoring them.

While using _ = attempt.Close(ctx) avoids blocking shutdown, logging close errors would help diagnose issues with status persistence during shutdown.

♻️ Suggested improvement
 func (h *Handler) Close(ctx context.Context) {
 	h.attemptsMu.Lock()
 	defer h.attemptsMu.Unlock()

 	for dagRunID, attempt := range h.openAttempts {
-		_ = attempt.Close(ctx)
+		if err := attempt.Close(ctx); err != nil {
+			logger.Warn(ctx, "Failed to close attempt during shutdown",
+				tag.RunID(dagRunID), tag.Error(err))
+		}
 		delete(h.openAttempts, dagRunID)
 	}
 }

259-285: Potential performance issue with per-task database lookups.

getCancelledRunsForWorker calls FindAttempt and IsAborting for each running task on every heartbeat. With many running tasks across workers, this could cause significant database load.

Consider caching abort status or batching these lookups if performance becomes an issue.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c727c59 and a31b5b5.

⛔ Files ignored due to path filters (2)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (44)
  • internal/cmd/context.go
  • internal/cmd/coordinator.go
  • internal/cmd/flags.go
  • internal/cmd/startall.go
  • internal/cmd/worker.go
  • internal/common/config/config.go
  • internal/common/config/definition.go
  • internal/common/config/loader.go
  • internal/common/fileutil/fileutil.go
  • internal/common/fileutil/fileutil_test.go
  • internal/core/execution/context.go
  • internal/integration/distributed_helpers_test.go
  • internal/integration/remote_test.go
  • internal/proto/convert/status.go
  • internal/proto/convert/status_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/context.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/runtime/executor/task.go
  • internal/runtime/manager.go
  • internal/runtime/output.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/remote/status_pusher.go
  • internal/runtime/runner_helper_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/client_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/coordinator/log_handler.go
  • internal/service/coordinator/service.go
  • internal/service/coordinator/static_registry.go
  • internal/service/coordinator/static_registry_test.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/worker/handler.go
  • internal/service/worker/handler_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/worker.go
  • internal/test/coordinator.go
  • internal/test/helper.go
  • proto/coordinator/v1/coordinator.proto
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/common/fileutil/fileutil.go
  • internal/service/worker/handler.go
  • internal/common/config/config.go
  • internal/service/coordinator/handler_test.go
  • internal/runtime/runner_helper_test.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/cmd/context.go
  • internal/service/coordinator/service.go
  • internal/service/scheduler/dag_executor.go
  • internal/runtime/context.go
  • internal/test/coordinator.go
  • internal/common/fileutil/fileutil_test.go
  • internal/runtime/remote/status_pusher.go
  • internal/runtime/executor/task.go
  • internal/runtime/manager.go
  • internal/common/config/definition.go
  • internal/service/worker/remote_handler.go
  • internal/cmd/flags.go
  • internal/test/helper.go
  • internal/integration/remote_test.go
  • internal/service/worker/worker.go
  • internal/cmd/startall.go
  • internal/service/worker/poller_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/proto/convert/status.go
  • internal/runtime/executor/dag_runner.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/static_registry_test.go
  • internal/common/config/loader.go
  • internal/runtime/output.go
  • internal/service/worker/handler_test.go
  • internal/core/execution/context.go
  • internal/cmd/worker.go
  • internal/runtime/agent/agent.go
  • internal/integration/distributed_helpers_test.go
  • internal/service/coordinator/client_test.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/service/coordinator/static_registry.go
  • internal/proto/convert/status_test.go
  • internal/service/coordinator/handler.go
  • internal/runtime/remote/log_streamer.go
  • internal/service/coordinator/log_handler.go
  • internal/cmd/coordinator.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/service/coordinator/handler_test.go
  • internal/runtime/runner_helper_test.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/common/fileutil/fileutil_test.go
  • internal/integration/remote_test.go
  • internal/service/worker/poller_test.go
  • internal/service/coordinator/static_registry_test.go
  • internal/service/worker/handler_test.go
  • internal/integration/distributed_helpers_test.go
  • internal/service/coordinator/client_test.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/proto/convert/status_test.go
🧠 Learnings (5)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks

Applied to files:

  • internal/runtime/runner_helper_test.go
  • internal/common/fileutil/fileutil_test.go
  • internal/service/coordinator/client_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/cmd/context.go
  • internal/service/worker/poller_test.go
  • internal/runtime/executor/dag_runner.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Applied to files:

  • internal/common/fileutil/fileutil_test.go
  • internal/test/helper.go
  • internal/integration/remote_test.go
  • internal/service/coordinator/static_registry_test.go
  • internal/service/coordinator/client_test.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/proto/convert/status_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages

Applied to files:

  • internal/common/fileutil/fileutil_test.go
  • internal/integration/remote_test.go
  • internal/proto/convert/status_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`

Applied to files:

  • internal/runtime/executor/dag_runner.go
🧬 Code graph analysis (30)
internal/service/worker/handler.go (1)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-241)
internal/service/coordinator/handler_test.go (5)
internal/core/execution/dagrun.go (2)
  • DAGRunRef (128-131)
  • ErrDAGRunIDNotFound (14-14)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/service/coordinator/handler.go (2)
  • NewHandler (75-85)
  • WithDAGRunStore (62-66)
internal/core/status.go (6)
  • Running (8-8)
  • NodeRunning (66-66)
  • NodeSucceeded (69-69)
  • Failed (9-9)
  • NodeFailed (67-67)
  • Succeeded (11-11)
internal/service/scheduler/dag_executor.go (2)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/runtime/executor/task.go (1)
  • WithPreviousStatus (83-89)
internal/runtime/context.go (1)
internal/core/execution/context.go (2)
  • WithLogWriterFactory (229-233)
  • LogWriterFactory (35-39)
internal/test/coordinator.go (3)
internal/test/helper.go (5)
  • HelperOption (42-42)
  • Helper (360-374)
  • Options (44-54)
  • WithStatusPersistence (84-88)
  • WithLogPersistence (92-96)
internal/service/coordinator/handler.go (5)
  • HandlerOption (59-59)
  • WithDAGRunStore (62-66)
  • WithLogDir (69-73)
  • NewHandler (75-85)
  • Handler (39-56)
internal/common/config/path.go (1)
  • Paths (13-30)
internal/common/fileutil/fileutil_test.go (1)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-241)
internal/runtime/remote/status_pusher.go (5)
internal/service/coordinator/client.go (1)
  • Client (29-55)
internal/runtime/agent/agent.go (1)
  • StatusPusher (153-155)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
proto/coordinator/v1/coordinator.pb.go (3)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
internal/proto/convert/status.go (1)
  • DAGRunStatusToProto (12-51)
internal/runtime/executor/task.go (3)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
proto/coordinator/v1/coordinator.pb.go (3)
  • Task (365-384)
  • Task (397-397)
  • Task (412-414)
internal/proto/convert/status.go (1)
  • DAGRunStatusToProto (12-51)
internal/runtime/manager.go (5)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
internal/core/execution/dagrun.go (1)
  • NewDAGRunRef (135-140)
internal/core/dag.go (1)
  • SockAddr (611-637)
internal/common/fileutil/fileutil.go (1)
  • FileExists (44-47)
internal/test/server.go (1)
  • Request (113-120)
internal/test/helper.go (2)
internal/runtime/agent/agent.go (1)
  • Options (158-189)
internal/persistence/filedag/store.go (1)
  • Options (31-36)
internal/integration/remote_test.go (4)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (3)
  • WithStatusPersistence (84-88)
  • DAG (446-449)
  • WithLogPersistence (92-96)
internal/core/execution/queue.go (1)
  • QueueStore (16-35)
internal/core/status.go (5)
  • Succeeded (11-11)
  • NodeSucceeded (69-69)
  • Running (8-8)
  • Aborted (10-10)
  • Failed (9-9)
internal/service/worker/worker.go (4)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/cmd/context.go (1)
  • Context (44-59)
internal/common/logger/context.go (1)
  • Info (40-42)
internal/common/logger/tag/tag.go (2)
  • RunID (40-42)
  • WorkerID (65-67)
internal/cmd/startall.go (1)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/service/worker/poller_test.go (2)
proto/coordinator/v1/coordinator.pb.go (15)
  • HeartbeatRequest (700-707)
  • HeartbeatRequest (720-720)
  • HeartbeatRequest (735-737)
  • HeartbeatResponse (761-768)
  • HeartbeatResponse (781-781)
  • HeartbeatResponse (796-798)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
  • ReportStatusResponse (1015-1021)
  • ReportStatusResponse (1034-1034)
  • ReportStatusResponse (1049-1051)
  • GetDAGRunStatusResponse (1899-1906)
  • GetDAGRunStatusResponse (1919-1919)
  • GetDAGRunStatusResponse (1934-1936)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/service/scheduler/queue_processor.go (1)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/proto/convert/status.go (5)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/common/cmdutil/eval_test.go (1)
  • Root (124-127)
internal/core/params.go (1)
  • Params (36-45)
internal/core/step.go (1)
  • ExecutorConfig (195-203)
internal/common/collections/syncmap.go (1)
  • SyncMap (10-14)
internal/runtime/executor/dag_runner.go (5)
internal/core/dag.go (1)
  • DAG (63-178)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-241)
internal/core/execution/context.go (2)
  • Context (17-29)
  • RunStatus (126-137)
internal/proto/convert/status.go (1)
  • ProtoToDAGRunStatus (114-153)
internal/core/execution/node.go (1)
  • Node (9-36)
internal/service/coordinator/client.go (1)
proto/coordinator/v1/coordinator.pb.go (18)
  • HeartbeatRequest (700-707)
  • HeartbeatRequest (720-720)
  • HeartbeatRequest (735-737)
  • HeartbeatResponse (761-768)
  • HeartbeatResponse (781-781)
  • HeartbeatResponse (796-798)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
  • ReportStatusResponse (1015-1021)
  • ReportStatusResponse (1034-1034)
  • ReportStatusResponse (1049-1051)
  • GetDAGRunStatusResponse (1899-1906)
  • GetDAGRunStatusResponse (1919-1919)
  • GetDAGRunStatusResponse (1934-1936)
  • GetDAGRunStatusRequest (1828-1838)
  • GetDAGRunStatusRequest (1851-1851)
  • GetDAGRunStatusRequest (1866-1868)
internal/service/coordinator/static_registry_test.go (1)
internal/service/coordinator/static_registry.go (1)
  • NewStaticRegistry (22-49)
internal/common/config/loader.go (1)
internal/common/config/config.go (1)
  • Worker (279-284)
internal/runtime/output.go (2)
internal/core/execution/context.go (4)
  • Context (17-29)
  • LogWriterFactory (35-39)
  • StreamTypeStdout (44-44)
  • StreamTypeStderr (46-46)
internal/runtime/context.go (3)
  • Context (14-14)
  • GetDAGContext (50-52)
  • LogWriterFactory (46-46)
internal/service/worker/handler_test.go (1)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-241)
internal/cmd/worker.go (3)
internal/service/worker/worker.go (2)
  • Worker (21-34)
  • NewWorker (42-61)
internal/service/coordinator/static_registry.go (1)
  • NewStaticRegistry (22-49)
internal/service/worker/remote_handler.go (2)
  • RemoteTaskHandlerConfig (28-45)
  • NewRemoteTaskHandler (49-60)
internal/runtime/agent/agent.go (5)
internal/runtime/remote/status_pusher.go (1)
  • StatusPusher (14-17)
internal/core/execution/context.go (4)
  • LogWriterFactory (35-39)
  • Context (17-29)
  • WithLogWriterFactory (229-233)
  • NewContext (238-277)
internal/runtime/context.go (4)
  • LogWriterFactory (46-46)
  • Context (14-14)
  • WithLogWriterFactory (42-42)
  • NewContext (28-28)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/runtime/executor/dag_runner_test.go (1)
internal/runtime/data.go (1)
  • Parallel (85-89)
internal/proto/convert/status_test.go (6)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/status.go (4)
  • Running (8-8)
  • NodeSucceeded (69-69)
  • NodeRunning (66-66)
  • NodeNotStarted (65-65)
internal/core/params.go (1)
  • Params (36-45)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/core/step.go (1)
  • ExecutorConfig (195-203)
internal/common/collections/syncmap.go (1)
  • SyncMap (10-14)
internal/service/coordinator/handler.go (4)
internal/core/execution/dagrun.go (2)
  • DAGRunStore (23-48)
  • DAGRunRef (128-131)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/proto/convert/status.go (1)
  • ProtoToDAGRunStatus (114-153)
internal/core/status.go (3)
  • Failed (9-9)
  • NodeRunning (66-66)
  • NodeFailed (67-67)
internal/runtime/remote/log_streamer.go (4)
internal/service/coordinator/client.go (1)
  • Client (29-55)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/core/execution/context.go (3)
  • Context (17-29)
  • StreamTypeStdout (44-44)
  • StreamTypeStderr (46-46)
proto/coordinator/v1/coordinator.pb.go (10)
  • LogChunk (1640-1657)
  • LogChunk (1670-1670)
  • LogChunk (1685-1687)
  • LogStreamType (127-127)
  • LogStreamType (159-161)
  • LogStreamType (163-165)
  • LogStreamType (172-174)
  • LogStreamType_LOG_STREAM_TYPE_STDOUT (131-131)
  • LogStreamType_LOG_STREAM_TYPE_STDERR (132-132)
  • LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED (130-130)
internal/service/coordinator/log_handler.go (3)
internal/common/logger/context.go (1)
  • Warn (45-47)
proto/coordinator/v1/coordinator.pb.go (10)
  • StreamLogsResponse (1767-1774)
  • StreamLogsResponse (1787-1787)
  • StreamLogsResponse (1802-1804)
  • LogChunk (1640-1657)
  • LogChunk (1670-1670)
  • LogChunk (1685-1687)
  • LogStreamType (127-127)
  • LogStreamType (159-161)
  • LogStreamType (163-165)
  • LogStreamType (172-174)
internal/common/fileutil/fileutil.go (1)
  • OpenOrCreateFile (62-70)
internal/cmd/coordinator.go (4)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/cmd/context.go (1)
  • Context (44-59)
internal/service/coordinator/service.go (2)
  • Service (20-29)
  • NewService (31-50)
internal/service/coordinator/handler.go (4)
  • Handler (39-56)
  • NewHandler (75-85)
  • WithDAGRunStore (62-66)
  • WithLogDir (69-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest

Comment thread internal/common/fileutil/fileutil_test.go
Comment thread internal/runtime/executor/dag_runner.go
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/handler.go Outdated
Comment thread internal/service/coordinator/service.go
Comment thread internal/service/coordinator/service.go
Comment thread proto/coordinator/v1/coordinator.proto
@yohamta0

Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Jan 10, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
internal/integration/distributed_parallel_test.go (1)

327-341: Fix ignored FindSubAttempt error (current assertion checks the wrong err).

You discard the FindSubAttempt error (att, _ := ...) and then require.NoError(t, err) uses an outer err, which can hide failures.

Proposed fix
-        for _, child := range parallelNode.SubRuns {
-            att, _ := coord.DAGRunStore.FindSubAttempt(coord.Context, runRef, child.DAGRunID)
+        for _, child := range parallelNode.SubRuns {
+            att, findErr := coord.DAGRunStore.FindSubAttempt(coord.Context, runRef, child.DAGRunID)
+            require.NoError(t, findErr)
             if att == nil {
                 continue
             }
-            require.NoError(t, err)
-
-            status, err := att.ReadStatus(coord.Context)
-            require.NoError(t, err)
+            status, readErr := att.ReadStatus(coord.Context)
+            require.NoError(t, readErr)
             require.Equal(t, core.Aborted, status.Status)
             canceled = true
         }
internal/runtime/agent/agent.go (1)

1365-1371: Potential info leak: returning raw err.Error() over HTTP.

Even though this is over a unix socket, it may expose more internal detail than intended; consider mapping to a generic message for non-typed errors (or ensuring errors never include sensitive data).

internal/service/coordinator/handler_test.go (1)

165-215: Tests are likely flaky due to fixed time.Sleep synchronization.

The 100ms sleeps (“give the poller time to register”) will intermittently fail under load/CI variance, especially with t.Parallel(). Prefer require.Eventually and (for the poll test) a buffered channel to avoid goroutine blocking while the test sleeps.

Example rewrite (PollAndDispatch)
-		// Start polling in a goroutine
-		pollDone := make(chan *coordinatorv1.PollResponse)
-		pollErr := make(chan error)
+		// Start polling in a goroutine
+		pollDone := make(chan *coordinatorv1.PollResponse, 1)
+		pollErr := make(chan error, 1)
 		go func() {
@@
-		// Give the poller time to register
-		time.Sleep(100 * time.Millisecond)
+		// Wait until poller is registered (avoid timing sleeps)
+		require.Eventually(t, func() bool {
+			h.mu.Lock()
+			defer h.mu.Unlock()
+			_, ok := h.waitingPollers["poller1"]
+			return ok
+		}, 1*time.Second, 10*time.Millisecond)

Based on coding guidelines, this also keeps tests more deterministic.

Also applies to: 240-270

🤖 Fix all issues with AI agents
In @internal/runtime/remote/log_streamer.go:
- Around line 84-145: flush() currently sends the entire w.buffer as one
LogChunk which can exceed gRPC message size; change flush() to send the buffer
in bounded pieces: define a maxChunkSize (e.g. 4*1024*1024), ensure the stream
is initialized as now, then loop over w.buffer creating a copy/slice of at most
maxChunkSize for each iteration, increment w.sequence for each sent chunk, call
w.stream.Send for each chunk, and only remove the bytes successfully sent from
w.buffer (i.e., on successful Send advance the buffer start or reslice),
returning the first Send error without discarding unsent data; keep the existing
behavior of creating dataCopy per chunk and the existing metadata fields
(WorkerId, DagRunId, StepName, StreamType, RootDagRunName/Id, AttemptId).

In @internal/service/coordinator/handler_test.go:
- Around line 48-70: Several mockDAGRunStore methods (CreateAttempt,
RecentAttempts, LatestAttempt, ListStatuses, FindSubAttempt, RemoveOldDAGRuns,
RenameDAGRuns, RemoveDAGRun) currently return (nil, nil) which can mask
test-time surprises; update each unimplemented mock method to return a clear
sentinel error (e.g., errors.New("mockDAGRunStore: unimplemented <MethodName>"))
or call panic("mockDAGRunStore: unimplemented <MethodName>") instead of (nil,
nil) so any accidental invocation fails loudly and points to the exact method.

In @internal/service/coordinator/handler.go:
- Around line 407-418: Update the error message in Handler.StreamLogs to avoid
saying "nil" for a string: keep the existing check on h.logDir but change the
status.Error call to use a message like "log streaming not configured: logDir is
empty" (reference: Handler.StreamLogs, h.logDir,
status.Error/codes.FailedPrecondition).
- Around line 87-97: Cached DAGRunAttempt instances in Handler.openAttempts can
be used concurrently by multiple ReportStatus calls, causing races when calling
attempt.Write; serialize access per dagRunID by introducing a per-run mutex or
lock map. Update Handler to maintain a map[string]*sync.Mutex (or sync.RWMutex)
keyed by dagRunID and acquire that mutex around any use of
openAttempts[dagRunID] (notably in ReportStatus paths and in Close), ensuring
attempt.Write/Close calls are executed while holding the per-run lock; release
and clean up the per-run mutex when deleting the attempt. Apply this change to
the code paths referenced (Handler.Close and the ReportStatus handling around
lines ~287-333 and ~335-405).
- Around line 466-494: StartZombieDetector currently overwrites
h.zombieDetectorDone and can start multiple goroutines; add a guard to prevent
starting twice by checking h.zombieDetectorDone before creating a new channel
and returning early if the detector is already running, and protect that
check+assign with a mutex to avoid races (e.g., introduce or reuse a lock field
and wrap the check/assignment around h.zombieDetectorDone), leaving
WaitZombieDetector unchanged so it will wait on the single channel.
- Around line 287-333: The handler currently returns a successful RPC with
Accepted:false for operational errors which clients may ignore; change
ReportStatus so operational failures return gRPC errors instead of a 200 OK
payload. Specifically, replace the branches that return
&coordinatorv1.ReportStatusResponse{Accepted:false, Error:...} when
getOrOpenAttempt/getOrOpenSubAttempt (the variables/funcs: getOrOpenAttempt,
getOrOpenSubAttempt) or attempt.Write fail, and return status.Error with an
appropriate code (e.g., codes.Internal or codes.Unavailable) and the error
message; keep returning a normal response only on success (Accepted:true).
Ensure you still validate req.Status and the dagRunStore precondition as before.
- Around line 496-593: The cleanup I/O in markRunFailed currently uses the
detector's cancellable ctx, so if that ctx is canceled in-flight the
ReadStatus/Open/Write calls can fail; modify markRunFailed (and calls from
markWorkerTasksFailed/detectAndCleanupZombies as needed) to create a
non-cancelable context for the store operations (e.g., use
context.WithoutCancel(ctx) on Go 1.21+ or context.Background() as a fallback)
and pass that new ctx into attempt.ReadStatus/Open/Write so zombie-marking
completes even if the caller ctx is canceled.
- Around line 539-590: markRunFailed currently calls FindAttempt directly which
can open a separate writer and race with ReportStatus; replace the
FindAttempt/Open sequence with the shared getOrOpenAttempt(ctx, dagName,
dagRunID) to reuse the per-run writer from openAttempts. Specifically, in
markRunFailed (and the similar block around lines 335-369) call getOrOpenAttempt
to obtain the attempt, remove the explicit attempt.Open call and rely on the
returned attempt being the shared, opened instance, keep the existing defer
attempt.Close(ctx) semantics as appropriate, and ensure all error logging
remains unchanged.
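
For the per-run serialization of `openAttempts` requested above, a minimal sketch of a lock map keyed by run ID follows. `keyedMutex`, `lockFor`, and `forget` are hypothetical helpers introduced for illustration, not part of the PR, and the counter stands in for `attempt.Write`.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedMutex hands out one mutex per key (hypothetical helper).
type keyedMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedMutex() *keyedMutex {
	return &keyedMutex{locks: make(map[string]*sync.Mutex)}
}

// lockFor returns the mutex for key, creating it on first use.
func (k *keyedMutex) lockFor(key string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	return m
}

// forget drops the mutex for key. It should only be called once no
// writer can still be using the key, e.g. after the attempt is closed.
func (k *keyedMutex) forget(key string) {
	k.mu.Lock()
	defer k.mu.Unlock()
	delete(k.locks, key)
}

func main() {
	km := newKeyedMutex()
	var wg sync.WaitGroup
	writes := 0
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l := km.lockFor("run-1")
			l.Lock()
			defer l.Unlock()
			writes++ // stands in for attempt.Write on the shared attempt
		}()
	}
	wg.Wait()
	fmt.Println(writes)
}
```

Writers for different run IDs do not block each other; only the short map lookup is shared.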
🧹 Nitpick comments (5)
internal/runtime/executor/dag_runner.go (1)

418-448: Consider combining completion check and status retrieval into a single RPC.

In the polling loop, isSubDAGRunCompleted (line 420) and getSubDAGRunStatus (line 434) both make GetDAGRunStatus RPC calls to the coordinator. This results in two RPCs per poll iteration when completion is detected.

You could optimize by having isSubDAGRunCompleted return the status when completed, or directly use getSubDAGRunStatus and check if the status is terminal.

♻️ Potential optimization
 		case <-ticker.C:
-			// Check if the sub DAG run has completed
-			isCompleted, err := e.isSubDAGRunCompleted(ctx, dagRunID)
+			// Get sub DAG run status
+			result, err := e.getSubDAGRunStatus(ctx, dagRunID)
 			if err != nil {
-				logger.Warn(waitCtx, "Failed to check sub DAG run completion",
+				logger.Debug(waitCtx, "Sub DAG run status not available yet",
 					tag.Error(err),
 				)
-				continue // Retry on error
+				continue
 			}
 
-			if !isCompleted {
+			if result.Status.IsActive() {
 				logger.Debug(waitCtx, "Sub DAG run not completed yet")
-				continue // Not completed, keep polling
+				continue
 			}
 
-			// Check the final status of the sub DAG run
-			result, err := e.getSubDAGRunStatus(ctx, dagRunID)
-			if err != nil {
-				// Not found yet, continue polling
-				logger.Debug(waitCtx, "Sub DAG run status not available yet",
-					tag.Error(err),
-				)
-				continue
-			}
-
-			// If we got a result, the sub DAG has completed
 			logger.Info(waitCtx, "Distributed execution completed",
 				tag.Name(result.Name),
 			)
 
 			return result, nil
internal/cmd/coordinator.go (1)

118-195: Consider guarding WithDAGRunStore when store can be nil.

coordinator.NewHandler(coordinator.WithDAGRunStore(dagRunStore), ...) is fine only if the handler treats nil as “persistence disabled” everywhere; otherwise it risks nil dereferences when shared-nothing features are exercised.

Example guard
-    handler := coordinator.NewHandler(
-        coordinator.WithDAGRunStore(dagRunStore),
-        coordinator.WithLogDir(cfg.Paths.LogDir),
-    )
+    handlerOpts := []coordinator.HandlerOption{
+        coordinator.WithLogDir(cfg.Paths.LogDir),
+    }
+    if dagRunStore != nil {
+        handlerOpts = append(handlerOpts, coordinator.WithDAGRunStore(dagRunStore))
+    }
+    handler := coordinator.NewHandler(handlerOpts...)
internal/service/coordinator/client.go (1)

198-250: Resiliency: consider evicting cached clients on failed health checks.

attemptCall removes the client on connection creation failure, but not on repeated isHealthy failures; you may want to removeClient(member.ID) (or on codes.Unavailable) to allow a clean reconnect.

Also applies to: 252-274
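
A rough sketch of the eviction idea, using only the standard library: `conn`, `clientCache`, and `get` are hypothetical stand-ins for the real gRPC client cache, and the `healthy` field replaces whatever `isHealthy` actually checks.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in for a cached coordinator client (assumption).
type conn struct{ healthy bool }

// clientCache caches one connection per coordinator ID.
type clientCache struct {
	mu      sync.Mutex
	clients map[string]*conn
	dial    func(id string) (*conn, error)
}

// get returns a healthy cached connection, evicting and redialing
// when the cached one fails its health check.
func (c *clientCache) get(id string) (*conn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.clients[id]; ok {
		if cl.healthy {
			return cl, nil
		}
		delete(c.clients, id) // evict so the next step reconnects cleanly
	}
	cl, err := c.dial(id)
	if err != nil {
		return nil, err
	}
	c.clients[id] = cl
	return cl, nil
}

func main() {
	dials := 0
	c := &clientCache{
		clients: map[string]*conn{},
		dial: func(string) (*conn, error) {
			dials++
			return &conn{healthy: true}, nil
		},
	}
	first, _ := c.get("coord-1")
	first.healthy = false // simulate a failed health check
	second, _ := c.get("coord-1")
	fmt.Println(dials, first != second)
}
```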

internal/service/worker/worker.go (1)

156-188: Cancellation handling is correct; consider not holding pollersMu while calling cancel funcs.

Calling cancelFunc() under lock is usually fine, but you can reduce contention by collecting funcs under lock and invoking them after unlocking (especially if you later add more work in cancellation).

Also applies to: 286-299

internal/runtime/remote/log_streamer.go (1)

129-145: Consider sequence increment semantics on send failure.

Right now sequence increments before Send; if Send fails, you’ll have gaps. If the coordinator expects strict contiguous sequences, increment only after successful Send.
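
To make the two log-streamer points concrete (bounded chunks, and a sequence that advances only after a successful send), here is a simplified sketch. The `chunk`, `sender`, `writer`, and `fakeStream` types are hypothetical stand-ins for `LogChunk` and the gRPC stream; the metadata fields are omitted.

```go
package main

import (
	"errors"
	"fmt"
)

// chunk is a simplified stand-in for the LogChunk message.
type chunk struct {
	Sequence uint64
	Data     []byte
}

// sender abstracts the stream's Send method.
type sender interface{ Send(chunk) error }

type writer struct {
	buffer   []byte
	sequence uint64
	max      int // maximum bytes per chunk
	stream   sender
}

// flush sends the buffer in pieces of at most w.max bytes. The sequence
// advances only after a successful Send, and unsent bytes stay buffered.
func (w *writer) flush() error {
	if w.max <= 0 {
		return errors.New("max chunk size must be positive")
	}
	for len(w.buffer) > 0 {
		n := len(w.buffer)
		if n > w.max {
			n = w.max
		}
		data := make([]byte, n)
		copy(data, w.buffer[:n])
		if err := w.stream.Send(chunk{Sequence: w.sequence + 1, Data: data}); err != nil {
			return err
		}
		w.sequence++
		w.buffer = w.buffer[n:]
	}
	return nil
}

// fakeStream records chunks and fails on the failAt-th call (0 = never).
type fakeStream struct {
	sent   []chunk
	calls  int
	failAt int
}

func (f *fakeStream) Send(c chunk) error {
	f.calls++
	if f.calls == f.failAt {
		return errors.New("send failed")
	}
	f.sent = append(f.sent, c)
	return nil
}

func main() {
	fs := &fakeStream{failAt: 3}
	w := &writer{buffer: []byte("0123456789"), max: 4, stream: fs}
	err := w.flush()
	fmt.Println(err, w.sequence, len(w.buffer))
	fs.failAt = 0
	err = w.flush()
	fmt.Println(err, w.sequence, len(w.buffer))
}
```

With this ordering a retry after a failed send reuses the same sequence number, so the receiver sees no gaps.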

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a31b5b5 and 04b73fb.

📒 Files selected for processing (18)
  • internal/cmd/coordinator.go
  • internal/common/fileutil/fileutil.go
  • internal/common/fileutil/fileutil_test.go
  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/integration/distributed_subdag_test.go
  • internal/integration/distributed_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/runner.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/coordinator/service.go
  • internal/service/worker/worker.go
  • internal/test/coordinator.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/test/coordinator.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_subdag_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/service.go
  • internal/integration/distributed_parallel_test.go
  • internal/integration/distributed_test.go
  • internal/runtime/agent/agent.go
  • internal/common/fileutil/fileutil_test.go
  • internal/common/fileutil/fileutil.go
  • internal/cmd/coordinator.go
  • internal/service/worker/worker.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/remote/log_streamer.go
  • internal/service/coordinator/handler_test.go
  • internal/runtime/runner.go
  • internal/service/coordinator/handler.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_subdag_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/integration/distributed_test.go
  • internal/common/fileutil/fileutil_test.go
  • internal/service/coordinator/handler_test.go
🧠 Learnings (5)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Applied to files:

  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_subdag_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/integration/distributed_test.go
  • internal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages

Applied to files:

  • internal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks

Applied to files:

  • internal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`

Applied to files:

  • internal/runtime/executor/dag_runner.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/runtime/executor/dag_runner.go
  • internal/service/coordinator/handler.go
🧬 Code graph analysis (11)
internal/integration/distributed_coordinator_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/integration/distributed_subdag_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/integration/distributed_e2e_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/service/coordinator/client.go (2)
proto/coordinator/v1/coordinator.pb.go (18)
  • HeartbeatRequest (700-707)
  • HeartbeatRequest (720-720)
  • HeartbeatRequest (735-737)
  • HeartbeatResponse (761-768)
  • HeartbeatResponse (781-781)
  • HeartbeatResponse (796-798)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
  • ReportStatusResponse (1015-1021)
  • ReportStatusResponse (1034-1034)
  • ReportStatusResponse (1049-1051)
  • GetDAGRunStatusResponse (1899-1906)
  • GetDAGRunStatusResponse (1919-1919)
  • GetDAGRunStatusResponse (1934-1936)
  • GetDAGRunStatusRequest (1828-1838)
  • GetDAGRunStatusRequest (1851-1851)
  • GetDAGRunStatusRequest (1866-1868)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/service/coordinator/service.go (1)
internal/service/scheduler/scheduler.go (2)
  • Scheduler (39-62)
  • Scheduler (309-311)
internal/integration/distributed_parallel_test.go (3)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (2)
  • WithStatusPersistence (84-88)
  • WithDAGsDir (63-67)
internal/core/spec/loader.go (1)
  • WithDAGsDir (90-94)
internal/integration/distributed_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/common/fileutil/fileutil_test.go (2)
internal/service/worker/handler_test.go (1)
  • TestCreateTempDAGFile (171-185)
internal/common/fileutil/fileutil.go (2)
  • CreateTempDAGFile (200-250)
  • FileExists (44-47)
internal/cmd/coordinator.go (4)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/cmd/context.go (1)
  • Context (44-59)
internal/service/coordinator/service.go (2)
  • Service (21-34)
  • NewService (36-57)
internal/service/coordinator/handler.go (3)
  • Handler (39-56)
  • WithDAGRunStore (62-66)
  • WithLogDir (69-73)
internal/service/worker/worker.go (6)
internal/runtime/subcmd.go (1)
  • Run (258-280)
internal/common/config/config.go (1)
  • Worker (279-284)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/cmd/context.go (1)
  • Context (44-59)
internal/common/logger/context.go (3)
  • Info (40-42)
  • Warn (45-47)
  • Errorf (75-77)
internal/common/logger/tag/tag.go (2)
  • WorkerID (65-67)
  • RunID (40-42)
internal/runtime/executor/dag_runner.go (4)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-250)
internal/core/execution/context.go (3)
  • Context (17-29)
  • RunStatus (126-137)
  • GetContext (286-298)
internal/proto/convert/status.go (1)
  • ProtoToDAGRunStatus (114-153)
internal/core/execution/node.go (1)
  • Node (9-36)
🔇 Additional comments (37)
internal/common/fileutil/fileutil_test.go (5)

13-16: LGTM! - Parallel test execution added correctly.

Adding t.Parallel() to these existing test functions and subtests is appropriate since they use t.TempDir() for isolation and don't share mutable state.

Also applies to: 36-37


229-252: LGTM! - BasicFile test case is well-structured.

The test properly verifies file creation, content integrity, and path structure. Using t.Cleanup for deferred removal is idiomatic.


254-282: LGTM! - WithExtraDocs test validates YAML multi-document handling.

Good test coverage verifying that YAML document separators conform to the spec by appearing on their own line.


284-302: LGTM! - Edge case for missing trailing newline is covered.

This test ensures the implementation correctly handles documents without trailing newlines, which is important for YAML spec compliance.


304-319: LGTM! - Empty extraDocs case correctly tested.

Ensures that when no extra documents are provided, the output matches the primary document exactly without any separators.

internal/common/fileutil/fileutil.go (3)

196-212: LGTM! - Clean temp file creation with proper error handling.

The implementation correctly creates the temp directory structure and temp file with a meaningful naming pattern. Directory permission 0750 is appropriate for a temp directory.


214-242: LGTM! - IIFE pattern ensures proper resource cleanup.

The immediately-invoked function ensures tempFile.Close() is always called before checking the write error, which is a clean pattern for this use case. The YAML separator logic correctly handles documents with or without trailing newlines.


244-250: LGTM! - Error handling with proper cleanup.

The implementation correctly removes the temp file on write failure and returns a well-wrapped error.

internal/runtime/executor/dag_runner.go (5)

67-76: LGTM! - Local DAG collection for temp file creation.

The logic correctly collects YAML data from sibling local DAGs (excluding the target child) to include in the temporary file, enabling local DAG references within sub-DAGs.


500-535: LGTM! - Output extraction handles both local and proto formats.

The function correctly normalizes output variables from two storage formats:

  1. Local format: KEY=value (extracts value after =)
  2. Proto format: just value

The type assertions with early returns are safe and idiomatic.
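
A rough illustration of the normalization described above, under the assumption that the caller knows the variable name. The function outputValue is hypothetical and is not taken from dag_runner.go; it strips the prefix only when it matches the expected key, so a bare value that happens to contain = is left alone.

```go
package main

import (
	"fmt"
	"strings"
)

// outputValue (hypothetical helper) returns the value of an output variable
// that may be stored either as "KEY=value" (local format) or as the bare
// value (proto format).
func outputValue(key, raw string) string {
	if v, ok := strings.CutPrefix(raw, key+"="); ok {
		return v
	}
	return raw
}

func main() {
	fmt.Println(outputValue("RESULT", "RESULT=42")) // local format
	fmt.Println(outputValue("RESULT", "42"))        // proto format
}
```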


537-564: LGTM! - Completion check helper with appropriate fallback logic.

The helper correctly distinguishes between "not found" (returns false, not completed) and actual errors. The fallback from coordinator to local DB provides resilience.


458-498: [No changes needed—the implementation is correct.]

The RunStatus struct is intentionally designed with only five fields (Name, DAGRunID, Params, Outputs, Status). It does not have fields for StartedAt, FinishedAt, or Nodes. All fields being copied from dagRunStatus to the returned RunStatus are appropriate, and both callers only access Status and Name, which are already included.

Likely an incorrect or invalid review comment.


382-388: Correct use of context.WithoutCancel for cancellation polling.

Using context.WithoutCancel ensures status queries can complete even when the parent context is canceled, which is necessary to confirm the sub-DAG has actually terminated. Introduced in Go 1.21, this is the idiomatic approach for this pattern.

internal/runtime/runner.go (2)

229-230: LGTM! Panic recovery now signals progress.

The defer statement correctly passes progressCh to enable status updates after panic recovery, ensuring the coordinator can persist the failed state. This aligns with the broader status/log streaming architecture introduced in this PR.


1065-1089: LGTM! Panic recovery properly signals progress.

The function correctly:

  • Accepts optional progressCh parameter with nil-check (lines 1085-1087)
  • Updates metrics before signaling progress
  • Signals progress so the coordinator can persist the failed state

The implementation is backward-compatible and thread-safe.

internal/service/coordinator/service.go (3)

27-27: LGTM! New fields support graceful shutdown.

The cfg field enables configuration-driven behavior (e.g., zombie detection interval), and stopCancel enables proper cleanup of internal goroutines during shutdown.

Also applies to: 32-33


67-79: LGTM! Zombie detector lifecycle properly initialized.

The implementation correctly:

  • Creates an internal cancellable context for lifecycle management
  • Uses configured interval with a sensible default (45s)
  • Includes nil-check for config (line 74)
  • Logs startup for observability

137-146: LGTM! Graceful shutdown sequence is correct.

The shutdown sequence properly:

  1. Cancels the internal context to signal the zombie detector to stop
  2. Waits for the zombie detector to finish (prevents resource leaks)
  3. Closes handler resources after the detector has stopped

The nil-check for stopCancel (line 138) prevents panics if Start was never called.

internal/integration/distributed_subdag_test.go (1)

31-31: LGTM! Test setup updated for status persistence.

All test cases now correctly enable status persistence via test.WithStatusPersistence(), which is essential for testing the distributed coordinator's ability to track and persist DAG run states across the new shared-nothing worker architecture.

Also applies to: 70-70, 106-106, 140-140

internal/integration/distributed_coordinator_test.go (1)

18-18: LGTM! Test setup updated for status persistence.

Both test cases (TestCoordinatorGetWorkers and TestCoordinatorHeartbeat) now enable status persistence, which is necessary for testing worker registration and heartbeat functionality in the coordinator.

Also applies to: 107-107

internal/integration/distributed_test.go (1)

32-32: LGTM! Test setup updated for status persistence.

All test cases now enable status persistence, which is essential for verifying that DAGs with worker selectors are correctly tracked and persisted in the distributed execution model.

Also applies to: 83-83, 122-122

internal/integration/distributed_e2e_test.go (3)

37-40: Good: enable coordinator status persistence for the E2E flow.

Switching to test.SetupCoordinator(t, test.WithStatusPersistence()) matches the new distributed/persistent coordinator behavior and should make status assertions more realistic.


141-153: Good: local dagu start test still uses persistent coordinator setup.

This keeps the test environment consistent with the distributed mode while validating “start executes locally”.


194-197: Good: cancellation E2E uses persistent coordinator setup.

Makes the abort/cancel assertions less dependent on in-memory state.

internal/integration/distributed_parallel_test.go (1)

40-42: Good: distributed-parallel tests now run with status persistence enabled.

Consistently using test.WithStatusPersistence() (and test.WithDAGsDir(...) where needed) aligns these tests with the persistent coordinator path.

Also applies to: 118-120, 178-179, 217-218, 260-262, 384-386, 467-469

internal/service/coordinator/client.go (3)

29-55: Nice API expansion for shared-nothing: Heartbeat response + status/log streaming hooks.

The interface updates cleanly express cancellation directives (HeartbeatResponse) and add the needed RPC surfaces for shared-nothing workers.


105-188: Good: centralized coordinator discovery with explicit “no coordinators” error.

Using getCoordinatorMembers in Dispatch/Poll improves error clarity and avoids silent no-op behavior.

Also applies to: 385-396


499-539: Verify gRPC client creation API (grpc.NewClient) and dial options expectations.

If the repo standard is grpc.DialContext (and/or expects WithBlock, keepalive, service config, etc.), confirm grpc.NewClient(address, ...) is correct for grpc-go v1.75.0 and your connection lifecycle expectations.

Also applies to: 571-621

internal/service/worker/worker.go (3)

118-147: Good: Stop cancels internal ctx, waits for goroutines, then cleans up gRPC clients.

The stopOnce + stopDone pattern is a solid step toward graceful shutdown.


259-262: Verify Go version compatibility for min(...) / math.MaxInt32 usage.

If the module Go version is < 1.21, min won’t compile.


68-116: No action needed—the blocking behavior is the intended design and is already properly used throughout the codebase.

Tests explicitly verify that Start() blocks until shutdown (via context cancellation or Stop()), and all existing call sites correctly wrap it in goroutines. The shutdown coordination between Start() and Stop() is properly implemented and tested. There is no risk of deadlock in current code.

internal/runtime/agent/agent.go (5)

138-189: Good: clean option surface for shared-nothing status/log plumbing.

The Options additions and the corresponding Agent wiring are straightforward and keep the default (local) path intact.

Also applies to: 205-224


339-345: Good: propagate attempt ID + log-writer factory into runtime context.

Setting attempt ID (when supported) and attaching runtime.WithLogWriterFactory(...) should make remote step log streaming work without invasive changes elsewhere.

Also applies to: 361-373


876-897: Verify Go version + shared-nothing status semantics (context.WithoutCancel + push-only).

context.WithoutCancel requires Go >= 1.21, and “push-only” means local status files may no longer be updated—ensure all consumers in shared-nothing mode read from coordinator (or otherwise don’t rely on local status persistence).


904-915: Good: avoid sending SIGTERM on ctx cancellation after normal completion.

The !a.finished.Load() guard prevents unnecessary termination signaling on normal shutdown paths.


1279-1294: The code is correct—Retry: true is appropriate for both cases.

The Retry flag in Store.CreateAttempt() only determines whether to find an existing DAGRun (when true) or create a new one (when false). It is not passed to run.CreateAttempt(), so it does not affect attempt directory naming, timestamps, or uniqueness. Attempts always receive unique IDs via genAttemptID() regardless of whether Retry is true or false.

Both use cases correctly set Retry: true:

  • Retrying a failed execution: The run already exists from the previous attempt
  • Running from queue: The run was pre-created by the enqueue operation

There is no risk of overwriting or selecting the wrong attempt directory. The suggested refactoring to a dedicated UseExistingRun option would be a semantic improvement for clarity, but the current implementation is functionally correct.

internal/cmd/coordinator.go (1)

81-106: Remove redundant cleanup from defer; Service.Stop() already handles zombie detector and handler shutdown.

The svc.Stop(ctx) call already cancels the internal context (signaling zombie detector to exit), waits for it to complete via WaitZombieDetector(), and closes handler resources. The defer block duplicates this cleanup unnecessarily. Since the zombie detector goroutine only watches the internal context cancellation (not triggered by Close()), there is no shutdown hang risk—the current order is safe but wasteful.

Likely an incorrect or invalid review comment.

Comment thread internal/runtime/remote/log_streamer.go
Comment thread internal/service/coordinator/handler_test.go
Comment thread internal/service/coordinator/handler_test.go
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/handler.go
@yohamta0 yohamta0 force-pushed the experimental-worker-sync branch from 515a6cc to 053e670 Compare January 11, 2026 03:56
@yohamta0

Collaborator Author

@coderabbitai review

@coderabbitai

coderabbitai Bot commented Jan 11, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 16

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (6)
internal/service/scheduler/queue_processor.go (1)

453-497: monitorStartup treats core.Running as “not started” (should count as started).

Right now, if a heartbeat isn’t observed but the status is Running, monitorStartup still returns errNotStarted (lines 489-497). That makes startup confirmation depend on a heartbeat (or a terminal status), which can unnecessarily delay dequeueing and amplify the duplicate-dispatch risk when heartbeats are delayed or flaky.

Proposed fix
-	if status.Status != core.Queued && status.Status != core.Running {
-		logger.Info(ctx, "DAG execution started or finished",
-			tag.Status(status.Status.String()),
-		)
-		return true, nil
-	}
-
-	return false, errNotStarted
+	switch status.Status {
+	case core.Queued:
+		return false, errNotStarted
+	case core.Running:
+		logger.Info(ctx, "DAG run has started (status is running)",
+			tag.Status(status.Status.String()),
+		)
+		return true, nil
+	default:
+		logger.Info(ctx, "DAG execution started or finished",
+			tag.Status(status.Status.String()),
+		)
+		return true, nil
+	}
internal/service/coordinator/service.go (1)

59-79: Stop() assumes zombie detector was started—ensure WaitZombieDetector() can’t block forever.
If Stop() can be called on a partially-initialized service, consider guarding with a “started” flag or making WaitZombieDetector() a no-op when not started.

Also applies to: 118-147

internal/runtime/runner.go (2)

223-235: Critical: defer order sends doneCh before panic recovery runs.
Because defers run LIFO, doneCh <- n happens before recoverNodePanic, so the runner loop can process a “completed” node that still looks NodeRunning and has no panic error recorded yet.

Proposed fix (defer doneCh first so it executes last)
 			go func(n *Node) {
@@
-				// Ensure node is finished and wg is decremented
-				defer r.finishNode(n, &wg)
-				// Recover from panics and signal progress for status updates
-				defer r.recoverNodePanic(ctx, n, progressCh)
-				// Signal completion to runner loop
-				defer func() {
-					doneCh <- n
-				}()
+				// Signal completion to runner loop (execute last)
+				defer func() { doneCh <- n }()
+				// Recover from panics and signal progress for status updates
+				defer r.recoverNodePanic(ctx, n, progressCh)
+				// Ensure node is finished and wg is decremented
+				defer r.finishNode(n, &wg)
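
The ordering argument can be checked in isolation. The sketch below is a standalone illustration, not the runner code: it registers three deferred steps in the order proposed above and records the order in which they actually run when the body panics. The step labels are placeholders for doneCh <- n, recoverNodePanic, and finishNode.

```go
package main

import "fmt"

// run panics and returns the order in which its deferred steps executed.
func run() (order []string) {
	// Registered first, so it runs last: completion is signalled only
	// after the panic has been recovered.
	defer func() { order = append(order, "signal done") }()
	// Registered second, so it runs second.
	defer func() {
		if r := recover(); r != nil {
			order = append(order, "recover")
		}
	}()
	// Registered last, so it runs first.
	defer func() { order = append(order, "finish") }()
	panic("boom")
}

func main() {
	fmt.Println(run()) // prints [finish recover signal done]
}
```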

1071-1095: Panic recovery should set a terminal node status (likely NodeFailed).
Right now recoverNodePanic records the error, but if node.MarkError doesn’t also set status, the node can remain NodeRunning and confuse final status computation.

Minimal fix
 func (r *Runner) recoverNodePanic(ctx context.Context, node *Node, progressCh chan *Node) {
 	if panicObj := recover(); panicObj != nil {
@@
 		node.MarkError(err)
+		node.SetStatus(core.NodeFailed)
 		r.setLastError(err)
internal/service/worker/worker.go (1)

74-116: Behavior change: Start() now blocks until shutdown—ensure callers expect this.
If previous behavior was “start in background and return”, this will deadlock call sites that don’t call Stop() or cancel ctx. Consider renaming to Run() or updating call sites/docs accordingly.

internal/cmd/coordinator.go (1)

81-105: Use a non-canceled context for graceful shutdown operations (best practice).

After <-ctx.Done() (line 98), svc.Stop(ctx) (line 101) and the deferred handler.Close(ctx) (line 90) receive a canceled context. While current implementations of Stop and Close don't explicitly check context cancellation and will still complete, this is semantically incorrect for shutdown operations. The test code (internal/test/coordinator.go) demonstrates the expected pattern: creating a separate timeout context with context.WithTimeout(context.Background(), 5*time.Second) for the Stop call.

Proposed fix (use non-canceled context for cleanup)
 func runCoordinator(ctx *Context, _ []string) error {
 	svc, handler, err := newCoordinator(ctx, ctx.Config, ctx.ServiceRegistry, ctx.DAGRunStore)
 	if err != nil {
 		return fmt.Errorf("failed to initialize coordinator: %w", err)
 	}

 	// Ensure handler resources are cleaned up on shutdown
 	defer func() {
 		handler.WaitZombieDetector()
-		handler.Close(ctx)
+		handler.Close(context.WithoutCancel(ctx))
 	}()

 	if err := svc.Start(ctx); err != nil {
 		return fmt.Errorf("failed to start coordinator: %w", err)
 	}

 	// Wait for context cancellation
 	<-ctx.Done()
 	logger.Info(ctx, "Coordinator shutting down")

-	if err := svc.Stop(ctx); err != nil {
+	if err := svc.Stop(context.WithoutCancel(ctx)); err != nil {
 		return fmt.Errorf("failed to stop coordinator: %w", err)
 	}

 	return nil
 }
🤖 Fix all issues with AI agents
In @internal/cmd/worker.go:
- Around line 77-104: The runWorker function uses the original workerID variable
for RemoteTaskHandlerConfig.WorkerID and tag.WorkerID even when empty; after
constructing w := worker.NewWorker(...), detect if workerID == "" and replace it
with the actual ID generated by the Worker (read from w via its accessor, e.g.
w.ID() or w.GetID()) before creating handlerCfg and logging so
RemoteTaskHandlerConfig.WorkerID and tag.WorkerID use the normalized, non-empty
worker ID.

In @internal/runtime/output.go:
- Around line 309-325: Wrap remote writers returned by factory.NewStepWriter
with masking.NewMaskingWriter when oc.masker is non-nil in setupRemoteWriters so
remote streams get the same secret-masking as local files; specifically, after
creating oc.stdoutWriter and (if not merged) oc.stderrWriter, check oc.masker
and replace each writer with masking.NewMaskingWriter(writer, oc.masker) before
assigning to oc.stdoutWriter/oc.stderrWriter (preserve the merged-stream logic
where stderrWriter = stdoutWriter when data.State.Stdout == data.State.Stderr)
and keep stdoutFileName/stderrFileName assignments and the existing return
behavior.

In @internal/service/coordinator/handler.go:
- Around line 457-481: StartZombieDetector currently calls
time.NewTicker(interval) which will panic if interval <= 0; add a guard at the
start of Handler.StartZombieDetector to validate the interval (e.g., if interval
<= 0 then return an error or set a sensible default) before creating the ticker
and before marking zombieDetectorStarted true. Ensure the check references
StartZombieDetector and time.NewTicker so you don't create the ticker or launch
the goroutine when interval is non-positive.
- Around line 292-391: getOrOpenSubAttempt currently uses subDAGRunID alone as
the cacheKey which can collide across different roots or with top-level IDs;
change getOrOpenSubAttempt to build a unique cache key (e.g. include the
rootRef.Name and rootRef.ID plus the subDAGRunID and a "sub:" prefix) and pass
that composed key into getOrOpenAttemptWithFinder, ensuring the finder still
calls dagRunStore.FindSubAttempt(rootRef, subDAGRunID); this prevents cross-root
collisions while preserving existing finder behavior.
- Around line 89-102: Handler.Close currently holds attemptsMu while calling
attempt.Close(ctx); change it to grab attemptsMu, copy or collect the entries
from openAttempts (e.g., into a slice or map of dagRunID->attempt), then release
attemptsMu, create a non-cancelable context via context.WithoutCancel(ctx) and
call attempt.Close(newCtx) for each collected attempt outside the lock, then
reacquire attemptsMu to remove/clear those entries from openAttempts (or delete
per dagRunID) while logging failures using the same logger and tags; this
ensures attempt.Close does I/O without blocking attemptsMu.

In @internal/service/coordinator/log_handler.go:
- Around line 25-28: The writers map currently keys by streamKey() which is
built from raw chunk fields and can differ from the sanitized filesystem path
returned by logFilePath(), causing multiple logWriter/os.File instances to be
opened for the same actual file; change the map key to the resolved sanitized
path (call logFilePath(...) to compute the filename once) and use that sanitized
path as the unique key when looking up/creating entries in writers (and when
closing/removing entries), update all places that reference streamKey() for
writer lookup/creation to instead compute and use the sanitized path, and ensure
logWriter creation is atomic under writersMu to avoid races.
- Around line 95-142: handleStream currently leaves per-stream writers open when
the RPC ends with io.EOF or any non-EOF error; update it to track which writers
were touched during the loop (e.g., a map[string]struct{} or slice of keys) and
ensure all those writers are closed when the handler returns (both on io.EOF
after SendAndClose and on any other error path). Add a helper
closeWriterByKey(key string) that acquires the writers map lock, removes the
writer from the map, releases the lock, and then closes the writer outside the
lock; replace direct calls to closeWriter(ctx, chunk) with using that
removal/close helper where appropriate and call it for every touched writer
before returning from handleStream. Ensure getOrCreateWriter, closeWriterByKey
and writer.write function names are used so you update the correct places.

In @internal/service/worker/poller_test.go:
- Around line 450-472: The mockCoordinatorCli.StreamLogs method currently
returns (nil, nil) which can cause nil-pointer panics when tests exercise
Send/CloseAndRecv; update mockCoordinatorCli.StreamLogs to return a non-nil
error (e.g., gRPC codes.Unimplemented wrapped via status.Error) or return a
minimal fake implementing coordinatorv1.CoordinatorService_StreamLogsClient so
callers get a valid client; locate the StreamLogs method on mockCoordinatorCli
and replace the nil return with either a status.Error(codes.Unimplemented,
"...") or an instance of a simple mock stream client that implements
Send/CloseAndRecv.

In @internal/service/worker/remote_handler.go:
- Around line 27-60: NewRemoteTaskHandler currently constructs a
remoteTaskHandler without validating required dependencies, but loadDAG (and
code paths in Handle) assume h.config and h.config.Paths (and other fields) are
non-nil which can cause a nil deref at runtime; update NewRemoteTaskHandler to
validate required fields from RemoteTaskHandlerConfig (at minimum cfg.Config and
cfg.Config.Paths, plus any other non-optional deps like CoordinatorClient,
DAGStore, DAGRunMgr) and return an error if missing, or alternatively add
upfront nil-guards in Handle and loadDAG that return clear errors instead of
panicking; ensure you reference RemoteTaskHandlerConfig, NewRemoteTaskHandler,
remoteTaskHandler, loadDAG, Handle, h.config and h.config.Paths when making the
checks so callers receive a meaningful error rather than a runtime panic.
- Around line 187-205: The temp DAG creation can fail if task.Target contains
path separators because fileutil.CreateTempDAGFile forwards the name to
os.CreateTemp; sanitize the name by using filepath.Base (or another
DAG-name-only value) before passing to fileutil.CreateTempDAGFile (i.e. call
fileutil.CreateTempDAGFile("worker-dags", filepath.Base(task.Target),
[]byte(task.Definition))); keep the rest of the logic (assigning tempFile to
target and cleanupFunc) unchanged and ensure any references to task.Target later
still use the original when no temp file was created.
- Around line 116-141: The fallback lookup in handleRetry uses
h.dagRunStore.FindAttempt with execution.NewDAGRunRef(task.RootDagRunName,
task.DagRunId) which fails for sub-DAG retries; update handleRetry to check if
task.DagRunId != task.RootDagRunId and in that case call
h.dagRunStore.FindSubAttempt(ctx, root, task.DagRunId) (using the existing root
:= execution.DAGRunRef{Name: task.RootDagRunName, ID: task.RootDagRunId})
otherwise fall back to FindAttempt/execution.NewDAGRunRef as before, and
preserve the existing error wrapping/messages for failed lookups/ReadStatus.

In @internal/service/worker/worker_test.go:
- Around line 380-400: TestWorkerDefaultID currently doesn't verify generated ID
because Stop() without Start() doesn't exercise ID usage; update the test to
assert the worker id directly by either (A) moving the test into package worker
and asserting the unexported field (create the worker via worker.NewWorker and
assert w.id != ""), or (B) add a small exported accessor Worker.ID() on the
worker type and in TestWorkerDefaultID call w.ID() and require it is not empty;
alternatively ensure you call w.Start(...) before Stop() so the ID is exercised
at runtime. Use the symbols worker.NewWorker, TestWorkerDefaultID, w.id or
Worker.ID(), and Start/Stop as appropriate.

In @internal/service/worker/worker.go:
- Around line 31-39: Start/Stop ordering race: Stop can run before Start
initializes w.stopCancel/w.stopDone; initialize the shutdown primitives in
NewWorker so Stop is safe anytime. In NewWorker create the internal context and
cancel (context.WithCancel) and allocate stopDone channel on the worker struct
(and keep using stopOnce in Stop) so Start only launches goroutines and Stop can
always call cancel/close without nil checks; reference the worker fields
stopCancel, stopDone, stopOnce and the constructors NewWorker/Start/Stop to
locate and update initialization.
- Around line 118-147: The cleanup call at the end of Worker.Stop should not use
the possibly-canceled caller ctx; before calling w.coordinatorCli.Cleanup(ctx)
create a fresh timeout context (e.g., cleanupCtx, cancel :=
context.WithTimeout(context.Background(), cleanupTimeout) with defer cancel())
and pass cleanupCtx to w.coordinatorCli.Cleanup so cleanup runs even if the
original ctx was canceled; keep the same error wrapping into err on failure.
Ensure you reference Worker.Stop, w.stopCancel, w.stopDone and
w.coordinatorCli.Cleanup when making the change.

In @proto/coordinator/v1/coordinator.proto:
- Around line 19-31: DAGRunStatusProto has mixed timestamp types (created_at is
int64 while queued_at/started_at/finished_at are string); pick a single
canonical representation (prefer google.protobuf.Timestamp or consistently int64
unix ms) and update the proto fields (DAGRunStatusProto.created_at, queued_at,
started_at, finished_at) to that type, then update the conversion helper
DAGRunStatusToProto (and any inverse conversions) to emit the chosen format
consistently; ensure import of google/protobuf/timestamp.proto if you choose
Timestamp and adjust any RPC messages that reference these fields accordingly.
🧹 Nitpick comments (19)
internal/runtime/runner_helper_test.go (1)

336-336: Consider using require.NotNil for consistency.

The nodeByName method still uses t.Fatalf for the same nil check pattern that was updated to require.NotNil in assertNodeStatus (line 312). For consistency with the updated approach and coding guidelines, consider applying the same pattern here.

♻️ Suggested refactor for consistency
-	t.Fatalf("step %s not found", stepName)
-	return nil
+	require.NotNil(t, nil, "step %s not found", stepName)
+	return nil

Note: While the return nil is technically unreachable after a fatal assertion, it may be kept for compiler satisfaction or removed depending on project style.

internal/service/scheduler/queue_processor.go (1)

432-440: Retry error log message is misleading (it’s monitoring retries, not execution retries).

This line logs "Failed to execute DAG after retries", but the retry loop is around monitorStartup, not ExecuteDAG. Consider renaming to avoid confusion during incident/debug.

Also, the closure reuses the outer err variable; not wrong, but easy to misread later.

Proposed tweak
-	operation := func(ctx context.Context) error {
-		started, err = p.monitorStartup(ctx, queueName, runRef)
-		return err
-	}
+	operation := func(ctx context.Context) error {
+		var monitorErr error
+		started, monitorErr = p.monitorStartup(ctx, queueName, runRef)
+		return monitorErr
+	}

 	if err := backoff.Retry(ctx, operation, policy, nil); err != nil {
-		logger.Error(ctx, "Failed to execute DAG after retries", tag.Error(err))
+		logger.Error(ctx, "Failed to confirm DAG startup after retries", tag.Error(err))
 	}
internal/cmd/startall.go (1)

198-200: Consider handling the error from coordHandler.Close.

coordHandler.Close likely returns an error (similar to other Close methods in the codebase), but it's currently being discarded. For consistency with other service cleanup patterns in this function (lines 193-196, 203-205), consider logging any error.

Proposed fix
 	// Clean up coordinator handler resources
 	coordHandler.WaitZombieDetector()
-	coordHandler.Close(ctx)
+	if err := coordHandler.Close(ctx); err != nil {
+		logger.Error(ctx, "Failed to close coordinator handler", tag.Error(err))
+	}
internal/cmd/worker_test.go (1)

54-189: Comprehensive test coverage for BuildCoordinatorClientConfig.

The test suite covers key scenarios: empty/nil coordinators, insecure mode, TLS validation failures, valid TLS config, SkipTLSVerify, and multiple coordinators. Each test uses t.Parallel() appropriately.

Consider adding an assertion in StaticCoordinatorsReturnsConfig or MultipleCoordinators to verify that the coordinator addresses are correctly passed through to the result config, if the CoordinatorClientConfig struct contains an address field.

internal/service/coordinator/log_handler.go (1)

195-205: Don’t hold writersMu while doing file I/O in w.close().
Flush/Sync/Close can block and will stall all streams because writersMu is held. Prefer: remove from map under lock, then close outside lock.

Also applies to: 249-259

internal/service/worker/worker_test.go (2)

573-650: Set PollFunc before Start() to avoid race/flakes.
Right now the worker can begin polling before SetPollFunc is installed (Sleep-based ordering).


712-799: Cancellation tests are timing-coupled to heartbeat frequency.
If heartbeat interval changes, these can become flaky. Prefer require.Eventually on “cancellation observed” conditions and avoid fixed time.Sleep(100ms) where possible.

internal/cmd/worker.go (1)

127-147: BuildCoordinatorClientConfig should defensively handle cfg == nil.
It’s a “pure function” intended for unit tests; guarding nil removes footguns.

internal/integration/distributed_parallel_test.go (1)

329-340: Cancellation assertion may hide real failures by using continue on findErr.
If FindSubAttempt errors, continuing can turn infra issues into “no canceled subruns found”. Consider require.NoError (or at least t.Logf) on unexpected findErr.

internal/service/coordinator/log_handler_test.go (1)

427-470: ConcurrentAccess should assert all goroutines succeeded (and use WaitGroup).
Right now it reads 10 values from done but doesn’t fail the test if any goroutine wrote false.

internal/runtime/executor/dag_runner.go (1)

67-73: Non-deterministic ordering of extra YAML documents.

The iteration order over rCtx.DAG.LocalDAGs (a map) is non-deterministic. If the order of documents in the combined YAML file matters for parsing or if local DAGs reference each other, this could lead to flaky behavior.

Consider sorting the keys before iteration if document order is significant:

🔧 Suggested fix for deterministic ordering
+import "slices"
+
 // Collect extra docs from other local DAGs
-var extraDocs [][]byte
-for _, otherDAG := range rCtx.DAG.LocalDAGs {
-	if otherDAG.Name != childName {
-		extraDocs = append(extraDocs, otherDAG.YamlData)
-	}
-}
+var extraDocs [][]byte
+dagNames := make([]string, 0, len(rCtx.DAG.LocalDAGs))
+for name := range rCtx.DAG.LocalDAGs {
+	if name != childName {
+		dagNames = append(dagNames, name)
+	}
+}
+slices.Sort(dagNames)
+for _, name := range dagNames {
+	extraDocs = append(extraDocs, rCtx.DAG.LocalDAGs[name].YamlData)
+}
internal/service/coordinator/client.go (1)

497-539: Consider adding the failed client to removal on connection error.

The StreamLogs method has its own failover logic (appropriate since streaming can't use attemptCall), but unlike GetWorkers (line 434-436), it doesn't remove failed clients from the cache when there's an Unavailable error. This could leave stale connections in the cache.

🔧 Suggested fix
 	for _, member := range members {
 		client, err := cli.getOrCreateClient(member)
 		if err != nil {
 			cli.recordFailure(err)
+			cli.removeClient(member.ID)
 			lastErr = err
 			continue
 		}
internal/runtime/remote/log_streamer.go (1)

152-202: Consider documenting that Close creates a new stream if none existed.

On lines 171-184, if w.stream is nil (no data was ever written), the final marker is not sent. This is likely intentional (nothing to finalize), but worth documenting in the method comment.

internal/service/worker/worker.go (1)

160-188: Avoid calling cancel funcs (and logging) while holding pollersMu.
context.CancelFunc is non-blocking today, but keeping this pattern lock-free prevents future deadlocks if cancellation plumbing changes (or if other work is added).

Proposed refactor
 func (w *Worker) processCancellations(ctx context.Context, cancelledRunIDs []string) {
-	w.pollersMu.Lock()
-	defer w.pollersMu.Unlock()
-
-	for _, dagRunID := range cancelledRunIDs {
-		if cancelFunc, exists := w.cancelFuncs[dagRunID]; exists {
-			logger.Info(ctx, "Cancelling task per coordinator directive",
-				tag.RunID(dagRunID),
-				tag.WorkerID(w.id))
-			cancelFunc()
-		}
-	}
+	var toCancel []struct {
+		id string
+		fn context.CancelFunc
+	}
+
+	w.pollersMu.Lock()
+	for _, dagRunID := range cancelledRunIDs {
+		if cancelFunc, exists := w.cancelFuncs[dagRunID]; exists {
+			toCancel = append(toCancel, struct {
+				id string
+				fn context.CancelFunc
+			}{id: dagRunID, fn: cancelFunc})
+		}
+	}
+	w.pollersMu.Unlock()
+
+	for _, c := range toCancel {
+		logger.Info(ctx, "Cancelling task per coordinator directive",
+			tag.RunID(c.id),
+			tag.WorkerID(w.id))
+		c.fn()
+	}
 }

Also applies to: 286-299

internal/service/coordinator/static_registry.go (3)

22-49: Consider “skip invalid addresses” vs “fail fast” semantics explicitly.
Right now any single invalid entry hard-fails NewStaticRegistry even if other addresses are valid; if this is intended, it’s fine—otherwise consider collecting errors and returning the valid subset with a combined error/warn log.
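
One way to picture the "skip invalid addresses" alternative is a helper that keeps the valid entries and reports every rejected one through a single joined error. This is only a sketch under assumptions: `validate` and `parseLenient` are hypothetical names, and the validation shown (a plain `net.SplitHostPort` check) is simpler than whatever the registry really does.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// validate is a hypothetical stand-in for the registry's address check.
func validate(addr string) error {
	if _, _, err := net.SplitHostPort(addr); err != nil {
		return fmt.Errorf("invalid address %q: %w", addr, err)
	}
	return nil
}

// parseLenient returns the valid subset of addrs together with a combined
// error describing every rejected entry. The error is nil when all entries
// are valid, because errors.Join returns nil for an empty list.
func parseLenient(addrs []string) ([]string, error) {
	var valid []string
	var errs []error
	for _, addr := range addrs {
		if err := validate(addr); err != nil {
			errs = append(errs, err)
			continue
		}
		valid = append(valid, addr)
	}
	return valid, errors.Join(errs...)
}

func main() {
	valid, err := parseLenient([]string{"coord-1:50055", "bad-entry", "coord-2:50055"})
	fmt.Println("usable:", valid)
	if err != nil {
		fmt.Println("skipped:", err)
	}
}
```

A caller that prefers fail-fast semantics can simply treat a non-nil error as fatal, so the same helper supports both behaviours.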


52-78: parseAddress won’t support IPv6 (or any input containing multiple :).
If IPv6 coordinators are in scope, prefer net.SplitHostPort and require bracketed IPv6 ([::1]:50055), plus a “no port provided” path.
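
A minimal sketch of that suggestion is shown below. It uses `net.SplitHostPort` from the standard library, which accepts bracketed IPv6 literals and rejects unbracketed ones. The function name `parseHostPort` and the fallback `defaultPort` are assumptions for illustration; the port value is borrowed from the `[::1]:50055` example in the comment, not from the project's configuration.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
)

// defaultPort is a hypothetical fallback used when no port is provided.
const defaultPort = 50055

// parseHostPort splits addr into host and port. IPv6 hosts must be
// bracketed, for example "[::1]:50055".
func parseHostPort(addr string) (string, int, error) {
	addr = strings.TrimSpace(addr)
	if addr == "" {
		return "", 0, errors.New("empty address")
	}
	// "No port provided" paths: a bare hostname/IPv4, or a bracketed IPv6.
	if !strings.Contains(addr, ":") {
		return addr, defaultPort, nil
	}
	if strings.HasPrefix(addr, "[") && strings.HasSuffix(addr, "]") {
		host := addr[1 : len(addr)-1]
		if host == "" {
			return "", 0, fmt.Errorf("invalid address %q: empty host", addr)
		}
		return host, defaultPort, nil
	}
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("invalid address %q: %w", addr, err)
	}
	if host == "" {
		return "", 0, fmt.Errorf("invalid address %q: empty host", addr)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil || port < 1 || port > 65535 {
		return "", 0, fmt.Errorf("invalid port in address %q", addr)
	}
	return host, port, nil
}

func main() {
	for _, addr := range []string{"[::1]:50055", "coordinator-1", "10.0.0.5:6000", "::1"} {
		host, port, err := parseHostPort(addr)
		fmt.Println(addr, host, port, err)
	}
}
```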


93-99: Return an empty slice instead of nil, nil for unsupported services.
This reduces nil-handling for callers while keeping the same meaning.

Proposed tweak
 func (r *StaticRegistry) GetServiceMembers(_ context.Context, name execution.ServiceName) ([]execution.HostInfo, error) {
 	if name == execution.ServiceNameCoordinator {
 		return r.coordinators, nil
 	}
 	// Return empty list for other services
-	return nil, nil
+	return []execution.HostInfo{}, nil
 }
internal/service/worker/poller_test.go (1)

23-370: Parallel + time.Sleep makes these tests prone to flakes; prefer require.Eventually or channel sync.
This is especially relevant now that the suite is explicitly parallelized. (As per coding guidelines on Go tests, cover failure paths without timing brittleness.)

internal/service/coordinator/handler_test.go (1)

20-185: Prefer shared fixtures over duplicating DAGRunStore mocks.

The inline mockDAGRunStore/mockDAGRunAttempt are sizable and will likely be reused across packages as shared-nothing coverage grows; consider moving them under internal/test (or using an existing fixture) to avoid divergence. Based on learnings, tests in **/*_test.go should favor shared fixtures from internal/test over duplicating mocks.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04b73fb and 053e670.

⛔ Files ignored due to path filters (2)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (57)
  • codecov.yml
  • internal/cmd/context.go
  • internal/cmd/coordinator.go
  • internal/cmd/flags.go
  • internal/cmd/startall.go
  • internal/cmd/worker.go
  • internal/cmd/worker_test.go
  • internal/common/config/config.go
  • internal/common/config/definition.go
  • internal/common/config/loader.go
  • internal/common/config/loader_test.go
  • internal/common/fileutil/fileutil.go
  • internal/common/fileutil/fileutil_test.go
  • internal/core/execution/context.go
  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/integration/distributed_helpers_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/integration/distributed_subdag_test.go
  • internal/integration/distributed_test.go
  • internal/integration/remote_test.go
  • internal/proto/convert/status.go
  • internal/proto/convert/status_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/context.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/runtime/executor/task.go
  • internal/runtime/executor/task_test.go
  • internal/runtime/manager.go
  • internal/runtime/output.go
  • internal/runtime/output_test.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/remote/status_pusher.go
  • internal/runtime/runner.go
  • internal/runtime/runner_helper_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/client_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/coordinator/log_handler.go
  • internal/service/coordinator/log_handler_test.go
  • internal/service/coordinator/service.go
  • internal/service/coordinator/static_registry.go
  • internal/service/coordinator/static_registry_test.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/worker/handler.go
  • internal/service/worker/handler_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/worker.go
  • internal/service/worker/worker_test.go
  • internal/test/coordinator.go
  • internal/test/helper.go
  • proto/coordinator/v1/coordinator.proto
✅ Files skipped from review due to trivial changes (1)
  • internal/test/coordinator.go
🚧 Files skipped from review as they are similar to previous changes (9)
  • internal/service/worker/handler_test.go
  • internal/common/config/loader.go
  • internal/common/config/definition.go
  • internal/integration/distributed_subdag_test.go
  • internal/cmd/flags.go
  • internal/integration/distributed_test.go
  • internal/common/config/config.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/service/worker/handler.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/cmd/worker.go
  • internal/common/fileutil/fileutil_test.go
  • internal/service/coordinator/log_handler_test.go
  • internal/cmd/context.go
  • internal/runtime/output.go
  • internal/runtime/manager.go
  • internal/service/coordinator/static_registry.go
  • internal/service/worker/worker.go
  • internal/runtime/runner.go
  • internal/service/coordinator/service.go
  • internal/core/execution/context.go
  • internal/service/coordinator/static_registry_test.go
  • internal/runtime/output_test.go
  • internal/test/helper.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/common/fileutil/fileutil.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/remote/status_pusher.go
  • internal/runtime/context.go
  • internal/service/coordinator/log_handler.go
  • internal/service/scheduler/dag_executor.go
  • internal/integration/distributed_helpers_test.go
  • internal/common/config/loader_test.go
  • internal/cmd/startall.go
  • internal/service/worker/worker_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/proto/convert/status.go
  • internal/runtime/agent/agent.go
  • internal/service/coordinator/client.go
  • internal/cmd/worker_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/proto/convert/status_test.go
  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/cmd/coordinator.go
  • internal/runtime/runner_helper_test.go
  • internal/runtime/executor/task_test.go
  • internal/service/coordinator/client_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/integration/remote_test.go
  • internal/runtime/executor/task.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/common/fileutil/fileutil_test.go
  • internal/service/coordinator/log_handler_test.go
  • internal/service/coordinator/static_registry_test.go
  • internal/runtime/output_test.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/integration/distributed_helpers_test.go
  • internal/common/config/loader_test.go
  • internal/service/worker/worker_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/cmd/worker_test.go
  • internal/service/worker/poller_test.go
  • internal/proto/convert/status_test.go
  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/runtime/runner_helper_test.go
  • internal/runtime/executor/task_test.go
  • internal/service/coordinator/client_test.go
  • internal/integration/remote_test.go
  • internal/service/coordinator/handler_test.go
🧠 Learnings (6)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/cmd/worker.go
  • internal/cmd/context.go
  • internal/runtime/executor/dag_runner.go
  • internal/service/worker/poller_test.go
  • codecov.yml
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths

Applied to files:

  • internal/common/fileutil/fileutil_test.go
  • internal/service/coordinator/log_handler_test.go
  • internal/service/coordinator/static_registry_test.go
  • internal/runtime/output_test.go
  • internal/test/helper.go
  • internal/common/config/loader_test.go
  • internal/integration/distributed_parallel_test.go
  • internal/proto/convert/status_test.go
  • internal/integration/distributed_coordinator_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/service/coordinator/client_test.go
  • internal/integration/remote_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages

Applied to files:

  • internal/common/fileutil/fileutil_test.go
  • internal/runtime/output_test.go
  • internal/proto/convert/status_test.go
  • internal/integration/remote_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks

Applied to files:

  • internal/common/fileutil/fileutil_test.go
  • internal/runtime/output_test.go
  • internal/cmd/worker_test.go
  • internal/integration/distributed_e2e_test.go
  • internal/runtime/runner_helper_test.go
  • internal/service/coordinator/handler_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`

Applied to files:

  • internal/runtime/executor/dag_runner.go
  • codecov.yml
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to {api/v1,api/v2}/**/*.{proto,yaml,yml,json} : API definitions live in `api/v1` and `api/v2`; generated server stubs land in `internal/service`, while matching TypeScript clients flow into `ui/src/api`

Applied to files:

  • codecov.yml
🧬 Code graph analysis (28)
internal/cmd/worker.go (3)
internal/common/config/config.go (2)
  • Peer (306-321)
  • Worker (279-284)
internal/service/coordinator/client.go (2)
  • Client (29-55)
  • New (94-103)
internal/service/coordinator/static_registry.go (1)
  • NewStaticRegistry (22-49)
internal/common/fileutil/fileutil_test.go (1)
internal/common/fileutil/fileutil.go (3)
  • OpenOrCreateFile (62-70)
  • CreateTempDAGFile (200-250)
  • FileExists (44-47)
internal/runtime/manager.go (6)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
internal/core/execution/dagrun.go (1)
  • NewDAGRunRef (135-140)
internal/common/logger/tag/tag.go (2)
  • Name (271-273)
  • Error (20-22)
internal/common/logger/context.go (4)
  • Errorf (75-77)
  • Info (40-42)
  • Debug (35-37)
  • Error (50-52)
internal/core/dag.go (1)
  • SockAddr (611-637)
internal/common/fileutil/fileutil.go (1)
  • FileExists (44-47)
internal/runtime/runner.go (3)
internal/core/status.go (4)
  • Status (4-4)
  • NodeRunning (66-66)
  • NodeAborted (68-68)
  • NodeSucceeded (69-69)
internal/core/step.go (3)
  • Step (13-103)
  • RepeatPolicy (239-255)
  • RepeatMode (229-229)
internal/core/execution/node.go (1)
  • Node (9-36)
internal/core/execution/context.go (3)
internal/runtime/context.go (4)
  • LogWriterFactory (46-46)
  • Context (14-14)
  • WithLogWriterFactory (42-42)
  • ContextOption (22-22)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
proto/coordinator/v1/coordinator.pb.go (3)
  • GetDAGRunStatusResponse (1899-1906)
  • GetDAGRunStatusResponse (1919-1919)
  • GetDAGRunStatusResponse (1934-1936)
internal/service/coordinator/static_registry_test.go (1)
internal/service/coordinator/static_registry.go (1)
  • NewStaticRegistry (22-49)
internal/runtime/output_test.go (3)
internal/runtime/data.go (3)
  • Parallel (85-89)
  • NodeData (24-27)
  • NodeState (29-81)
internal/runtime/output.go (1)
  • OutputCoordinator (22-49)
internal/core/execution/context.go (2)
  • Context (17-29)
  • StreamTypeStdout (44-44)
internal/test/helper.go (2)
internal/runtime/agent/agent.go (1)
  • Options (158-189)
internal/persistence/filedag/store.go (1)
  • Options (31-36)
internal/runtime/executor/dag_runner_test.go (1)
internal/runtime/data.go (1)
  • Parallel (85-89)
internal/runtime/executor/dag_runner.go (3)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-250)
internal/core/execution/context.go (2)
  • Context (17-29)
  • RunStatus (126-137)
internal/proto/convert/status.go (1)
  • ProtoToDAGRunStatus (114-153)
internal/runtime/remote/log_streamer.go (5)
internal/service/coordinator/client.go (1)
  • Client (29-55)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/core/execution/context.go (3)
  • Context (17-29)
  • StreamTypeStdout (44-44)
  • StreamTypeStderr (46-46)
proto/coordinator/v1/coordinator.pb.go (10)
  • LogChunk (1640-1657)
  • LogChunk (1670-1670)
  • LogChunk (1685-1687)
  • LogStreamType (127-127)
  • LogStreamType (159-161)
  • LogStreamType (163-165)
  • LogStreamType (172-174)
  • LogStreamType_LOG_STREAM_TYPE_STDOUT (131-131)
  • LogStreamType_LOG_STREAM_TYPE_STDERR (132-132)
  • LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED (130-130)
api/v2/api.gen.go (1)
  • StepName (1342-1342)
internal/runtime/remote/status_pusher.go (6)
internal/service/coordinator/client.go (1)
  • Client (29-55)
internal/runtime/agent/agent.go (1)
  • StatusPusher (153-155)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
proto/coordinator/v1/coordinator.pb.go (3)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
internal/proto/convert/status.go (1)
  • DAGRunStatusToProto (12-51)
internal/runtime/context.go (1)
internal/core/execution/context.go (2)
  • WithLogWriterFactory (229-233)
  • LogWriterFactory (35-39)
internal/service/scheduler/dag_executor.go (2)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/runtime/executor/task.go (1)
  • WithPreviousStatus (83-89)
internal/integration/distributed_helpers_test.go (8)
internal/test/coordinator.go (1)
  • Coordinator (20-28)
internal/service/worker/worker.go (2)
  • Worker (21-39)
  • NewWorker (47-66)
internal/service/worker/remote_handler.go (2)
  • RemoteTaskHandlerConfig (28-45)
  • NewRemoteTaskHandler (49-60)
internal/common/logger/tag/tag.go (1)
  • WorkerID (65-67)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/common/fileutil/fileutil.go (1)
  • IsDir (34-40)
internal/service/worker/worker_test.go (5)
internal/runtime/subcmd.go (1)
  • Run (258-280)
internal/service/worker/worker.go (1)
  • NewWorker (47-66)
internal/core/step.go (1)
  • RetryPolicy (211-226)
internal/common/logger/tag/tag.go (1)
  • Target (168-170)
internal/common/logger/context.go (1)
  • Fatal (55-57)
internal/integration/distributed_parallel_test.go (4)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (2)
  • WithStatusPersistence (84-88)
  • WithDAGsDir (63-67)
internal/core/spec/loader.go (1)
  • WithDAGsDir (90-94)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/proto/convert/status.go (5)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/params.go (1)
  • Params (36-45)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/core/step.go (1)
  • ExecutorConfig (195-203)
internal/common/collections/syncmap.go (1)
  • SyncMap (10-14)
internal/service/worker/poller_test.go (3)
proto/coordinator/v1/coordinator.pb.go (15)
  • PollRequest (177-184)
  • PollRequest (197-197)
  • PollRequest (212-214)
  • Task (365-384)
  • Task (397-397)
  • Task (412-414)
  • HeartbeatRequest (700-707)
  • HeartbeatRequest (720-720)
  • HeartbeatRequest (735-737)
  • HeartbeatResponse (761-768)
  • HeartbeatResponse (781-781)
  • HeartbeatResponse (796-798)
  • GetDAGRunStatusResponse (1899-1906)
  • GetDAGRunStatusResponse (1919-1919)
  • GetDAGRunStatusResponse (1934-1936)
internal/service/coordinator/client.go (1)
  • Metrics (58-63)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/service/worker/remote_handler.go (8)
internal/service/worker/handler.go (1)
  • TaskHandler (18-20)
internal/core/execution/dagrun.go (2)
  • DAGRunStore (23-48)
  • DAGRunRef (128-131)
proto/coordinator/v1/coordinator.pb.go (7)
  • Task (365-384)
  • Task (397-397)
  • Task (412-414)
  • Operation (24-24)
  • Operation (56-58)
  • Operation (60-62)
  • Operation (69-71)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/runtime/remote/status_pusher.go (2)
  • StatusPusher (14-17)
  • NewStatusPusher (20-25)
internal/runtime/remote/log_streamer.go (2)
  • LogStreamer (21-29)
  • NewLogStreamer (32-48)
internal/common/fileutil/fileutil.go (1)
  • CreateTempDAGFile (200-250)
internal/core/spec/loader.go (3)
  • LoadOption (41-41)
  • WithBaseConfig (44-48)
  • Load (150-171)
internal/integration/distributed_coordinator_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/integration/distributed_e2e_test.go (2)
internal/test/coordinator.go (1)
  • SetupCoordinator (31-96)
internal/test/helper.go (1)
  • WithStatusPersistence (84-88)
internal/cmd/coordinator.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunStore (23-48)
internal/service/coordinator/service.go (2)
  • Service (21-34)
  • NewService (36-57)
internal/service/coordinator/handler.go (3)
  • Handler (39-58)
  • WithDAGRunStore (64-68)
  • WithLogDir (71-75)
internal/runtime/executor/task_test.go (4)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/status.go (3)
  • Running (8-8)
  • NodeSucceeded (69-69)
  • NodeFailed (67-67)
internal/core/execution/node.go (1)
  • Node (9-36)
internal/runtime/executor/task.go (1)
  • WithPreviousStatus (83-89)
internal/service/coordinator/client_test.go (1)
proto/coordinator/v1/coordinator.pb.go (9)
  • ReportStatusRequest (962-968)
  • ReportStatusRequest (981-981)
  • ReportStatusRequest (996-998)
  • ReportStatusResponse (1015-1021)
  • ReportStatusResponse (1034-1034)
  • ReportStatusResponse (1049-1051)
  • DAGRunStatusProto (1068-1096)
  • DAGRunStatusProto (1109-1109)
  • DAGRunStatusProto (1124-1126)
internal/service/scheduler/queue_processor.go (3)
proto/coordinator/v1/coordinator.pb.go (1)
  • Operation_OPERATION_RETRY (29-29)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (128-131)
internal/runtime/executor/task.go (3)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
proto/coordinator/v1/coordinator.pb.go (3)
  • Task (365-384)
  • Task (397-397)
  • Task (412-414)
internal/proto/convert/status.go (1)
  • DAGRunStatusToProto (12-51)
internal/service/coordinator/handler_test.go (3)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/core/status.go (6)
  • Running (8-8)
  • NodeRunning (66-66)
  • NodeSucceeded (69-69)
  • Failed (9-9)
  • NodeFailed (67-67)
  • Succeeded (11-11)

@yohamta0 yohamta0 merged commit ed2da6b into main Jan 11, 2026
5 checks passed
@yohamta0 yohamta0 deleted the experimental-worker-sync branch January 11, 2026 07:37
@codecov

codecov Bot commented Jan 11, 2026


Codecov Report

❌ Patch coverage is 74.54545% with 322 lines in your changes missing coverage. Please review.
✅ Project coverage is 66.32%. Comparing base (d7af927) to head (d6283d8).
⚠️ Report is 1 commit behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/service/coordinator/handler.go | 71.25% | 50 Missing and 23 partials ⚠️ |
| internal/service/coordinator/client.go | 34.17% | 49 Missing and 3 partials ⚠️ |
| internal/runtime/executor/dag_runner.go | 5.55% | 50 Missing and 1 partial ⚠️ |
| internal/service/coordinator/log_handler.go | 76.98% | 19 Missing and 10 partials ⚠️ |
| internal/cmd/worker.go | 39.47% | 23 Missing ⚠️ |
| internal/runtime/agent/agent.go | 63.63% | 11 Missing and 5 partials ⚠️ |
| internal/common/fileutil/fileutil.go | 46.42% | 8 Missing and 7 partials ⚠️ |
| internal/runtime/manager.go | 0.00% | 14 Missing ⚠️ |
| internal/service/coordinator/service.go | 0.00% | 12 Missing ⚠️ |
| internal/service/worker/remote_handler.go | 94.63% | 5 Missing and 3 partials ⚠️ |

... and 7 more
Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1564      +/-   ##
==========================================
+ Coverage   64.46%   66.32%   +1.85%     
==========================================
  Files         241      245       +4     
  Lines       26586    27139     +553     
==========================================
+ Hits        17139    18000     +861     
+ Misses       7873     7528     -345     
- Partials     1574     1611      +37     

| Files with missing lines | Coverage Δ |
|---|---|
| internal/cmd/context.go | 72.08% <100.00%> (ø) |
| internal/cmd/flags.go | 100.00% <ø> (ø) |
| internal/cmd/startall.go | 72.80% <100.00%> (+0.44%) ⬆️ |
| internal/common/config/config.go | 85.18% <ø> (ø) |
| internal/proto/convert/status.go | 100.00% <100.00%> (ø) |
| internal/runtime/context.go | 50.00% <ø> (ø) |
| internal/runtime/executor/task.go | 100.00% <100.00%> (ø) |
| internal/runtime/remote/log_streamer.go | 100.00% <100.00%> (ø) |
| internal/runtime/remote/status_pusher.go | 100.00% <100.00%> (ø) |
| internal/service/coordinator/static_registry.go | 100.00% <100.00%> (ø) |

... and 19 more

... and 4 files with indirect coverage changes



Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update d7af927...d6283d8.
