feat: shared-nothing worker#1564
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the 📝 WalkthroughWalkthroughAdds shared-nothing distributed execution: workers can statically discover coordinators, push DAG run status, and stream per-step logs via new gRPC RPCs; coordinator gains persistence, zombie detection, and log handling; runtime and worker code wire status/log pushers and remote task handlers; extensive proto, conversion, and tests added. Changes
Sequence Diagram(s)sequenceDiagram
participant Scheduler
participant Worker
participant RemoteHandler
participant Coordinator
participant DAGRunStore
Scheduler->>Worker: Dispatch Task (may include Definition)
Worker->>RemoteHandler: Handle(ctx, task)
RemoteHandler->>RemoteHandler: loadDAG (Definition → temp file) / create handlers
RemoteHandler->>Coordinator: ReportStatus(initial)
RemoteHandler->>Coordinator: StreamLogs (Log chunks during steps)
RemoteHandler->>DAGRunStore: Persist final status (if configured)
RemoteHandler->>Coordinator: ReportStatus(final)
RemoteHandler-->>Scheduler: Task completion
sequenceDiagram
participant Worker
participant Coordinator
participant HandlerStore as DAGRunStore
loop Heartbeat interval
Worker->>Coordinator: Heartbeat(req)
Coordinator->>HandlerStore: detect stale heartbeats / mark failures
Coordinator-->>Worker: HeartbeatResponse{cancelled_runs}
Worker->>Worker: processCancellations(cancelled_runs) -> cancel running tasks
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~70 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/runtime/executor/dag_runner_test.go (1)
18-72: Avoid potentially vacuous assertions afterCleanup(capture the path before cleanup).If
SubDAGExecutor.Cleanupclearsexecutor.tempFile,assert.NoFileExists(t, executor.tempFile)can become a weak check (empty string). Capture the path first and (optionally) assert the field is cleared.Proposed tweak
// Verify the temp file was created assert.FileExists(t, executor.tempFile) + tempPath := executor.tempFile // Read and verify the content - content, err := os.ReadFile(executor.tempFile) + content, err := os.ReadFile(tempPath) require.NoError(t, err) assert.Equal(t, parentDAG.LocalDAGs["local-child"].YamlData, content) // Cleanup err = executor.Cleanup(ctx) assert.NoError(t, err) - assert.NoFileExists(t, executor.tempFile) + assert.NoFileExists(t, tempPath) + assert.Empty(t, executor.tempFile)
🤖 Fix all issues with AI agents
In @internal/common/fileutil/fileutil_test.go:
- Around line 229-296: The test misses verifying that YAML document separators
are on their own line, which hides a bug in CreateTempDAGFile where it appends
"---\n" directly after the primary doc; update CreateTempDAGFile to ensure it
writes a separator with a preceding newline (e.g., use "\n---\n" or ensure the
primary doc ends with '\n' before appending) and update the
TestCreateTempDAGFile cases to assert the separator appears as "\n---\n" (or
that the separator is on its own line) and add an assertion that the primary doc
terminates with a newline when extra docs are provided.
In @internal/runtime/executor/dag_runner.go:
- Around line 493-511: extractOutputsFromNodes currently drops any
OutputVariables values without an "=" silently; fix this by normalizing or
validating at the proto conversion boundary: update ProtoToNode to ensure
OutputVariables values are written using the same stringutil.NewKeyValue(...)
format (the same normalization used by setVariable/setBoolVariable in data.go)
so proto-roundtripped nodes always store "key=value", or alternatively add a
defensive normalization in extractOutputsFromNodes to handle values missing "="
(e.g., treat the whole string as key with empty value or log/warn and skip
explicitly). Reference: extractOutputsFromNodes, ProtoToNode, setVariable,
setBoolVariable, stringutil.NewKeyValue.
In @internal/service/coordinator/handler.go:
- Around line 232-256: Heartbeat currently calls detectAndCleanupZombies on
every invocation which causes O(workers²) work under high scale; remove the
direct call to detectAndCleanupZombies from the Heartbeat method and rely on the
existing periodic detector started by StartZombieDetector, or implement simple
throttling inside Heartbeat by tracking a timestamp field (e.g.,
h.lastZombieCleanupAt) and only invoking detectAndCleanupZombies when
time.Since(h.lastZombieCleanupAt) >= h.zombieDetectionInterval, updating
h.lastZombieCleanupAt under the mutex; update references to
detectAndCleanupZombies, Heartbeat, StartZombieDetector, and add the new
timestamp/interval fields on the Handler struct if choosing the throttling
approach.
In @internal/service/coordinator/service.go:
- Around line 120-122: The Stop() method on the coordinator currently calls
srv.handler.Close(ctx) but does not wait for the zombie detector goroutine to
finish; add a call to the coordinator's WaitZombieDetector() (or equivalent
method) before or after srv.handler.Close(ctx) so the zombie detector is awaited
during shutdown. Locate the Stop() method in service.go and ensure it invokes
WaitZombieDetector() (the same coordination used in startall.go) to block until
the detector exits, preserving existing Close(ctx) behavior and ordering
consistent with startall.go.
- Around line 60-62: Replace the hardcoded 45-second interval passed to
srv.handler.StartZombieDetector with the configured value from the scheduler
configuration (e.g., use the Scheduler.ZombieDetectionInterval field from the
coordinator/service configuration accessible on the service instance), ensuring
you convert it to time.Duration if necessary; update the call site in service.go
where srv.handler.StartZombieDetector(ctx, 45*time.Second) is invoked to use the
config value instead (for example: srv.cfg.Scheduler.ZombieDetectionInterval or
the equivalent config field on srv) so the Coordinator respects the same
configuration as the Scheduler.
🧹 Nitpick comments (17)
internal/runtime/runner_helper_test.go (1)
316-338: Consider applying the same testify pattern for consistency.The
nodeByNamefunction at line 336 still usest.Fatalffor the same not-found check. For consistency with the change on line 312, consider replacing it withrequire.NotNil.♻️ Suggested consistency improvement
if rr.cfg.OnCancel != nil && rr.cfg.OnCancel.Name == stepName { return rr.runner.HandlerNode(core.HandlerOnCancel) } - t.Fatalf("step %s not found", stepName) - return nil + require.FailNow(t, "step %s not found", stepName) + return nil // unreachable but required for compilationNote:
require.FailNowis used here since the function must return a value. Alternatively, restructure to eliminate the need for a return after the assertion.internal/runtime/manager.go (1)
128-139: Consider logging socket stop failures for debugging.The logic to guard socket communication behind the
alivecheck is sound, and the fallback to the abort flag mechanism is a good approach for handling both local and distributed scenarios. However, when the socket request fails on line 134, the error is silently discarded. Logging this failure (at debug level) would help diagnose cases where socket stop is expected to work but doesn't.📝 Suggested change
if alive { addr := dag.SockAddr(dagRunID) if fileutil.FileExists(addr) { // In case the socket exists, we try to send a stop request client := sock.NewClient(addr) - if _, err := client.Request("POST", "/stop"); err == nil { + if _, err := client.Request("POST", "/stop"); err != nil { + logger.Debug(ctx, "Socket stop failed, falling back to abort flag", tag.Error(err)) + } else { logger.Info(ctx, "Successfully stopped DAG via socket") return nil } } }internal/runtime/executor/dag_runner_test.go (1)
269-299: Uset.TempDir()instead of a fixedos.TempDir()path (stronger parallel safety).Even if it’s currently unique within the package, hardcoding
{os.TempDir()}/dagu-test/test.yamlis easy to collide with in future additions.Proposed tweak
- // Create a temporary file - tempDir := filepath.Join(os.TempDir(), "dagu-test") - err := os.MkdirAll(tempDir, 0750) - require.NoError(t, err) - defer func() { _ = os.RemoveAll(tempDir) }() - - tempFile := filepath.Join(tempDir, "test.yaml") - err = os.WriteFile(tempFile, []byte("test content"), 0600) + tempDir := t.TempDir() + tempFile := filepath.Join(tempDir, "test.yaml") + err := os.WriteFile(tempFile, []byte("test content"), 0600) require.NoError(t, err)internal/common/config/config.go (1)
278-284: Consider validatingWorker.Coordinatorsformat inConfig.Validate()(fail fast).Since this is user-provided via flags/env, validating entries (e.g.,
net.SplitHostPort+ port range, and rejecting schemes if you truly requirehost:port) would make misconfigurations much easier to diagnose.internal/service/coordinator/client_test.go (2)
24-31: Usenet.SplitHostPortinstead ofstrings.Splitfor host/port parsing (more robust).This avoids subtle bugs if the listener address format changes (e.g., IPv6).
Proposed tweak
import ( "context" + "net" "net" "strconv" "strings" @@ func parsePort(addr string) int { - parts := strings.Split(addr, ":") - if len(parts) < 2 { - return 0 - } - port, _ := strconv.Atoi(parts[1]) - return port + _, portStr, err := net.SplitHostPort(addr) + if err != nil { + return 0 + } + port, _ := strconv.Atoi(portStr) + return port }And when building
execution.HostInfo, prefer:host, portStr, _ := net.SplitHostPort(addr) port, _ := strconv.Atoi(portStr)Also applies to: 141-143, 188-190
204-245: Heartbeat test: consider asserting the response is non-nil.Right now you only validate the request shape; also checking the returned
*HeartbeatResponsewould make the new signature change more meaningful.internal/common/fileutil/fileutil_test.go (1)
12-41:InvalidPathtest can be platform-dependent; parallelism may surface flakiness.Using
"/nonexistent/directory/test.log"can behave unexpectedly across OSes/containers. Consider generating a guaranteed-invalid path fromt.TempDir()(e.g., create+remove a dir, then attempt to create a file inside it).
Based on learnings, nice job keeping assertions on the failure path.internal/common/config/loader.go (1)
613-647: Consider adding address format validation.The function correctly handles multiple input formats (comma-separated string, YAML list, string slice) and properly trims whitespace. However, it doesn't validate that addresses are in valid
host:portformat. Invalid addresses would cause failures later during connection attempts.Optional: Add basic host:port validation
func parseCoordinatorAddresses(input interface{}) []string { var addresses []string + validateAddress := func(addr string) bool { + // Basic validation: must contain exactly one colon + parts := strings.Split(addr, ":") + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return false + } + return true + } + switch v := input.(type) { case string: // Parse comma-separated string: "host1:port1,host2:port2" if v != "" { parts := strings.Split(v, ",") for _, part := range parts { part = strings.TrimSpace(part) - if part != "" { + if part != "" && validateAddress(part) { addresses = append(addresses, part) } } } // ... similar changes for other casesinternal/service/coordinator/log_handler.go (2)
123-131: Consider including AttemptId in the stream key to prevent collisions during retries.The current
streamKeyusesDagName/DagRunId/StepName/StreamType, butlogFilePathalso usesAttemptIdfor the directory structure. If a DAG run is retried with the same run ID but different attempt ID, the stream key would collide while the file paths would differ, potentially causing incorrect writer reuse.♻️ Suggested fix
func (h *logHandler) streamKey(chunk *coordinatorv1.LogChunk) string { - return fmt.Sprintf("%s/%s/%s/%s", + return fmt.Sprintf("%s/%s/%s/%s/%s", chunk.DagName, chunk.DagRunId, + chunk.AttemptId, chunk.StepName, chunk.StreamType.String(), ) }
171-182: Consider propagating context from the stream for logging.Using
context.Background()loses any trace context or log correlation that might be available from the stream handling context.♻️ Suggested improvement
-func (h *logHandler) closeWriter(chunk *coordinatorv1.LogChunk) { +func (h *logHandler) closeWriter(ctx context.Context, chunk *coordinatorv1.LogChunk) { key := h.streamKey(chunk) h.writersMu.Lock() defer h.writersMu.Unlock() if w, ok := h.writers[key]; ok { - w.close(context.Background()) + w.close(ctx) delete(h.writers, key) } }Then update the call site in
handleStream:if chunk.IsFinal { - h.closeWriter(chunk) + h.closeWriter(stream.Context(), chunk) continue }internal/runtime/remote/log_streamer.go (2)
106-140: Consider handling stream initialization failure more gracefully.If
StreamLogsfails, the error is returned but subsequentWritecalls will attempt to initialize the stream again sincew.streamremains nil. This is actually reasonable behavior (retry on next write), but consider whether you want to track a "permanently failed" state to avoid repeated connection attempts for a broken stream.
160-178: Sequence number is missing on the final chunk.The final chunk doesn't include a sequence number (defaults to 0). While
IsFinal=truemarks it as the last chunk, including the correct sequence number would help with ordering verification on the receiver side.♻️ Suggested fix
// Send final marker if w.stream != nil { + w.sequence++ finalChunk := &coordinatorv1.LogChunk{ WorkerId: w.streamer.workerID, DagRunId: w.streamer.dagRunID, DagName: w.streamer.dagName, StepName: w.stepName, StreamType: toProtoStreamType(w.streamType), IsFinal: true, + Sequence: w.sequence, RootDagRunName: w.streamer.rootRef.Name, RootDagRunId: w.streamer.rootRef.ID, AttemptId: w.streamer.getAttemptID(), }internal/service/coordinator/static_registry.go (1)
22-49: Coordinator ID uses original index, not filtered index.When empty addresses are skipped, the
coord-<i>ID still uses the original loop indexi. For example, if addresses are["", "host1:50055", "host2:50055"], the IDs will becoord-1andcoord-2(skippingcoord-0). This may be intentional for debugging/correlation with the input array, but could be confusing. Consider usinglen(hosts)for sequential IDs.♻️ Optional fix for sequential IDs
for i, addr := range addresses { if addr == "" { continue } host, port, err := parseAddress(addr) if err != nil { return nil, fmt.Errorf("invalid coordinator address %q: %w", addr, err) } hosts = append(hosts, execution.HostInfo{ - ID: fmt.Sprintf("coord-%d", i), + ID: fmt.Sprintf("coord-%d", len(hosts)), Host: host, Port: port, Status: execution.ServiceStatusActive, StartedAt: time.Now(), }) }internal/service/worker/remote_handler.go (1)
232-251: Temp log directory uses fixed path pattern.The log directory is created at
/tmp/dagu/worker-logs/<dagRunID>. If multiple workers on the same host execute the same DAG run (unlikely but possible in edge cases), they would conflict. Consider including worker ID in the path.♻️ Optional: Include worker ID in temp path
func (h *remoteTaskHandler) createAgentEnv(ctx context.Context, dagRunID string) (*agentEnv, error) { - logDir := filepath.Join(os.TempDir(), "dagu", "worker-logs", dagRunID) + logDir := filepath.Join(os.TempDir(), "dagu", "worker-logs", h.workerID, dagRunID) if err := os.MkdirAll(logDir, 0750); err != nil { return nil, fmt.Errorf("failed to create log directory: %w", err) }internal/service/coordinator/client.go (1)
505-531: Missing health check before establishing stream connection.
StreamLogsbypasses the health check thatattemptCallperforms before making calls. This could result in attempting to stream logs to an unhealthy coordinator, whereas other methods verify coordinator health first.Consider adding a health check or documenting why it's intentionally skipped for streaming connections.
♻️ Suggested improvement
func (cli *clientImpl) StreamLogs(ctx context.Context) (coordinatorv1.CoordinatorService_StreamLogsClient, error) { members, err := cli.getCoordinatorMembers(ctx) if err != nil { return nil, err } // Try each coordinator until one works var lastErr error for _, member := range members { client, err := cli.getOrCreateClient(member) if err != nil { lastErr = err continue } + // Check coordinator health before establishing stream + if err := cli.isHealthy(ctx, member); err != nil { + lastErr = err + continue + } + stream, err := client.client.StreamLogs(ctx) if err != nil { lastErr = err continue } return stream, nil } return nil, fmt.Errorf("failed to create log stream: %w", lastErr) }internal/service/coordinator/handler.go (2)
87-97: Consider logging errors during Close instead of silently ignoring them.While using
_ = attempt.Close(ctx)avoids blocking shutdown, logging close errors would help diagnose issues with status persistence during shutdown.♻️ Suggested improvement
func (h *Handler) Close(ctx context.Context) { h.attemptsMu.Lock() defer h.attemptsMu.Unlock() for dagRunID, attempt := range h.openAttempts { - _ = attempt.Close(ctx) + if err := attempt.Close(ctx); err != nil { + logger.Warn(ctx, "Failed to close attempt during shutdown", + tag.RunID(dagRunID), tag.Error(err)) + } delete(h.openAttempts, dagRunID) } }
259-285: Potential performance issue with per-task database lookups.
getCancelledRunsForWorkercallsFindAttemptandIsAbortingfor each running task on every heartbeat. With many running tasks across workers, this could cause significant database load.Consider caching abort status or batching these lookups if performance becomes an issue.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
proto/coordinator/v1/coordinator.pb.gois excluded by!**/*.pb.goproto/coordinator/v1/coordinator_grpc.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (44)
internal/cmd/context.gointernal/cmd/coordinator.gointernal/cmd/flags.gointernal/cmd/startall.gointernal/cmd/worker.gointernal/common/config/config.gointernal/common/config/definition.gointernal/common/config/loader.gointernal/common/fileutil/fileutil.gointernal/common/fileutil/fileutil_test.gointernal/core/execution/context.gointernal/integration/distributed_helpers_test.gointernal/integration/remote_test.gointernal/proto/convert/status.gointernal/proto/convert/status_test.gointernal/runtime/agent/agent.gointernal/runtime/context.gointernal/runtime/executor/dag_runner.gointernal/runtime/executor/dag_runner_test.gointernal/runtime/executor/task.gointernal/runtime/manager.gointernal/runtime/output.gointernal/runtime/remote/log_streamer.gointernal/runtime/remote/status_pusher.gointernal/runtime/runner_helper_test.gointernal/service/coordinator/client.gointernal/service/coordinator/client_test.gointernal/service/coordinator/handler.gointernal/service/coordinator/handler_test.gointernal/service/coordinator/log_handler.gointernal/service/coordinator/service.gointernal/service/coordinator/static_registry.gointernal/service/coordinator/static_registry_test.gointernal/service/scheduler/dag_executor.gointernal/service/scheduler/dag_executor_test.gointernal/service/scheduler/queue_processor.gointernal/service/worker/handler.gointernal/service/worker/handler_test.gointernal/service/worker/poller_test.gointernal/service/worker/remote_handler.gointernal/service/worker/worker.gointernal/test/coordinator.gointernal/test/helper.goproto/coordinator/v1/coordinator.proto
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint incmd/orchestrates the scheduler and CLI; runtime, persistence, and service layers sit underinternal/*(for exampleinternal/runtime,internal/persistence)
Keep Go filesgofmt/goimportsclean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, andErr...names for package-level errors
Repository linting relies ongolangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers ininternal/common
Files:
internal/common/fileutil/fileutil.gointernal/service/worker/handler.gointernal/common/config/config.gointernal/service/coordinator/handler_test.gointernal/runtime/runner_helper_test.gointernal/service/scheduler/dag_executor_test.gointernal/cmd/context.gointernal/service/coordinator/service.gointernal/service/scheduler/dag_executor.gointernal/runtime/context.gointernal/test/coordinator.gointernal/common/fileutil/fileutil_test.gointernal/runtime/remote/status_pusher.gointernal/runtime/executor/task.gointernal/runtime/manager.gointernal/common/config/definition.gointernal/service/worker/remote_handler.gointernal/cmd/flags.gointernal/test/helper.gointernal/integration/remote_test.gointernal/service/worker/worker.gointernal/cmd/startall.gointernal/service/worker/poller_test.gointernal/service/scheduler/queue_processor.gointernal/proto/convert/status.gointernal/runtime/executor/dag_runner.gointernal/service/coordinator/client.gointernal/service/coordinator/static_registry_test.gointernal/common/config/loader.gointernal/runtime/output.gointernal/service/worker/handler_test.gointernal/core/execution/context.gointernal/cmd/worker.gointernal/runtime/agent/agent.gointernal/integration/distributed_helpers_test.gointernal/service/coordinator/client_test.gointernal/runtime/executor/dag_runner_test.gointernal/service/coordinator/static_registry.gointernal/proto/convert/status_test.gointernal/service/coordinator/handler.gointernal/runtime/remote/log_streamer.gointernal/service/coordinator/log_handler.gointernal/cmd/coordinator.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as*_test.go; favour table-driven cases and cover failure paths
Usestretchr/testify/requireand shared fixtures frominternal/testinstead of duplicating mocks
Files:
internal/service/coordinator/handler_test.gointernal/runtime/runner_helper_test.gointernal/service/scheduler/dag_executor_test.gointernal/common/fileutil/fileutil_test.gointernal/integration/remote_test.gointernal/service/worker/poller_test.gointernal/service/coordinator/static_registry_test.gointernal/service/worker/handler_test.gointernal/integration/distributed_helpers_test.gointernal/service/coordinator/client_test.gointernal/runtime/executor/dag_runner_test.gointernal/proto/convert/status_test.go
🧠 Learnings (5)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Applied to files:
internal/runtime/runner_helper_test.gointernal/common/fileutil/fileutil_test.gointernal/service/coordinator/client_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/cmd/context.gointernal/service/worker/poller_test.gointernal/runtime/executor/dag_runner.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Applied to files:
internal/common/fileutil/fileutil_test.gointernal/test/helper.gointernal/integration/remote_test.gointernal/service/coordinator/static_registry_test.gointernal/service/coordinator/client_test.gointernal/runtime/executor/dag_runner_test.gointernal/proto/convert/status_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages
Applied to files:
internal/common/fileutil/fileutil_test.gointernal/integration/remote_test.gointernal/proto/convert/status_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Applied to files:
internal/runtime/executor/dag_runner.go
🧬 Code graph analysis (30)
internal/service/worker/handler.go (1)
internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-241)
internal/service/coordinator/handler_test.go (5)
internal/core/execution/dagrun.go (2)
DAGRunRef(128-131)ErrDAGRunIDNotFound(14-14)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)internal/service/coordinator/handler.go (2)
NewHandler(75-85)WithDAGRunStore(62-66)internal/core/status.go (6)
Running(8-8)NodeRunning(66-66)NodeSucceeded(69-69)Failed(9-9)NodeFailed(67-67)Succeeded(11-11)
internal/service/scheduler/dag_executor.go (2)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/runtime/executor/task.go (1)
WithPreviousStatus(83-89)
internal/runtime/context.go (1)
internal/core/execution/context.go (2)
WithLogWriterFactory(229-233)LogWriterFactory(35-39)
internal/test/coordinator.go (3)
internal/test/helper.go (5)
HelperOption(42-42)Helper(360-374)Options(44-54)WithStatusPersistence(84-88)WithLogPersistence(92-96)internal/service/coordinator/handler.go (5)
HandlerOption(59-59)WithDAGRunStore(62-66)WithLogDir(69-73)NewHandler(75-85)Handler(39-56)internal/common/config/path.go (1)
Paths(13-30)
internal/common/fileutil/fileutil_test.go (1)
internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-241)
internal/runtime/remote/status_pusher.go (5)
internal/service/coordinator/client.go (1)
Client(29-55)internal/runtime/agent/agent.go (1)
StatusPusher(153-155)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)proto/coordinator/v1/coordinator.pb.go (3)
ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)internal/proto/convert/status.go (1)
DAGRunStatusToProto(12-51)
internal/runtime/executor/task.go (3)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)proto/coordinator/v1/coordinator.pb.go (3)
Task(365-384)Task(397-397)Task(412-414)internal/proto/convert/status.go (1)
DAGRunStatusToProto(12-51)
internal/runtime/manager.go (5)
internal/persistence/fileproc/procgrp.go (1)
ProcGroup(20-27)internal/core/execution/dagrun.go (1)
NewDAGRunRef(135-140)internal/core/dag.go (1)
SockAddr(611-637)internal/common/fileutil/fileutil.go (1)
FileExists(44-47)internal/test/server.go (1)
Request(113-120)
internal/test/helper.go (2)
internal/runtime/agent/agent.go (1)
Options(158-189)internal/persistence/filedag/store.go (1)
Options(31-36)
internal/integration/remote_test.go (4)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (3)
WithStatusPersistence(84-88)DAG(446-449)WithLogPersistence(92-96)internal/core/execution/queue.go (1)
QueueStore(16-35)internal/core/status.go (5)
Succeeded(11-11)NodeSucceeded(69-69)Running(8-8)Aborted(10-10)Failed(9-9)
internal/service/worker/worker.go (4)
internal/core/execution/context.go (1)
Context(17-29)internal/cmd/context.go (1)
Context(44-59)internal/common/logger/context.go (1)
Info(40-42)internal/common/logger/tag/tag.go (2)
RunID(40-42)WorkerID(65-67)
internal/cmd/startall.go (1)
internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)
internal/service/worker/poller_test.go (2)
proto/coordinator/v1/coordinator.pb.go (15)
HeartbeatRequest(700-707)HeartbeatRequest(720-720)HeartbeatRequest(735-737)HeartbeatResponse(761-768)HeartbeatResponse(781-781)HeartbeatResponse(796-798)ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)ReportStatusResponse(1015-1021)ReportStatusResponse(1034-1034)ReportStatusResponse(1049-1051)GetDAGRunStatusResponse(1899-1906)GetDAGRunStatusResponse(1919-1919)GetDAGRunStatusResponse(1934-1936)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)
internal/service/scheduler/queue_processor.go (1)
internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)
internal/proto/convert/status.go (5)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/common/cmdutil/eval_test.go (1)
Root(124-127)internal/core/params.go (1)
Params(36-45)internal/core/step.go (1)
ExecutorConfig(195-203)internal/common/collections/syncmap.go (1)
SyncMap(10-14)
internal/runtime/executor/dag_runner.go (5)
internal/core/dag.go (1)
DAG(63-178)internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-241)internal/core/execution/context.go (2)
Context(17-29)RunStatus(126-137)internal/proto/convert/status.go (1)
ProtoToDAGRunStatus(114-153)internal/core/execution/node.go (1)
Node(9-36)
internal/service/coordinator/client.go (1)
proto/coordinator/v1/coordinator.pb.go (18)
HeartbeatRequest(700-707)HeartbeatRequest(720-720)HeartbeatRequest(735-737)HeartbeatResponse(761-768)HeartbeatResponse(781-781)HeartbeatResponse(796-798)ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)ReportStatusResponse(1015-1021)ReportStatusResponse(1034-1034)ReportStatusResponse(1049-1051)GetDAGRunStatusResponse(1899-1906)GetDAGRunStatusResponse(1919-1919)GetDAGRunStatusResponse(1934-1936)GetDAGRunStatusRequest(1828-1838)GetDAGRunStatusRequest(1851-1851)GetDAGRunStatusRequest(1866-1868)
internal/service/coordinator/static_registry_test.go (1)
internal/service/coordinator/static_registry.go (1)
NewStaticRegistry(22-49)
internal/common/config/loader.go (1)
internal/common/config/config.go (1)
Worker(279-284)
internal/runtime/output.go (2)
internal/core/execution/context.go (4)
Context(17-29)LogWriterFactory(35-39)StreamTypeStdout(44-44)StreamTypeStderr(46-46)internal/runtime/context.go (3)
Context(14-14)GetDAGContext(50-52)LogWriterFactory(46-46)
internal/service/worker/handler_test.go (1)
internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-241)
internal/cmd/worker.go (3)
internal/service/worker/worker.go (2)
Worker(21-34)NewWorker(42-61)internal/service/coordinator/static_registry.go (1)
NewStaticRegistry(22-49)internal/service/worker/remote_handler.go (2)
RemoteTaskHandlerConfig(28-45)NewRemoteTaskHandler(49-60)
internal/runtime/agent/agent.go (5)
internal/runtime/remote/status_pusher.go (1)
StatusPusher(14-17)internal/core/execution/context.go (4)
LogWriterFactory(35-39)Context(17-29)WithLogWriterFactory(229-233)NewContext(238-277)internal/runtime/context.go (4)
LogWriterFactory(46-46)Context(14-14)WithLogWriterFactory(42-42)NewContext(28-28)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/runtime/executor/dag_runner_test.go (1)
internal/runtime/data.go (1)
Parallel(85-89)
internal/proto/convert/status_test.go (6)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/status.go (4)
Running(8-8)NodeSucceeded(69-69)NodeRunning(66-66)NodeNotStarted(65-65)internal/core/params.go (1)
Params(36-45)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)internal/core/step.go (1)
ExecutorConfig(195-203)internal/common/collections/syncmap.go (1)
SyncMap(10-14)
internal/service/coordinator/handler.go (4)
internal/core/execution/dagrun.go (2)
DAGRunStore(23-48)DAGRunRef(128-131)internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)internal/proto/convert/status.go (1)
ProtoToDAGRunStatus(114-153)internal/core/status.go (3)
Failed(9-9)NodeRunning(66-66)NodeFailed(67-67)
internal/runtime/remote/log_streamer.go (4)
internal/service/coordinator/client.go (1)
Client(29-55)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)internal/core/execution/context.go (3)
Context(17-29)StreamTypeStdout(44-44)StreamTypeStderr(46-46)proto/coordinator/v1/coordinator.pb.go (10)
LogChunk(1640-1657)LogChunk(1670-1670)LogChunk(1685-1687)LogStreamType(127-127)LogStreamType(159-161)LogStreamType(163-165)LogStreamType(172-174)LogStreamType_LOG_STREAM_TYPE_STDOUT(131-131)LogStreamType_LOG_STREAM_TYPE_STDERR(132-132)LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED(130-130)
internal/service/coordinator/log_handler.go (3)
internal/common/logger/context.go (1)
Warn(45-47)proto/coordinator/v1/coordinator.pb.go (10)
StreamLogsResponse(1767-1774)StreamLogsResponse(1787-1787)StreamLogsResponse(1802-1804)LogChunk(1640-1657)LogChunk(1670-1670)LogChunk(1685-1687)LogStreamType(127-127)LogStreamType(159-161)LogStreamType(163-165)LogStreamType(172-174)internal/common/fileutil/fileutil.go (1)
OpenOrCreateFile(62-70)
internal/cmd/coordinator.go (4)
internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)internal/cmd/context.go (1)
Context(44-59)internal/service/coordinator/service.go (2)
Service(20-29)NewService(31-50)internal/service/coordinator/handler.go (4)
Handler(39-56)NewHandler(75-85)WithDAGRunStore(62-66)WithLogDir(69-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
internal/integration/distributed_parallel_test.go (1)
327-341: Fix ignoredFindSubAttempterror (current assertion checks the wrongerr).You discard the
FindSubAttempterror (att, _ := ...) and thenrequire.NoError(t, err)uses an outererr, which can hide failures.Proposed fix
- for _, child := range parallelNode.SubRuns { - att, _ := coord.DAGRunStore.FindSubAttempt(coord.Context, runRef, child.DAGRunID) + for _, child := range parallelNode.SubRuns { + att, findErr := coord.DAGRunStore.FindSubAttempt(coord.Context, runRef, child.DAGRunID) + require.NoError(t, findErr) if att == nil { continue } - require.NoError(t, err) - - status, err := att.ReadStatus(coord.Context) - require.NoError(t, err) + status, readErr := att.ReadStatus(coord.Context) + require.NoError(t, readErr) require.Equal(t, core.Aborted, status.Status) canceled = true }internal/runtime/agent/agent.go (1)
1365-1371: Potential info leak: returning rawerr.Error()over HTTP.Even though this is over a unix socket, it may expose more internal detail than intended; consider mapping to a generic message for non-typed errors (or ensuring errors never include sensitive data).
internal/service/coordinator/handler_test.go (1)
165-215: Tests are likely flaky due to fixedtime.Sleepsynchronization.The 100ms sleeps (“give the poller time to register”) will intermittently fail under load/CI variance, especially with
t.Parallel(). Preferrequire.Eventuallyand (for the poll test) a buffered channel to avoid goroutine blocking while the test sleeps.Example rewrite (PollAndDispatch)
- // Start polling in a goroutine - pollDone := make(chan *coordinatorv1.PollResponse) - pollErr := make(chan error) + // Start polling in a goroutine + pollDone := make(chan *coordinatorv1.PollResponse, 1) + pollErr := make(chan error, 1) go func() { @@ - // Give the poller time to register - time.Sleep(100 * time.Millisecond) + // Wait until poller is registered (avoid timing sleeps) + require.Eventually(t, func() bool { + h.mu.Lock() + defer h.mu.Unlock() + _, ok := h.waitingPollers["poller1"] + return ok + }, 1*time.Second, 10*time.Millisecond)Based on coding guidelines, this also keeps tests more deterministic.
Also applies to: 240-270
🤖 Fix all issues with AI agents
In @internal/runtime/remote/log_streamer.go:
- Around line 84-145: flush() currently sends the entire w.buffer as one
LogChunk which can exceed gRPC message size; change flush() to send the buffer
in bounded pieces: define a maxChunkSize (e.g. 4*1024*1024), ensure the stream
is initialized as now, then loop over w.buffer creating a copy/slice of at most
maxChunkSize for each iteration, increment w.sequence for each sent chunk, call
w.stream.Send for each chunk, and only remove the bytes successfully sent from
w.buffer (i.e., on successful Send advance the buffer start or reslice),
returning the first Send error without discarding unsent data; keep the existing
behavior of creating dataCopy per chunk and the existing metadata fields
(WorkerId, DagRunId, StepName, StreamType, RootDagRunName/Id, AttemptId).
In @internal/service/coordinator/handler_test.go:
- Around line 48-70: Several mockDAGRunStore methods (CreateAttempt,
RecentAttempts, LatestAttempt, ListStatuses, FindSubAttempt, RemoveOldDAGRuns,
RenameDAGRuns, RemoveDAGRun) currently return (nil, nil) which can mask
test-time surprises; update each unimplemented mock method to return a clear
sentinel error (e.g., errors.New("mockDAGRunStore: unimplemented <MethodName>"))
or call panic("mockDAGRunStore: unimplemented <MethodName>") instead of (nil,
nil) so any accidental invocation fails loudly and points to the exact method.
In @internal/service/coordinator/handler.go:
- Around line 407-418: Update the error message in Handler.StreamLogs to avoid
saying "nil" for a string: keep the existing check on h.logDir but change the
status.Error call to use a message like "log streaming not configured: logDir is
empty" (reference: Handler.StreamLogs, h.logDir,
status.Error/codes.FailedPrecondition).
- Around line 87-97: Cached DAGRunAttempt instances in Handler.openAttempts can
be used concurrently by multiple ReportStatus calls, causing races when calling
attempt.Write; serialize access per dagRunID by introducing a per-run mutex or
lock map. Update Handler to maintain a map[string]*sync.Mutex (or sync.RWMutex)
keyed by dagRunID and acquire that mutex around any use of
openAttempts[dagRunID] (notably in ReportStatus paths and in Close), ensuring
attempt.Write/Close calls are executed while holding the per-run lock; release
and clean up the per-run mutex when deleting the attempt. Apply this change to
the code paths referenced (Handler.Close and the ReportStatus handling around
lines ~287-333 and ~335-405).
- Around line 466-494: StartZombieDetector currently overwrites
h.zombieDetectorDone and can start multiple goroutines; add a guard to prevent
starting twice by checking h.zombieDetectorDone before creating a new channel
and returning early if the detector is already running, and protect that
check+assign with a mutex to avoid races (e.g., introduce or reuse a lock field
and wrap the check/assignment around h.zombieDetectorDone), leaving
WaitZombieDetector unchanged so it will wait on the single channel.
- Around line 287-333: The handler currently returns a successful RPC with
Accepted:false for operational errors which clients may ignore; change
ReportStatus so operational failures return gRPC errors instead of a 200 OK
payload. Specifically, replace the branches that return
&coordinatorv1.ReportStatusResponse{Accepted:false, Error:...} when
getOrOpenAttempt/getOrOpenSubAttempt (the variables/funcs: getOrOpenAttempt,
getOrOpenSubAttempt) or attempt.Write fail, and return status.Error with an
appropriate code (e.g., codes.Internal or codes.Unavailable) and the error
message; keep returning a normal response only on success (Accepted:true).
Ensure you still validate req.Status and the dagRunStore precondition as before.
- Around line 496-593: The cleanup I/O in markRunFailed currently uses the
detector's cancellable ctx, so if that ctx is canceled in-flight the
ReadStatus/Open/Write calls can fail; modify markRunFailed (and calls from
markWorkerTasksFailed/detectAndCleanupZombies as needed) to create a
non-cancelable context for the store operations (e.g., use
context.WithoutCancel(ctx) on Go 1.21+ or context.Background() as a fallback)
and pass that new ctx into attempt.ReadStatus/Open/Write so zombie-marking
completes even if the caller ctx is canceled.
- Around line 539-590: markRunFailed currently calls FindAttempt directly which
can open a separate writer and race with ReportStatus; replace the
FindAttempt/Open sequence with the shared getOrOpenAttempt(ctx, dagName,
dagRunID) to reuse the per-run writer from openAttempts. Specifically, in
markRunFailed (and the similar block around lines 335-369) call getOrOpenAttempt
to obtain the attempt, remove the explicit attempt.Open call and rely on the
returned attempt being the shared, opened instance, keep the existing defer
attempt.Close(ctx) semantics as appropriate, and ensure all error logging
remains unchanged.
🧹 Nitpick comments (5)
internal/runtime/executor/dag_runner.go (1)
418-448: Consider combining completion check and status retrieval into a single RPC.In the polling loop,
isSubDAGRunCompleted(line 420) andgetSubDAGRunStatus(line 434) both makeGetDAGRunStatusRPC calls to the coordinator. This results in two RPCs per poll iteration when completion is detected.You could optimize by having
isSubDAGRunCompletedreturn the status when completed, or directly usegetSubDAGRunStatusand check if the status is terminal.♻️ Potential optimization
case <-ticker.C: - // Check if the sub DAG run has completed - isCompleted, err := e.isSubDAGRunCompleted(ctx, dagRunID) + // Get sub DAG run status + result, err := e.getSubDAGRunStatus(ctx, dagRunID) if err != nil { - logger.Warn(waitCtx, "Failed to check sub DAG run completion", + logger.Debug(waitCtx, "Sub DAG run status not available yet", tag.Error(err), ) - continue // Retry on error + continue } - if !isCompleted { + if result.Status.IsActive() { logger.Debug(waitCtx, "Sub DAG run not completed yet") - continue // Not completed, keep polling + continue } - // Check the final status of the sub DAG run - result, err := e.getSubDAGRunStatus(ctx, dagRunID) - if err != nil { - // Not found yet, continue polling - logger.Debug(waitCtx, "Sub DAG run status not available yet", - tag.Error(err), - ) - continue - } - - // If we got a result, the sub DAG has completed logger.Info(waitCtx, "Distributed execution completed", tag.Name(result.Name), ) return result, nilinternal/cmd/coordinator.go (1)
118-195: Consider guardingWithDAGRunStorewhen store can be nil.
coordinator.NewHandler(coordinator.WithDAGRunStore(dagRunStore), ...)is fine only if the handler treats nil as “persistence disabled” everywhere; otherwise it risks nil dereferences when shared-nothing features are exercised.Example guard
- handler := coordinator.NewHandler( - coordinator.WithDAGRunStore(dagRunStore), - coordinator.WithLogDir(cfg.Paths.LogDir), - ) + handlerOpts := []coordinator.HandlerOption{ + coordinator.WithLogDir(cfg.Paths.LogDir), + } + if dagRunStore != nil { + handlerOpts = append(handlerOpts, coordinator.WithDAGRunStore(dagRunStore)) + } + handler := coordinator.NewHandler(handlerOpts...)internal/service/coordinator/client.go (1)
198-250: Resiliency: consider evicting cached clients on failed health checks.
attemptCallremoves the client on connection creation failure, but not on repeatedisHealthyfailures; you may want toremoveClient(member.ID)(or oncodes.Unavailable) to allow a clean reconnect.Also applies to: 252-274
internal/service/worker/worker.go (1)
156-188: Cancellation handling is correct; consider not holdingpollersMuwhile calling cancel funcs.Calling
cancelFunc()under lock is usually fine, but you can reduce contention by collecting funcs under lock and invoking them after unlocking (especially if you later add more work in cancellation).Also applies to: 286-299
internal/runtime/remote/log_streamer.go (1)
129-145: Consider sequence increment semantics on send failure.Right now sequence increments before
Send; ifSendfails, you’ll have gaps. If the coordinator expects strict contiguous sequences, increment only after successfulSend.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (18)
internal/cmd/coordinator.gointernal/common/fileutil/fileutil.gointernal/common/fileutil/fileutil_test.gointernal/integration/distributed_coordinator_test.gointernal/integration/distributed_e2e_test.gointernal/integration/distributed_parallel_test.gointernal/integration/distributed_subdag_test.gointernal/integration/distributed_test.gointernal/runtime/agent/agent.gointernal/runtime/executor/dag_runner.gointernal/runtime/remote/log_streamer.gointernal/runtime/runner.gointernal/service/coordinator/client.gointernal/service/coordinator/handler.gointernal/service/coordinator/handler_test.gointernal/service/coordinator/service.gointernal/service/worker/worker.gointernal/test/coordinator.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/test/coordinator.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint incmd/orchestrates the scheduler and CLI; runtime, persistence, and service layers sit underinternal/*(for exampleinternal/runtime,internal/persistence)
Keep Go filesgofmt/goimportsclean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, andErr...names for package-level errors
Repository linting relies ongolangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers ininternal/common
Files:
internal/integration/distributed_coordinator_test.gointernal/integration/distributed_subdag_test.gointernal/integration/distributed_e2e_test.gointernal/service/coordinator/client.gointernal/service/coordinator/service.gointernal/integration/distributed_parallel_test.gointernal/integration/distributed_test.gointernal/runtime/agent/agent.gointernal/common/fileutil/fileutil_test.gointernal/common/fileutil/fileutil.gointernal/cmd/coordinator.gointernal/service/worker/worker.gointernal/runtime/executor/dag_runner.gointernal/runtime/remote/log_streamer.gointernal/service/coordinator/handler_test.gointernal/runtime/runner.gointernal/service/coordinator/handler.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as*_test.go; favour table-driven cases and cover failure paths
Usestretchr/testify/requireand shared fixtures frominternal/testinstead of duplicating mocks
Files:
internal/integration/distributed_coordinator_test.gointernal/integration/distributed_subdag_test.gointernal/integration/distributed_e2e_test.gointernal/integration/distributed_parallel_test.gointernal/integration/distributed_test.gointernal/common/fileutil/fileutil_test.gointernal/service/coordinator/handler_test.go
🧠 Learnings (5)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Applied to files:
internal/integration/distributed_coordinator_test.gointernal/integration/distributed_subdag_test.gointernal/integration/distributed_e2e_test.gointernal/integration/distributed_parallel_test.gointernal/integration/distributed_test.gointernal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages
Applied to files:
internal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Applied to files:
internal/common/fileutil/fileutil_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Applied to files:
internal/runtime/executor/dag_runner.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/runtime/executor/dag_runner.gointernal/service/coordinator/handler.go
🧬 Code graph analysis (11)
internal/integration/distributed_coordinator_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/integration/distributed_subdag_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/integration/distributed_e2e_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/service/coordinator/client.go (2)
proto/coordinator/v1/coordinator.pb.go (18)
HeartbeatRequest(700-707)HeartbeatRequest(720-720)HeartbeatRequest(735-737)HeartbeatResponse(761-768)HeartbeatResponse(781-781)HeartbeatResponse(796-798)ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)ReportStatusResponse(1015-1021)ReportStatusResponse(1034-1034)ReportStatusResponse(1049-1051)GetDAGRunStatusResponse(1899-1906)GetDAGRunStatusResponse(1919-1919)GetDAGRunStatusResponse(1934-1936)GetDAGRunStatusRequest(1828-1838)GetDAGRunStatusRequest(1851-1851)GetDAGRunStatusRequest(1866-1868)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)
internal/service/coordinator/service.go (1)
internal/service/scheduler/scheduler.go (2)
Scheduler(39-62)Scheduler(309-311)
internal/integration/distributed_parallel_test.go (3)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (2)
WithStatusPersistence(84-88)WithDAGsDir(63-67)internal/core/spec/loader.go (1)
WithDAGsDir(90-94)
internal/integration/distributed_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/common/fileutil/fileutil_test.go (2)
internal/service/worker/handler_test.go (1)
TestCreateTempDAGFile(171-185)internal/common/fileutil/fileutil.go (2)
CreateTempDAGFile(200-250)FileExists(44-47)
internal/cmd/coordinator.go (4)
internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)internal/cmd/context.go (1)
Context(44-59)internal/service/coordinator/service.go (2)
Service(21-34)NewService(36-57)internal/service/coordinator/handler.go (3)
Handler(39-56)WithDAGRunStore(62-66)WithLogDir(69-73)
internal/service/worker/worker.go (6)
internal/runtime/subcmd.go (1)
Run(258-280)internal/common/config/config.go (1)
Worker(279-284)internal/core/execution/context.go (1)
Context(17-29)internal/cmd/context.go (1)
Context(44-59)internal/common/logger/context.go (3)
Info(40-42)Warn(45-47)Errorf(75-77)internal/common/logger/tag/tag.go (2)
WorkerID(65-67)RunID(40-42)
internal/runtime/executor/dag_runner.go (4)
internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-250)internal/core/execution/context.go (3)
Context(17-29)RunStatus(126-137)GetContext(286-298)internal/proto/convert/status.go (1)
ProtoToDAGRunStatus(114-153)internal/core/execution/node.go (1)
Node(9-36)
🔇 Additional comments (37)
internal/common/fileutil/fileutil_test.go (5)
13-16: LGTM! - Parallel test execution added correctly.Adding
t.Parallel()to these existing test functions and subtests is appropriate since they uset.TempDir()for isolation and don't share mutable state.Also applies to: 36-37
229-252: LGTM! - BasicFile test case is well-structured.The test properly verifies file creation, content integrity, and path structure. Using
t.Cleanupfor deferred removal is idiomatic.
254-282: LGTM! - WithExtraDocs test validates YAML multi-document handling.Good test coverage verifying that YAML document separators conform to the spec by appearing on their own line.
284-302: LGTM! - Edge case for missing trailing newline is covered.This test ensures the implementation correctly handles documents without trailing newlines, which is important for YAML spec compliance.
304-319: LGTM! - Empty extraDocs case correctly tested.Ensures that when no extra documents are provided, the output matches the primary document exactly without any separators.
internal/common/fileutil/fileutil.go (3)
196-212: LGTM! - Clean temp file creation with proper error handling.The implementation correctly creates the temp directory structure and temp file with a meaningful naming pattern. Directory permission
0750is appropriate for a temp directory.
214-242: LGTM! - IIFE pattern ensures proper resource cleanup.The immediately-invoked function ensures
tempFile.Close()is always called before checking the write error, which is a clean pattern for this use case. The YAML separator logic correctly handles documents with or without trailing newlines.
244-250: LGTM! - Error handling with proper cleanup.The implementation correctly removes the temp file on write failure and returns a well-wrapped error.
internal/runtime/executor/dag_runner.go (5)
67-76: LGTM! - Local DAG collection for temp file creation.The logic correctly collects YAML data from sibling local DAGs (excluding the target child) to include in the temporary file, enabling local DAG references within sub-DAGs.
500-535: LGTM! - Output extraction handles both local and proto formats.The function correctly normalizes output variables from two storage formats:
- Local format:
KEY=value(extracts value after=)- Proto format: just
valueThe type assertions with early returns are safe and idiomatic.
537-564: LGTM! - Completion check helper with appropriate fallback logic.The helper correctly distinguishes between "not found" (returns
false, not completed) and actual errors. The fallback from coordinator to local DB provides resilience.
458-498: [No changes needed—the implementation is correct.]The
RunStatusstruct is intentionally designed with only five fields (Name,DAGRunID,Params,Outputs,Status). It does not have fields forStartedAt,FinishedAt, orNodes. All fields being copied fromdagRunStatusto the returnedRunStatusare appropriate, and both callers only accessStatusandName, which are already included.Likely an incorrect or invalid review comment.
382-388: Correct use ofcontext.WithoutCancelfor cancellation polling.Using
context.WithoutCancelensures status queries can complete even when the parent context is canceled, which is necessary to confirm the sub-DAG has actually terminated. Introduced in Go 1.21, this is the idiomatic approach for this pattern.internal/runtime/runner.go (2)
229-230: LGTM! Panic recovery now signals progress.The defer statement correctly passes
progressChto enable status updates after panic recovery, ensuring the coordinator can persist the failed state. This aligns with the broader status/log streaming architecture introduced in this PR.
1065-1089: LGTM! Panic recovery properly signals progress.The function correctly:
- Accepts optional
progressChparameter with nil-check (lines 1085-1087)- Updates metrics before signaling progress
- Signals progress so the coordinator can persist the failed state
The implementation is backward-compatible and thread-safe.
internal/service/coordinator/service.go (3)
27-27: LGTM! New fields support graceful shutdown.The
cfgfield enables configuration-driven behavior (e.g., zombie detection interval), andstopCancelenables proper cleanup of internal goroutines during shutdown.Also applies to: 32-33
67-79: LGTM! Zombie detector lifecycle properly initialized.The implementation correctly:
- Creates an internal cancellable context for lifecycle management
- Uses configured interval with a sensible default (45s)
- Includes nil-check for config (line 74)
- Logs startup for observability
137-146: LGTM! Graceful shutdown sequence is correct.The shutdown sequence properly:
- Cancels the internal context to signal the zombie detector to stop
- Waits for the zombie detector to finish (prevents resource leaks)
- Closes handler resources after the detector has stopped
The nil-check for
stopCancel(line 138) prevents panics if Start was never called.internal/integration/distributed_subdag_test.go (1)
31-31: LGTM! Test setup updated for status persistence.All test cases now correctly enable status persistence via
test.WithStatusPersistence(), which is essential for testing the distributed coordinator's ability to track and persist DAG run states across the new shared-nothing worker architecture.Also applies to: 70-70, 106-106, 140-140
internal/integration/distributed_coordinator_test.go (1)
18-18: LGTM! Test setup updated for status persistence.Both test cases (
TestCoordinatorGetWorkersandTestCoordinatorHeartbeat) now enable status persistence, which is necessary for testing worker registration and heartbeat functionality in the coordinator.Also applies to: 107-107
internal/integration/distributed_test.go (1)
32-32: LGTM! Test setup updated for status persistence.All test cases now enable status persistence, which is essential for verifying that DAGs with worker selectors are correctly tracked and persisted in the distributed execution model.
Also applies to: 83-83, 122-122
internal/integration/distributed_e2e_test.go (3)
37-40: Good: enable coordinator status persistence for the E2E flow.Switching to
test.SetupCoordinator(t, test.WithStatusPersistence())matches the new distributed/persistent coordinator behavior and should make status assertions more realistic.
141-153: Good: localdagu starttest still uses persistent coordinator setup.This keeps the test environment consistent with the distributed mode while validating “start executes locally”.
194-197: Good: cancellation E2E uses persistent coordinator setup.Makes the abort/cancel assertions less dependent on in-memory state.
internal/integration/distributed_parallel_test.go (1)
40-42: Good: distributed-parallel tests now run with status persistence enabled.Consistently using
test.WithStatusPersistence()(andtest.WithDAGsDir(...)where needed) aligns these tests with the persistent coordinator path.Also applies to: 118-120, 178-179, 217-218, 260-262, 384-386, 467-469
internal/service/coordinator/client.go (3)
29-55: Nice API expansion for shared-nothing: Heartbeat response + status/log streaming hooks.The interface updates cleanly express cancellation directives (
HeartbeatResponse) and add the needed RPC surfaces for shared-nothing workers.
105-188: Good: centralized coordinator discovery with explicit “no coordinators” error.Using
getCoordinatorMembersin Dispatch/Poll improves error clarity and avoids silent no-op behavior.Also applies to: 385-396
499-539: Verify gRPC client creation API (grpc.NewClient) and dial options expectations.If the repo standard is
grpc.DialContext(and/or expectsWithBlock, keepalive, service config, etc.), confirmgrpc.NewClient(address, ...)is correct for grpc-go v1.75.0 and your connection lifecycle expectations.Also applies to: 571-621
internal/service/worker/worker.go (3)
118-147: Good: Stop cancels internal ctx, waits for goroutines, then cleans up gRPC clients.The
stopOnce+stopDonepattern is a solid step toward graceful shutdown.
259-262: Verify Go version compatibility formin(...)/math.MaxInt32usage.If the module Go version is < 1.21,
minwon’t compile.
68-116: No action needed—the blocking behavior is the intended design and is already properly used throughout the codebase.Tests explicitly verify that
Start()blocks until shutdown (via context cancellation orStop()), and all existing call sites correctly wrap it in goroutines. The shutdown coordination betweenStart()andStop()is properly implemented and tested. There is no risk of deadlock in current code.internal/runtime/agent/agent.go (5)
138-189: Good: clean option surface for shared-nothing status/log plumbing.The
Optionsadditions and the corresponding Agent wiring are straightforward and keep the default (local) path intact.Also applies to: 205-224
339-345: Good: propagate attempt ID + log-writer factory into runtime context.Setting attempt ID (when supported) and attaching
runtime.WithLogWriterFactory(...)should make remote step log streaming work without invasive changes elsewhere.Also applies to: 361-373
876-897: Verify Go version + shared-nothing status semantics (context.WithoutCancel+ push-only).
context.WithoutCancelrequires Go >= 1.21, and “push-only” means local status files may no longer be updated—ensure all consumers in shared-nothing mode read from coordinator (or otherwise don’t rely on local status persistence).
904-915: Good: avoid sending SIGTERM on ctx cancellation after normal completion.The
!a.finished.Load()guard prevents unnecessary termination signaling on normal shutdown paths.
1279-1294: The code is correct—Retry: trueis appropriate for both cases.The
Retryflag inStore.CreateAttempt()only determines whether to find an existingDAGRun(whentrue) or create a new one (whenfalse). It is not passed torun.CreateAttempt(), so it does not affect attempt directory naming, timestamps, or uniqueness. Attempts always receive unique IDs viagenAttemptID()regardless of whetherRetryis true or false.Both use cases correctly set
Retry: true:
- Retrying a failed execution: The run already exists from the previous attempt
- Running from queue: The run was pre-created by the enqueue operation
There is no risk of overwriting or selecting the wrong attempt directory. The suggested refactoring to a dedicated
UseExistingRunoption would be a semantic improvement for clarity, but the current implementation is functionally correct.internal/cmd/coordinator.go (1)
81-106: Remove redundant cleanup from defer;Service.Stop()already handles zombie detector and handler shutdown.The
svc.Stop(ctx)call already cancels the internal context (signaling zombie detector to exit), waits for it to complete viaWaitZombieDetector(), and closes handler resources. The defer block duplicates this cleanup unnecessarily. Since the zombie detector goroutine only watches the internal context cancellation (not triggered byClose()), there is no shutdown hang risk—the current order is safe but wasteful.Likely an incorrect or invalid review comment.
515a6cc to
053e670
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
internal/service/scheduler/queue_processor.go (1)
453-497:monitorStartuptreatscore.Runningas “not started” (should count as started).Right now, if heartbeat isn’t observed but status is
Running,monitorStartupstill returnserrNotStarted(Line 489-497). That makes startup confirmation depend on heartbeat (or a terminal status), which can unnecessarily delay dequeueing and amplifies duplicate-dispatch risk when heartbeats are delayed or flaky.Proposed fix
- if status.Status != core.Queued && status.Status != core.Running { - logger.Info(ctx, "DAG execution started or finished", - tag.Status(status.Status.String()), - ) - return true, nil - } - - return false, errNotStarted + switch status.Status { + case core.Queued: + return false, errNotStarted + case core.Running: + logger.Info(ctx, "DAG run has started (status is running)", + tag.Status(status.Status.String()), + ) + return true, nil + default: + logger.Info(ctx, "DAG execution started or finished", + tag.Status(status.Status.String()), + ) + return true, nil + }internal/service/coordinator/service.go (1)
59-79:Stop()assumes zombie detector was started—ensureWaitZombieDetector()can’t block forever.
IfStop()can be called on a partially-initialized service, consider guarding with a “started” flag or makingWaitZombieDetector()a no-op when not started.Also applies to: 118-147
internal/runtime/runner.go (2)
223-235: Critical: defer order sendsdoneChbefore panic recovery runs.
Because defers run LIFO,doneCh <- nhappens beforerecoverNodePanic, so the runner loop can process a “completed” node that still looksNodeRunningand has no panic error recorded yet.Proposed fix (defer doneCh first so it executes last)
go func(n *Node) { @@ - // Ensure node is finished and wg is decremented - defer r.finishNode(n, &wg) - // Recover from panics and signal progress for status updates - defer r.recoverNodePanic(ctx, n, progressCh) - // Signal completion to runner loop - defer func() { - doneCh <- n - }() + // Signal completion to runner loop (execute last) + defer func() { doneCh <- n }() + // Recover from panics and signal progress for status updates + defer r.recoverNodePanic(ctx, n, progressCh) + // Ensure node is finished and wg is decremented + defer r.finishNode(n, &wg)
1071-1095: Panic recovery should set a terminal node status (likelyNodeFailed).
Right nowrecoverNodePanicrecords the error, but ifnode.MarkErrordoesn’t also set status, the node can remainNodeRunningand confuse final status computation.Minimal fix
func (r *Runner) recoverNodePanic(ctx context.Context, node *Node, progressCh chan *Node) { if panicObj := recover(); panicObj != nil { @@ node.MarkError(err) + node.SetStatus(core.NodeFailed) r.setLastError(err)internal/service/worker/worker.go (1)
74-116: Behavior change:Start()now blocks until shutdown—ensure callers expect this.
If previous behavior was “start in background and return”, this will deadlock call sites that don’t callStop()or cancelctx. Consider renaming toRun()or updating call sites/docs accordingly.internal/cmd/coordinator.go (1)
81-105: Use a non-canceled context for graceful shutdown operations (best practice).After
<-ctx.Done()(line 98),svc.Stop(ctx)(line 101) and the deferredhandler.Close(ctx)(line 90) receive a canceled context. While current implementations ofStopandClosedon't explicitly check context cancellation and will still complete, this is semantically incorrect for shutdown operations. The test code (internal/test/coordinator.go) demonstrates the expected pattern: creating a separate timeout context withcontext.WithTimeout(context.Background(), 5*time.Second)for theStopcall.Proposed fix (use non-canceled context for cleanup)
func runCoordinator(ctx *Context, _ []string) error { svc, handler, err := newCoordinator(ctx, ctx.Config, ctx.ServiceRegistry, ctx.DAGRunStore) if err != nil { return fmt.Errorf("failed to initialize coordinator: %w", err) } // Ensure handler resources are cleaned up on shutdown defer func() { handler.WaitZombieDetector() - handler.Close(ctx) + handler.Close(context.WithoutCancel(ctx)) }() if err := svc.Start(ctx); err != nil { return fmt.Errorf("failed to start coordinator: %w", err) } // Wait for context cancellation <-ctx.Done() logger.Info(ctx, "Coordinator shutting down") - if err := svc.Stop(ctx); err != nil { + if err := svc.Stop(context.WithoutCancel(ctx)); err != nil { return fmt.Errorf("failed to stop coordinator: %w", err) } return nil }
🤖 Fix all issues with AI agents
In @internal/cmd/worker.go:
- Around line 77-104: The runWorker function uses the original workerID variable
for RemoteTaskHandlerConfig.WorkerID and tag.WorkerID even when empty; after
constructing w := worker.NewWorker(...), detect if workerID == "" and replace it
with the actual ID generated by the Worker (read from w via its accessor, e.g.
w.ID() or w.GetID()) before creating handlerCfg and logging so
RemoteTaskHandlerConfig.WorkerID and tag.WorkerID use the normalized, non-empty
worker ID.
In @internal/runtime/output.go:
- Around line 309-325: Wrap remote writers returned by factory.NewStepWriter
with masking.NewMaskingWriter when oc.masker is non-nil in setupRemoteWriters so
remote streams get the same secret-masking as local files; specifically, after
creating oc.stdoutWriter and (if not merged) oc.stderrWriter, check oc.masker
and replace each writer with masking.NewMaskingWriter(writer, oc.masker) before
assigning to oc.stdoutWriter/oc.stderrWriter (preserve the merged-stream logic
where stderrWriter = stdoutWriter when data.State.Stdout == data.State.Stderr)
and keep stdoutFileName/stderrFileName assignments and the existing return
behavior.
In @internal/service/coordinator/handler.go:
- Around line 457-481: StartZombieDetector currently calls
time.NewTicker(interval) which will panic if interval <= 0; add a guard at the
start of Handler.StartZombieDetector to validate the interval (e.g., if interval
<= 0 then return an error or set a sensible default) before creating the ticker
and before marking zombieDetectorStarted true. Ensure the check references
StartZombieDetector and time.NewTicker so you don't create the ticker or launch
the goroutine when interval is non-positive.
- Around line 292-391: getOrOpenSubAttempt currently uses subDAGRunID alone as
the cacheKey which can collide across different roots or with top-level IDs;
change getOrOpenSubAttempt to build a unique cache key (e.g. include the
rootRef.Name and rootRef.ID plus the subDAGRunID and a "sub:" prefix) and pass
that composed key into getOrOpenAttemptWithFinder, ensuring the finder still
calls dagRunStore.FindSubAttempt(rootRef, subDAGRunID); this prevents cross-root
collisions while preserving existing finder behavior.
- Around line 89-102: Handler.Close currently holds attemptsMu while calling
attempt.Close(ctx); change it to grab attemptsMu, copy or collect the entries
from openAttempts (e.g., into a slice or map of dagRunID->attempt), then release
attemptsMu, create a non-cancelable context via context.WithoutCancel(ctx) and
call attempt.Close(newCtx) for each collected attempt outside the lock, then
reacquire attemptsMu to remove/clear those entries from openAttempts (or delete
per dagRunID) while logging failures using the same logger and tags; this
ensures attempt.Close does I/O without blocking attemptsMu.
In @internal/service/coordinator/log_handler.go:
- Around line 25-28: The writers map currently keys by streamKey() which is
built from raw chunk fields and can differ from the sanitized filesystem path
returned by logFilePath(), causing multiple logWriter/os.File instances to be
opened for the same actual file; change the map key to the resolved sanitized
path (call logFilePath(...) to compute the filename once) and use that sanitized
path as the unique key when looking up/creating entries in writers (and when
closing/removing entries), update all places that reference streamKey() for
writer lookup/creation to instead compute and use the sanitized path, and ensure
logWriter creation is atomic under writersMu to avoid races.
- Around line 95-142: handleStream currently leaves per-stream writers open when
the RPC ends with io.EOF or any non-EOF error; update it to track which writers
were touched during the loop (e.g., a map[string]struct{} or slice of keys) and
ensure all those writers are closed when the handler returns (both on io.EOF
after SendAndClose and on any other error path). Add a helper
closeWriterByKey(key string) that acquires the writers map lock, removes the
writer from the map, releases the lock, and then closes the writer outside the
lock; replace direct calls to closeWriter(ctx, chunk) with using that
removal/close helper where appropriate and call it for every touched writer
before returning from handleStream. Ensure getOrCreateWriter, closeWriterByKey
and writer.write function names are used so you update the correct places.
In @internal/service/worker/poller_test.go:
- Around line 450-472: The mockCoordinatorCli.StreamLogs method currently
returns (nil, nil) which can cause nil-pointer panics when tests exercise
Send/CloseAndRecv; update mockCoordinatorCli.StreamLogs to return a non-nil
error (e.g., gRPC codes.Unimplemented wrapped via status.Error) or return a
minimal fake implementing coordinatorv1.CoordinatorService_StreamLogsClient so
callers get a valid client; locate the StreamLogs method on mockCoordinatorCli
and replace the nil return with either a status.Error(codes.Unimplemented,
"...") or an instance of a simple mock stream client that implements
Send/CloseAndRecv.
In @internal/service/worker/remote_handler.go:
- Around line 27-60: NewRemoteTaskHandler currently constructs a
remoteTaskHandler without validating required dependencies, but loadDAG (and
code paths in Handle) assume h.config and h.config.Paths (and other fields) are
non-nil which can cause a nil deref at runtime; update NewRemoteTaskHandler to
validate required fields from RemoteTaskHandlerConfig (at minimum cfg.Config and
cfg.Config.Paths, plus any other non-optional deps like CoordinatorClient,
DAGStore, DAGRunMgr) and return an error if missing, or alternatively add
upfront nil-guards in Handle and loadDAG that return clear errors instead of
panicking; ensure you reference RemoteTaskHandlerConfig, NewRemoteTaskHandler,
remoteTaskHandler, loadDAG, Handle, h.config and h.config.Paths when making the
checks so callers receive a meaningful error rather than a runtime panic.
- Around line 187-205: The temp DAG creation can fail if task.Target contains
path separators because fileutil.CreateTempDAGFile forwards the name to
os.CreateTemp; sanitize the name by using filepath.Base (or another
DAG-name-only value) before passing to fileutil.CreateTempDAGFile (i.e. call
fileutil.CreateTempDAGFile("worker-dags", filepath.Base(task.Target),
[]byte(task.Definition))); keep the rest of the logic (assigning tempFile to
target and cleanupFunc) unchanged and ensure any references to task.Target later
still use the original when no temp file was created.
- Around line 116-141: The fallback lookup in handleRetry uses
h.dagRunStore.FindAttempt with execution.NewDAGRunRef(task.RootDagRunName,
task.DagRunId) which fails for sub-DAG retries; update handleRetry to check if
task.DagRunId != task.RootDagRunId and in that case call
h.dagRunStore.FindSubAttempt(ctx, root, task.DagRunId) (using the existing root
:= execution.DAGRunRef{Name: task.RootDagRunName, ID: task.RootDagRunId})
otherwise fall back to FindAttempt/execution.NewDAGRunRef as before, and
preserve the existing error wrapping/messages for failed lookups/ReadStatus.
In @internal/service/worker/worker_test.go:
- Around line 380-400: TestWorkerDefaultID currently doesn't verify generated ID
because Stop() without Start() doesn't exercise ID usage; update the test to
assert the worker id directly by either (A) moving the test into package worker
and asserting the unexported field (create the worker via worker.NewWorker and
assert w.id != ""), or (B) add a small exported accessor Worker.ID() on the
worker type and in TestWorkerDefaultID call w.ID() and require it is not empty;
alternatively ensure you call w.Start(...) before Stop() so the ID is exercised
at runtime. Use the symbols worker.NewWorker, TestWorkerDefaultID, w.id or
Worker.ID(), and Start/Stop as appropriate.
In @internal/service/worker/worker.go:
- Around line 31-39: Start/Stop ordering race: Stop can run before Start
initializes w.stopCancel/w.stopDone; initialize the shutdown primitives in
NewWorker so Stop is safe anytime. In NewWorker create the internal context and
cancel (context.WithCancel) and allocate stopDone channel on the worker struct
(and keep using stopOnce in Stop) so Start only launches goroutines and Stop can
always call cancel/close without nil checks; reference the worker fields
stopCancel, stopDone, stopOnce and the constructors NewWorker/Start/Stop to
locate and update initialization.
- Around line 118-147: The cleanup call at the end of Worker.Stop should not use
the possibly-canceled caller ctx; before calling w.coordinatorCli.Cleanup(ctx)
create a fresh timeout context (e.g., cleanupCtx, cancel :=
context.WithTimeout(context.Background(), cleanupTimeout) with defer cancel())
and pass cleanupCtx to w.coordinatorCli.Cleanup so cleanup runs even if the
original ctx was canceled; keep the same error wrapping into err on failure.
Ensure you reference Worker.Stop, w.stopCancel, w.stopDone and
w.coordinatorCli.Cleanup when making the change.
In @proto/coordinator/v1/coordinator.proto:
- Around line 19-31: DAGRunStatusProto has mixed timestamp types (created_at is
int64 while queued_at/started_at/finished_at are string); pick a single
canonical representation (prefer google.protobuf.Timestamp or consistently int64
unix ms) and update the proto fields (DAGRunStatusProto.created_at, queued_at,
started_at, finished_at) to that type, then update the conversion helper
DAGRunStatusToProto (and any inverse conversions) to emit the chosen format
consistently; ensure import of google/protobuf/timestamp.proto if you choose
Timestamp and adjust any RPC messages that reference these fields accordingly.
🧹 Nitpick comments (19)
internal/runtime/runner_helper_test.go (1)
336-336: Consider usingrequire.NotNilfor consistency.The
nodeByNamemethod still usest.Fatalffor the same nil check pattern that was updated torequire.NotNilinassertNodeStatus(line 312). For consistency with the updated approach and coding guidelines, consider applying the same pattern here.♻️ Suggested refactor for consistency
- t.Fatalf("step %s not found", stepName) - return nil + require.NotNil(t, nil, "step %s not found", stepName) + return nilNote: While the
return nilis technically unreachable after a fatal assertion, it may be kept for compiler satisfaction or removed depending on project style.internal/service/scheduler/queue_processor.go (1)
432-440: Retry error log message is misleading (it’s monitoring retries, not execution retries).This line logs
"Failed to execute DAG after retries", but the retry loop is aroundmonitorStartup, notExecuteDAG. Consider renaming to avoid confusion during incident/debug.Also, the closure reuses the outer
errvariable; not wrong, but easy to misread later.Proposed tweak
- operation := func(ctx context.Context) error { - started, err = p.monitorStartup(ctx, queueName, runRef) - return err - } + operation := func(ctx context.Context) error { + var monitorErr error + started, monitorErr = p.monitorStartup(ctx, queueName, runRef) + return monitorErr + } if err := backoff.Retry(ctx, operation, policy, nil); err != nil { - logger.Error(ctx, "Failed to execute DAG after retries", tag.Error(err)) + logger.Error(ctx, "Failed to confirm DAG startup after retries", tag.Error(err)) }internal/cmd/startall.go (1)
198-200: Consider handling the error fromcoordHandler.Close.
coordHandler.Closelikely returns an error (similar to other Close methods in the codebase), but it's currently being discarded. For consistency with other service cleanup patterns in this function (lines 193-196, 203-205), consider logging any error.Proposed fix
// Clean up coordinator handler resources coordHandler.WaitZombieDetector() - coordHandler.Close(ctx) + if err := coordHandler.Close(ctx); err != nil { + logger.Error(ctx, "Failed to close coordinator handler", tag.Error(err)) + }internal/cmd/worker_test.go (1)
54-189: Comprehensive test coverage forBuildCoordinatorClientConfig.The test suite covers key scenarios: empty/nil coordinators, insecure mode, TLS validation failures, valid TLS config, SkipTLSVerify, and multiple coordinators. Each test uses
t.Parallel()appropriately.Consider adding an assertion in
StaticCoordinatorsReturnsConfigorMultipleCoordinatorsto verify that the coordinator addresses are correctly passed through to the result config, if theCoordinatorClientConfigstruct contains an address field.internal/service/coordinator/log_handler.go (1)
195-205: Don’t holdwritersMuwhile doing file I/O inw.close().
Flush/Sync/Closecan block and will stall all streams becausewritersMuis held. Prefer: remove from map under lock, then close outside lock.Also applies to: 249-259
internal/service/worker/worker_test.go (2)
573-650: SetPollFuncbeforeStart()to avoid race/flakes.
Right now the worker can begin polling beforeSetPollFuncis installed (Sleep-based ordering).
712-799: Cancellation tests are timing-coupled to heartbeat frequency.
If heartbeat interval changes, these can become flaky. Preferrequire.Eventuallyon “cancellation observed” conditions and avoid fixedtime.Sleep(100ms)where possible.internal/cmd/worker.go (1)
127-147:BuildCoordinatorClientConfigshould defensively handlecfg == nil.
It’s a “pure function” intended for unit tests; guarding nil removes footguns.internal/integration/distributed_parallel_test.go (1)
329-340: Cancellation assertion may hide real failures bycontinueonfindErr.
IfFindSubAttempterrors, continuing can turn infra issues into “no canceled subruns found”. Considerrequire.NoError(or at leastt.Logf) on unexpectedfindErr.internal/service/coordinator/log_handler_test.go (1)
427-470:ConcurrentAccessshould assert all goroutines succeeded (and useWaitGroup).
Right now it reads 10 values fromdonebut doesn’t fail the test if any goroutine wrotefalse.internal/runtime/executor/dag_runner.go (1)
67-73: Non-deterministic ordering of extra YAML documents.The iteration order over
rCtx.DAG.LocalDAGs(a map) is non-deterministic. If the order of documents in the combined YAML file matters for parsing or if local DAGs reference each other, this could lead to flaky behavior.Consider sorting the keys before iteration if document order is significant:
🔧 Suggested fix for deterministic ordering
+import "slices" + // Collect extra docs from other local DAGs -var extraDocs [][]byte -for _, otherDAG := range rCtx.DAG.LocalDAGs { - if otherDAG.Name != childName { - extraDocs = append(extraDocs, otherDAG.YamlData) - } -} +var extraDocs [][]byte +dagNames := make([]string, 0, len(rCtx.DAG.LocalDAGs)) +for name := range rCtx.DAG.LocalDAGs { + if name != childName { + dagNames = append(dagNames, name) + } +} +slices.Sort(dagNames) +for _, name := range dagNames { + extraDocs = append(extraDocs, rCtx.DAG.LocalDAGs[name].YamlData) +}internal/service/coordinator/client.go (1)
497-539: Consider adding the failed client to removal on connection error.The
StreamLogsmethod has its own failover logic (appropriate since streaming can't useattemptCall), but unlikeGetWorkers(line 434-436), it doesn't remove failed clients from the cache when there's anUnavailableerror. This could leave stale connections in the cache.🔧 Suggested fix
for _, member := range members { client, err := cli.getOrCreateClient(member) if err != nil { cli.recordFailure(err) + cli.removeClient(member.ID) lastErr = err continue }internal/runtime/remote/log_streamer.go (1)
152-202: Consider documenting that Close creates a new stream if none existed.On line 171-184, if
w.streamis nil (no data was ever written), the final marker is not sent. This is likely intentional (nothing to finalize), but worth documenting in the method comment.internal/service/worker/worker.go (1)
160-188: Avoid calling cancel funcs (and logging) while holdingpollersMu.
context.CancelFuncis non-blocking today, but keeping this pattern lock-free prevents future deadlocks if cancellation plumbing changes (or if other work is added).Proposed refactor
func (w *Worker) processCancellations(ctx context.Context, cancelledRunIDs []string) { - w.pollersMu.Lock() - defer w.pollersMu.Unlock() - - for _, dagRunID := range cancelledRunIDs { - if cancelFunc, exists := w.cancelFuncs[dagRunID]; exists { - logger.Info(ctx, "Cancelling task per coordinator directive", - tag.RunID(dagRunID), - tag.WorkerID(w.id)) - cancelFunc() - } - } + var toCancel []struct { + id string + fn context.CancelFunc + } + + w.pollersMu.Lock() + for _, dagRunID := range cancelledRunIDs { + if cancelFunc, exists := w.cancelFuncs[dagRunID]; exists { + toCancel = append(toCancel, struct { + id string + fn context.CancelFunc + }{id: dagRunID, fn: cancelFunc}) + } + } + w.pollersMu.Unlock() + + for _, c := range toCancel { + logger.Info(ctx, "Cancelling task per coordinator directive", + tag.RunID(c.id), + tag.WorkerID(w.id)) + c.fn() + } }Also applies to: 286-299
internal/service/coordinator/static_registry.go (3)
22-49: Consider “skip invalid addresses” vs “fail fast” semantics explicitly.
Right now any single invalid entry hard-failsNewStaticRegistryeven if other addresses are valid; if this is intended, it’s fine—otherwise consider collecting errors and returning the valid subset with a combined error/warn log.
52-78:parseAddresswon’t support IPv6 (or any input containing multiple:).
If IPv6 coordinators are in scope, prefernet.SplitHostPortand require bracketed IPv6 ([::1]:50055), plus a “no port provided” path.
93-99: Return an empty slice instead ofnil, nilfor unsupported services.
This reduces nil-handling for callers while keeping the same meaning.Proposed tweak
func (r *StaticRegistry) GetServiceMembers(_ context.Context, name execution.ServiceName) ([]execution.HostInfo, error) { if name == execution.ServiceNameCoordinator { return r.coordinators, nil } // Return empty list for other services - return nil, nil + return []execution.HostInfo{}, nil }internal/service/worker/poller_test.go (1)
23-370: Parallel +time.Sleepmakes these tests prone to flakes; preferrequire.Eventuallyor channel sync.
This is especially relevant now that the suite is explicitly parallelized. (As per coding guidelines on Go tests, cover failure paths without timing brittleness.)internal/service/coordinator/handler_test.go (1)
20-185: Prefer shared fixtures over duplicating DAGRunStore mocks.The inline
mockDAGRunStore/mockDAGRunAttemptare sizable and will likely be reused across packages as shared-nothing coverage grows; consider moving them underinternal/test(or using an existing fixture) to avoid divergence. Based on learnings, tests in**/*_test.goshould favor shared fixtures frominternal/testover duplicating mocks.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
proto/coordinator/v1/coordinator.pb.gois excluded by!**/*.pb.goproto/coordinator/v1/coordinator_grpc.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (57)
codecov.ymlinternal/cmd/context.gointernal/cmd/coordinator.gointernal/cmd/flags.gointernal/cmd/startall.gointernal/cmd/worker.gointernal/cmd/worker_test.gointernal/common/config/config.gointernal/common/config/definition.gointernal/common/config/loader.gointernal/common/config/loader_test.gointernal/common/fileutil/fileutil.gointernal/common/fileutil/fileutil_test.gointernal/core/execution/context.gointernal/integration/distributed_coordinator_test.gointernal/integration/distributed_e2e_test.gointernal/integration/distributed_helpers_test.gointernal/integration/distributed_parallel_test.gointernal/integration/distributed_subdag_test.gointernal/integration/distributed_test.gointernal/integration/remote_test.gointernal/proto/convert/status.gointernal/proto/convert/status_test.gointernal/runtime/agent/agent.gointernal/runtime/context.gointernal/runtime/executor/dag_runner.gointernal/runtime/executor/dag_runner_test.gointernal/runtime/executor/task.gointernal/runtime/executor/task_test.gointernal/runtime/manager.gointernal/runtime/output.gointernal/runtime/output_test.gointernal/runtime/remote/log_streamer.gointernal/runtime/remote/status_pusher.gointernal/runtime/runner.gointernal/runtime/runner_helper_test.gointernal/service/coordinator/client.gointernal/service/coordinator/client_test.gointernal/service/coordinator/handler.gointernal/service/coordinator/handler_test.gointernal/service/coordinator/log_handler.gointernal/service/coordinator/log_handler_test.gointernal/service/coordinator/service.gointernal/service/coordinator/static_registry.gointernal/service/coordinator/static_registry_test.gointernal/service/scheduler/dag_executor.gointernal/service/scheduler/dag_executor_test.gointernal/service/scheduler/queue_processor.gointernal/service/worker/handler.gointernal/service/worker/handler_test.gointernal/service/worker/poller_test.gointernal/service/worker/remote_handler.gointernal/service/worker/worker.gointernal/service/worker/worker_test.gointernal/test/coordinator.gointernal/test/helper.goproto/coordinator/v1/coordinator.proto
✅ Files skipped from review due to trivial changes (1)
- internal/test/coordinator.go
🚧 Files skipped from review as they are similar to previous changes (9)
- internal/service/worker/handler_test.go
- internal/common/config/loader.go
- internal/common/config/definition.go
- internal/integration/distributed_subdag_test.go
- internal/cmd/flags.go
- internal/integration/distributed_test.go
- internal/common/config/config.go
- internal/service/scheduler/dag_executor_test.go
- internal/service/worker/handler.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint incmd/orchestrates the scheduler and CLI; runtime, persistence, and service layers sit underinternal/*(for exampleinternal/runtime,internal/persistence)
Keep Go filesgofmt/goimportsclean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, andErr...names for package-level errors
Repository linting relies ongolangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers ininternal/common
Files:
internal/cmd/worker.gointernal/common/fileutil/fileutil_test.gointernal/service/coordinator/log_handler_test.gointernal/cmd/context.gointernal/runtime/output.gointernal/runtime/manager.gointernal/service/coordinator/static_registry.gointernal/service/worker/worker.gointernal/runtime/runner.gointernal/service/coordinator/service.gointernal/core/execution/context.gointernal/service/coordinator/static_registry_test.gointernal/runtime/output_test.gointernal/test/helper.gointernal/runtime/executor/dag_runner_test.gointernal/common/fileutil/fileutil.gointernal/runtime/executor/dag_runner.gointernal/runtime/remote/log_streamer.gointernal/runtime/remote/status_pusher.gointernal/runtime/context.gointernal/service/coordinator/log_handler.gointernal/service/scheduler/dag_executor.gointernal/integration/distributed_helpers_test.gointernal/common/config/loader_test.gointernal/cmd/startall.gointernal/service/worker/worker_test.gointernal/integration/distributed_parallel_test.gointernal/proto/convert/status.gointernal/runtime/agent/agent.gointernal/service/coordinator/client.gointernal/cmd/worker_test.gointernal/service/worker/poller_test.gointernal/service/worker/remote_handler.gointernal/proto/convert/status_test.gointernal/integration/distributed_coordinator_test.gointernal/integration/distributed_e2e_test.gointernal/cmd/coordinator.gointernal/runtime/runner_helper_test.gointernal/runtime/executor/task_test.gointernal/service/coordinator/client_test.gointernal/service/scheduler/queue_processor.gointernal/integration/remote_test.gointernal/runtime/executor/task.gointernal/service/coordinator/handler.gointernal/service/coordinator/handler_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as*_test.go; favour table-driven cases and cover failure paths
Usestretchr/testify/requireand shared fixtures frominternal/testinstead of duplicating mocks
Files:
internal/common/fileutil/fileutil_test.gointernal/service/coordinator/log_handler_test.gointernal/service/coordinator/static_registry_test.gointernal/runtime/output_test.gointernal/runtime/executor/dag_runner_test.gointernal/integration/distributed_helpers_test.gointernal/common/config/loader_test.gointernal/service/worker/worker_test.gointernal/integration/distributed_parallel_test.gointernal/cmd/worker_test.gointernal/service/worker/poller_test.gointernal/proto/convert/status_test.gointernal/integration/distributed_coordinator_test.gointernal/integration/distributed_e2e_test.gointernal/runtime/runner_helper_test.gointernal/runtime/executor/task_test.gointernal/service/coordinator/client_test.gointernal/integration/remote_test.gointernal/service/coordinator/handler_test.go
🧠 Learnings (6)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/cmd/worker.gointernal/cmd/context.gointernal/runtime/executor/dag_runner.gointernal/service/worker/poller_test.gocodecov.yml
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Co-locate Go tests as `*_test.go`; favour table-driven cases and cover failure paths
Applied to files:
internal/common/fileutil/fileutil_test.gointernal/service/coordinator/log_handler_test.gointernal/service/coordinator/static_registry_test.gointernal/runtime/output_test.gointernal/test/helper.gointernal/common/config/loader_test.gointernal/integration/distributed_parallel_test.gointernal/proto/convert/status_test.gointernal/integration/distributed_coordinator_test.gointernal/integration/distributed_e2e_test.gointernal/service/coordinator/client_test.gointernal/integration/remote_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make test` (or `make test-coverage`) executes the Go suite via `gotestsum`; append `TEST_TARGET=./internal/...` to focus packages
Applied to files:
internal/common/fileutil/fileutil_test.gointernal/runtime/output_test.gointernal/proto/convert/status_test.gointernal/integration/remote_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*_test.go : Use `stretchr/testify/require` and shared fixtures from `internal/test` instead of duplicating mocks
Applied to files:
internal/common/fileutil/fileutil_test.gointernal/runtime/output_test.gointernal/cmd/worker_test.gointernal/integration/distributed_e2e_test.gointernal/runtime/runner_helper_test.gointernal/service/coordinator/handler_test.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Repository linting relies on `golangci-lint`; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in `internal/common`
Applied to files:
internal/runtime/executor/dag_runner.gocodecov.yml
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to {api/v1,api/v2}/**/*.{proto,yaml,yml,json} : API definitions live in `api/v1` and `api/v2`; generated server stubs land in `internal/service`, while matching TypeScript clients flow into `ui/src/api`
Applied to files:
codecov.yml
🧬 Code graph analysis (28)
internal/cmd/worker.go (3)
internal/common/config/config.go (2)
Peer(306-321)Worker(279-284)internal/service/coordinator/client.go (2)
Client(29-55)New(94-103)internal/service/coordinator/static_registry.go (1)
NewStaticRegistry(22-49)
internal/common/fileutil/fileutil_test.go (1)
internal/common/fileutil/fileutil.go (3)
OpenOrCreateFile(62-70)CreateTempDAGFile(200-250)FileExists(44-47)
internal/runtime/manager.go (6)
internal/persistence/fileproc/procgrp.go (1)
ProcGroup(20-27)internal/core/execution/dagrun.go (1)
NewDAGRunRef(135-140)internal/common/logger/tag/tag.go (2)
Name(271-273)Error(20-22)internal/common/logger/context.go (4)
Errorf(75-77)Info(40-42)Debug(35-37)Error(50-52)internal/core/dag.go (1)
SockAddr(611-637)internal/common/fileutil/fileutil.go (1)
FileExists(44-47)
internal/runtime/runner.go (3)
internal/core/status.go (4)
Status(4-4)NodeRunning(66-66)NodeAborted(68-68)NodeSucceeded(69-69)internal/core/step.go (3)
Step(13-103)RepeatPolicy(239-255)RepeatMode(229-229)internal/core/execution/node.go (1)
Node(9-36)
internal/core/execution/context.go (3)
internal/runtime/context.go (4)
LogWriterFactory(46-46)Context(14-14)WithLogWriterFactory(42-42)ContextOption(22-22)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)proto/coordinator/v1/coordinator.pb.go (3)
GetDAGRunStatusResponse(1899-1906)GetDAGRunStatusResponse(1919-1919)GetDAGRunStatusResponse(1934-1936)
internal/service/coordinator/static_registry_test.go (1)
internal/service/coordinator/static_registry.go (1)
NewStaticRegistry(22-49)
internal/runtime/output_test.go (3)
internal/runtime/data.go (3)
Parallel(85-89)NodeData(24-27)NodeState(29-81)internal/runtime/output.go (1)
OutputCoordinator(22-49)internal/core/execution/context.go (2)
Context(17-29)StreamTypeStdout(44-44)
internal/test/helper.go (2)
internal/runtime/agent/agent.go (1)
Options(158-189)internal/persistence/filedag/store.go (1)
Options(31-36)
internal/runtime/executor/dag_runner_test.go (1)
internal/runtime/data.go (1)
Parallel(85-89)
internal/runtime/executor/dag_runner.go (3)
internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-250)internal/core/execution/context.go (2)
Context(17-29)RunStatus(126-137)internal/proto/convert/status.go (1)
ProtoToDAGRunStatus(114-153)
internal/runtime/remote/log_streamer.go (5)
internal/service/coordinator/client.go (1)
Client(29-55)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)internal/core/execution/context.go (3)
Context(17-29)StreamTypeStdout(44-44)StreamTypeStderr(46-46)proto/coordinator/v1/coordinator.pb.go (10)
LogChunk(1640-1657)LogChunk(1670-1670)LogChunk(1685-1687)LogStreamType(127-127)LogStreamType(159-161)LogStreamType(163-165)LogStreamType(172-174)LogStreamType_LOG_STREAM_TYPE_STDOUT(131-131)LogStreamType_LOG_STREAM_TYPE_STDERR(132-132)LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED(130-130)api/v2/api.gen.go (1)
StepName(1342-1342)
internal/runtime/remote/status_pusher.go (6)
internal/service/coordinator/client.go (1)
Client(29-55)internal/runtime/agent/agent.go (1)
StatusPusher(153-155)internal/core/execution/context.go (1)
Context(17-29)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)proto/coordinator/v1/coordinator.pb.go (3)
ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)internal/proto/convert/status.go (1)
DAGRunStatusToProto(12-51)
internal/runtime/context.go (1)
internal/core/execution/context.go (2)
WithLogWriterFactory(229-233)LogWriterFactory(35-39)
internal/service/scheduler/dag_executor.go (2)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/runtime/executor/task.go (1)
WithPreviousStatus(83-89)
internal/integration/distributed_helpers_test.go (8)
internal/test/coordinator.go (1)
Coordinator(20-28)internal/service/worker/worker.go (2)
Worker(21-39)NewWorker(47-66)internal/service/worker/remote_handler.go (2)
RemoteTaskHandlerConfig(28-45)NewRemoteTaskHandler(49-60)internal/common/logger/tag/tag.go (1)
WorkerID(65-67)internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/execution/context.go (1)
Context(17-29)internal/common/fileutil/fileutil.go (1)
IsDir(34-40)
internal/service/worker/worker_test.go (5)
internal/runtime/subcmd.go (1)
Run(258-280)internal/service/worker/worker.go (1)
NewWorker(47-66)internal/core/step.go (1)
RetryPolicy(211-226)internal/common/logger/tag/tag.go (1)
Target(168-170)internal/common/logger/context.go (1)
Fatal(55-57)
internal/integration/distributed_parallel_test.go (4)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (2)
WithStatusPersistence(84-88)WithDAGsDir(63-67)internal/core/spec/loader.go (1)
WithDAGsDir(90-94)internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)
internal/proto/convert/status.go (5)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/params.go (1)
Params(36-45)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)internal/core/step.go (1)
ExecutorConfig(195-203)internal/common/collections/syncmap.go (1)
SyncMap(10-14)
internal/service/worker/poller_test.go (3)
proto/coordinator/v1/coordinator.pb.go (15)
PollRequest(177-184)PollRequest(197-197)PollRequest(212-214)Task(365-384)Task(397-397)Task(412-414)HeartbeatRequest(700-707)HeartbeatRequest(720-720)HeartbeatRequest(735-737)HeartbeatResponse(761-768)HeartbeatResponse(781-781)HeartbeatResponse(796-798)GetDAGRunStatusResponse(1899-1906)GetDAGRunStatusResponse(1919-1919)GetDAGRunStatusResponse(1934-1936)internal/service/coordinator/client.go (1)
Metrics(58-63)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)
internal/service/worker/remote_handler.go (8)
internal/service/worker/handler.go (1)
TaskHandler(18-20)internal/core/execution/dagrun.go (2)
DAGRunStore(23-48)DAGRunRef(128-131)proto/coordinator/v1/coordinator.pb.go (7)
Task(365-384)Task(397-397)Task(412-414)Operation(24-24)Operation(56-58)Operation(60-62)Operation(69-71)internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/runtime/remote/status_pusher.go (2)
StatusPusher(14-17)NewStatusPusher(20-25)internal/runtime/remote/log_streamer.go (2)
LogStreamer(21-29)NewLogStreamer(32-48)internal/common/fileutil/fileutil.go (1)
CreateTempDAGFile(200-250)internal/core/spec/loader.go (3)
LoadOption(41-41)WithBaseConfig(44-48)Load(150-171)
internal/integration/distributed_coordinator_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/integration/distributed_e2e_test.go (2)
internal/test/coordinator.go (1)
SetupCoordinator(31-96)internal/test/helper.go (1)
WithStatusPersistence(84-88)
internal/cmd/coordinator.go (3)
internal/core/execution/dagrun.go (1)
DAGRunStore(23-48)internal/service/coordinator/service.go (2)
Service(21-34)NewService(36-57)internal/service/coordinator/handler.go (3)
Handler(39-58)WithDAGRunStore(64-68)WithLogDir(71-75)
internal/runtime/executor/task_test.go (4)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/status.go (3)
Running(8-8)NodeSucceeded(69-69)NodeFailed(67-67)internal/core/execution/node.go (1)
Node(9-36)internal/runtime/executor/task.go (1)
WithPreviousStatus(83-89)
internal/service/coordinator/client_test.go (1)
proto/coordinator/v1/coordinator.pb.go (9)
ReportStatusRequest(962-968)ReportStatusRequest(981-981)ReportStatusRequest(996-998)ReportStatusResponse(1015-1021)ReportStatusResponse(1034-1034)ReportStatusResponse(1049-1051)DAGRunStatusProto(1068-1096)DAGRunStatusProto(1109-1109)DAGRunStatusProto(1124-1126)
internal/service/scheduler/queue_processor.go (3)
proto/coordinator/v1/coordinator.pb.go (1)
Operation_OPERATION_RETRY(29-29)internal/core/execution/context.go (1)
Context(17-29)internal/core/execution/dagrun.go (1)
DAGRunRef(128-131)
internal/runtime/executor/task.go (3)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)proto/coordinator/v1/coordinator.pb.go (3)
Task(365-384)Task(397-397)Task(412-414)internal/proto/convert/status.go (1)
DAGRunStatusToProto(12-51)
internal/service/coordinator/handler_test.go (3)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)internal/core/status.go (6)
Running(8-8)NodeRunning(66-66)NodeSucceeded(69-69)Failed(9-9)NodeFailed(67-67)Succeeded(11-11)
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1564 +/- ##
==========================================
+ Coverage 64.46% 66.32% +1.85%
==========================================
Files 241 245 +4
Lines 26586 27139 +553
==========================================
+ Hits 17139 18000 +861
+ Misses 7873 7528 -345
- Partials 1574 1611 +37
... and 4 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary by CodeRabbit
New Features
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.