Skip to content

worker: fix problems in shared nothing worker#1573

Merged
yohamta0 merged 30 commits into
mainfrom
fix-worker-feature
Jan 12, 2026
Merged

worker: fix problems in shared nothing worker#1573
yohamta0 merged 30 commits into
mainfrom
fix-worker-feature

Conversation

@yohamta0

@yohamta0 yohamta0 commented Jan 11, 2026

Copy link
Copy Markdown
Collaborator

Summary by CodeRabbit

Release Notes

  • New Features

    • Added distributed DAG execution support with coordinator integration for remote execution on worker nodes.
    • Introduced live remote progress display with spinner animation for distributed DAG runs.
    • Added DAG run cancellation support for distributed executions.
    • Enabled real-time scheduler log streaming to coordinator.
    • Implemented graceful shutdown handling via SIGINT/SIGTERM signals.
  • Improvements

    • Automatic worker ID derivation from hostname when not explicitly configured.
    • Enhanced log file error handling with proper 404 detection in UI.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

coderabbitai Bot commented Jan 11, 2026

Copy link
Copy Markdown

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

This PR introduces distributed DAG execution support by implementing coordinator-based task dispatch, remote progress tracking with animation, signal-aware context management for graceful shutdown, scheduler log streaming, cancellation request handling, and shared-nothing execution mode using no-op attempts.

Changes

Cohort / File(s) Summary
Distributed Execution (Command Layer)
internal/cmd/start.go, internal/cmd/retry.go, internal/cmd/restart.go
Add distributed execution paths: dispatch to coordinator when WorkerSelector is non-empty; wrap execution with signal-aware context; poll for completion with progress updates; handle cancellation signals with retry/cancellation requests.
Signal-aware Context Management
internal/cmd/server.go, internal/cmd/startall.go, internal/cmd/worker.go, internal/cmd/context.go
Introduce signal.NotifyContext for graceful shutdown on SIGINT/SIGTERM; propagate through service initialization and execution; add Context.WithContext() method; derive workerID from hostname@PID if not configured.
Remote Progress Tracking
internal/cmd/progress_remote.go, internal/common/stringutil/spinner.go
Add RemoteProgressDisplay type with animation loop (100ms ticker), TTY-aware spinner rendering, status polling from coordinator, and final summary output; extract spinner frames to shared SpinnerFrames constant.
Coordinator Integration
internal/service/coordinator/client.go, internal/service/coordinator/handler.go, internal/service/coordinator/log_handler.go
Add RequestCancel RPC and client implementation; enhance handler to create/cache root and sub-DAG attempts on dispatch; introduce log path transformation for shared-nothing mode; rename streamTypeToExtension to exported StreamTypeToExtension.
Remote Execution & Log Streaming
internal/runtime/remote/log_streamer.go, internal/service/worker/remote_handler.go, internal/runtime/agent/agent.go
Add real-time scheduler log streaming with chunking and sequence tracking; implement NewSchedulerLogWriter and StreamSchedulerLog; stream logs to coordinator during execution; use no-op attempt in shared-nothing mode; propagate handler hooks unconditionally.
Status & Execution Infrastructure
internal/core/execution/context.go, internal/core/execution/dagrun.go, internal/core/execution/noop_attempt.go, internal/core/status.go, internal/runtime/manager.go
Add Dispatcher.RequestCancel() interface method; add DAGRunStore.CreateSubAttempt() for sub-DAG runs; implement noopDAGRunAttempt (all methods are no-ops or return zero values); add NodeStatus.IsDone() recognizing terminal states; skip stale-status checks for distributed DAGs (WorkerID present).
Executor & Runtime
internal/runtime/executor/dag_runner.go, internal/service/scheduler/zombie_detector.go
Update SubDAGExecutor.Kill to use coordinator RequestCancel when available, falling back to local DB; introduce panicToError helper; skip zombie detection for distributed runs (WorkerID non-empty and not "local").
Frontend & API
internal/service/frontend/server.go, internal/service/frontend/api/v2/dagruns.go, internal/service/frontend/auth/oidc.go
Update NewServer signature to accept context.Context as first parameter; thread context through OIDC initialization; add 404 handling for missing log files; introduce helpers for step lookup, approval/rejection, DAG resumption, and audit logging; add distributed DAG delegation in termination.
Protocol Buffers
proto/coordinator/v1/coordinator.proto
Add RequestCancel RPC, RequestCancelRequest and RequestCancelResponse messages, and LOG_STREAM_TYPE_SCHEDULER enum value.
Testing Infrastructure
internal/common/telemetry/collector_test.go, internal/runtime/agent/dbclient_test.go, internal/service/coordinator/handler_test.go, internal/service/scheduler/zombie_detector_test.go, internal/service/worker/poller_test.go, internal/service/worker/remote_handler_test.go, internal/runtime/remote/status_pusher_test.go, internal/integration/queue_shell_test.go, internal/test/server.go
Add CreateSubAttempt mock method to multiple mock stores; add RequestCancel mock method to coordinator clients; update frontend.NewServer call sites to pass context.Context.
UI Components
ui/src/features/dags/components/dag-execution/ExecutionLog.tsx, ui/src/features/dags/components/dag-execution/StepLog.tsx
Differentiate 404 "not found" errors from other failures; suppress error UI for missing logs while preserving error display for other failure modes.

Sequence Diagram

sequenceDiagram
    participant User
    participant CLI as CLI (start/retry)
    participant Coordinator
    participant Worker
    participant DAGStore
    participant ProgressUI as Progress Display

    User->>CLI: Execute DAG with WorkerSelector
    CLI->>CLI: Detect WorkerSelector
    CLI->>ProgressUI: Initialize (if enabled)
    ProgressUI->>ProgressUI: Start animation loop
    CLI->>Coordinator: Dispatch task
    Coordinator->>DAGStore: Create/open attempt
    DAGStore-->>Coordinator: Attempt ready
    Coordinator-->>CLI: Dispatch accepted
    CLI->>Coordinator: Poll for status
    loop Every poll interval
        Coordinator->>Worker: Fetch status
        Worker-->>Coordinator: Current status
        Coordinator-->>CLI: Status update
        CLI->>ProgressUI: Update progress
        ProgressUI->>ProgressUI: Render spinner & counts
    end
    Worker->>Coordinator: Stream scheduler logs
    Coordinator-->>CLI: Log chunks
    Worker->>Coordinator: Report completion
    CLI->>ProgressUI: Stop animation
    ProgressUI->>ProgressUI: Print final summary
    ProgressUI-->>User: Display result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 72.50% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The PR title 'worker: fix problems in shared nothing worker' is vague and non-specific. It uses the generic term 'problems' and 'fix' without clarifying what specific issues were resolved or what the main change accomplishes. Replace with a more specific title that describes the actual changes, such as 'worker: implement shared-nothing worker support with distributed execution' or 'worker: add coordinator-based remote execution and log streaming'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/runtime/manager.go (1)

429-458: Remove dead code: execWithRecovery in manager.go is unused and duplicated in agent.go.

The function at lines 429-458 is never invoked. There's an identical implementation in internal/runtime/agent/agent.go:1325 that is actively used (5 calls across the codebase). Remove the unused version in manager.go to eliminate duplication and dead code.

internal/service/frontend/api/v2/api.go (1)

296-317: handleError: execution.Err... cases don’t set HTTP status (can return 500 for not-found/conflict).

Right now you only change code/message for execution.ErrDAGNotFound / ErrDAGRunIDNotFound / ErrDAGAlreadyExists, leaving httpStatusCode at 500 unless the error is already *Error. That can break clients and also mis-log expected errors as “Internal server error”.

Proposed fix
 	switch {
 	case errors.Is(err, execution.ErrDAGNotFound):
 		code = api.ErrorCodeNotFound
 		message = "DAG not found"
+		httpStatusCode = http.StatusNotFound
 	case errors.Is(err, execution.ErrDAGRunIDNotFound):
 		code = api.ErrorCodeNotFound
 		message = "dag-run ID not found"
+		httpStatusCode = http.StatusNotFound
 	case errors.Is(err, execution.ErrDAGAlreadyExists):
 		code = api.ErrorCodeAlreadyExists
 		message = "DAG already exists"
+		httpStatusCode = http.StatusConflict
 	}
🤖 Fix all issues with AI agents
In @internal/cmd/restart.go:
- Around line 131-139: The distributed execution branch in executeDAGWithRunID
skips the context tagging and restart initiation log by returning directly from
dispatchToCoordinatorAndWait; before calling dispatchToCoordinatorAndWait (or
immediately after obtaining coordinatorCli), add the same context tag population
and a restart-initiated log entry using ctx (matching the local path behavior),
ensure the tag includes the dag.ID and dagRunID, and propagate the tagged
context into dispatchToCoordinatorAndWait if that function accepts a context; if
not, add a short wrapper that logs the restart initiation with the same message
used for local restarts and then calls dispatchToCoordinatorAndWait.

In @internal/cmd/worker.go:
- Around line 6-7: os.Hostname() error is ignored which can produce a workerID
like "@<pid>"; change cmd/worker.go so you check the error and hostname before
composing workerID: call hostname, err := os.Hostname(); if err != nil ||
hostname == "" then leave workerID empty (so the fallback in
service/worker/worker.go can generate a proper ID), otherwise set workerID =
fmt.Sprintf("%s@%d", hostname, os.Getpid()); no label mutation changes needed.

In @internal/core/status.go:
- Around line 86-93: The IsDone() method on NodeStatus currently omits
NodeRejected and has a doc comment that doesn't match the returned set; update
the function to treat NodeRejected as terminal by adding a comparison for
NodeRejected in NodeStatus.IsDone(), and update the comment above IsDone to list
all terminal states (including "partially succeeded" and "rejected") so the docs
and behavior align; refer to the NodeStatus type and constants NodeSucceeded,
NodeFailed, NodeSkipped, NodeAborted, NodePartiallySucceeded, and NodeRejected
when making the change.

In @internal/service/coordinator/handler.go:
- Around line 183-207: Dispatch may call createAttemptForTask or
createSubAttemptForTask concurrently for the same task.DagRunId causing
duplicate/open-attempt leaks and h.openAttempts to be overwritten; fix by
serializing attempt creation per run: in both createAttemptForTask and
createSubAttemptForTask acquire the existing per-run mutex (the same mutex used
elsewhere), re-check h.openAttempts[task.DagRunId] before creating a new attempt
to avoid double-creation, ensure that if opening a new attempt fails you
close/cleanup the partially opened attempt and release the mutex, and apply the
identical pattern where Dispatch branches for root vs sub-DAG runs so concurrent
Dispatch calls cannot create duplicate attempts.
- Around line 853-899: RequestCancel is missing input validation before looking
up attempts; ensure req.DagName and req.DagRunId are non-empty and, when
isSubDAG (RootDagRunId != "" and != DagRunId), validate both req.RootDagRunName
and req.RootDagRunId are provided. Add checks at the start of
Handler.RequestCancel that return a status.Error(codes.InvalidArgument or
codes.FailedPrecondition) with a clear message when required fields are empty,
and only compute isSubDAG and call dagRunStore.FindSubAttempt / FindAttempt
after those validations succeed.

In @internal/service/coordinator/log_handler_test.go:
- Around line 39-45: The tests for StreamTypeToExtension are missing the
SCHEDULER case; update the test table in log_handler_test.go (the tests slice
iterated in the t.Run loop) to include a test entry with name like "scheduler",
streamType set to LOG_STREAM_TYPE_SCHEDULER, and expected "scheduler.log" so
StreamTypeToExtension is fully covered alongside STDOUT, STDERR, and
UNSPECIFIED.

In @internal/service/frontend/api/v2/dagruns.go:
- Around line 1118-1131: The code calls a.coordinatorCli.RequestCancel(...) when
savedStatus.WorkerID != "" without checking that a.coordinatorCli is non-nil;
add a nil-check for a.coordinatorCli before invoking RequestCancel and return an
Error (HTTP 500 or BadRequest consistent with other methods) or wrap fmt.Errorf
indicating coordinator client unconfigured if nil; update the block around
savedStatus.WorkerID to first if a.coordinatorCli == nil { return nil,
&Error{HTTPStatus: http.StatusInternalServerError, Code: api.ErrorCodeInternal,
Message: "coordinator client not configured"} } else call
a.coordinatorCli.RequestCancel(ctx, request.Name, request.DagRunId, nil) and
handle/wrap any error as before.
- Around line 357-362: The code currently detects missing log files by
string-matching err.Error(), which is brittle; change
internal/common/fileutil/logutil.go's ReadLogLines so it returns a wrapped
os.ErrNotExist when the file is missing (e.g., return nil, fmt.Errorf("file not
found: %w", os.ErrNotExist)), and then update the callers in
internal/service/frontend/api/v2/dagruns.go that call fileutil.ReadLogLines to
use errors.Is(err, os.ErrNotExist) instead of strings.Contains(err.Error(),
"file not found") to reliably detect missing files.

In @internal/service/scheduler/zombie_detector.go:
- Around line 144-151: The distributed-run guard using st.WorkerID is currently
checked after calling FindAttempt/ReadDAG, which risks those calls failing for
non-local workers; move the check for st.WorkerID != "" && st.WorkerID !=
"local" to the very start of the function (before any FindAttempt or ReadDAG
calls) and return early, and update the logger.Debug call used in that
early-return to log the skip without including queue-specific data (remove
tag.Queue(dag.ProcGroup()) or similar) so it never attempts to access DAG
artifacts; apply the same change to the other occurrence mentioned (the block
around lines 153-161).

In @ui/src/features/dags/components/dag-execution/ExecutionLog.tsx:
- Around line 277-289: The current 404 detection in the ExecutionLog component
relies on case-sensitive substring matching of error.message which can miss
variations; update the error check to first look for an HTTP 404 via
error.response?.status === 404 (or error.status === 404) and fall back to a
case-insensitive string match like error.message?.toLowerCase().includes('not
found'); modify the conditional around error and !logData in ExecutionLog.tsx so
it uses these checks (referencing the existing error and logData variables)
before deciding to render the generic error UI.
🧹 Nitpick comments (18)
internal/service/scheduler/zombie_detector.go (2)

16-22: Move panicToError into the recover block or enrich it (stack / “panic:” prefix).

Current helper is fine, but it drops stack context; consider attaching a stack (or at least prefixing the message) so panics are actionable in logs.


69-73: Recover logging: consider emitting stack trace (and/or recovered value separately).

Right now you only log an error, which makes panics hard to debug in prod. If you already have a standard for stack logging elsewhere, reuse it here.

internal/service/frontend/server.go (2)

70-73: NewServer(ctx, ...) API change is good; consider nil-ctx guard to prevent panics.
Go convention is “context must not be nil”; a defensive fallback avoids surprising crashes in older call sites/tests.

Proposed guard
 func NewServer(ctx context.Context, cfg *config.Config, dr execution.DAGStore, drs execution.DAGRunStore, qs execution.QueueStore, ps execution.ProcStore, drm runtime.Manager, cc coordinator.Client, sr execution.ServiceRegistry, mr *prometheus.Registry, collector *telemetry.Collector, rs *resource.Service) (*Server, error) {
+	if ctx == nil {
+		ctx = context.Background()
+	}
 	var remoteNodes []string

134-146: Good: builtin OIDC init now respects startup cancellation.
Minor: the “OIDC enabled…” log uses context.Background() (Line 146); if you want consistent structured tags/cancellation semantics, prefer ctx.

internal/integration/queue_shell_test.go (1)

40-58: Verify context cancellation already handles cleanup, but explicit server shutdown is still recommended.

The test's th.Context is reliably cancelled during test teardown (via helper.Cleanup() which calls Cancel()), so the server goroutine will exit. However, cancelling the context alone doesn't guarantee the HTTP listener is fully released. Adding explicit server.Shutdown() follows Go HTTP server best practices and ensures proper resource cleanup without relying on context cancellation timing.

Suggested cleanup
 		server, err := frontend.NewServer(
 			th.Context,
 			th.Config,
 			th.DAGStore,
 			th.DAGRunStore,
 			th.QueueStore,
 			th.ProcStore,
 			th.DAGRunMgr,
 			nil, // no coordinator client for local execution
 			th.ServiceRegistry,
 			nil, // no metrics registry
 			nil, // no telemetry collector
 			nil, // no resource service
 		)
 		require.NoError(t, err, "failed to create server")
+
+		t.Cleanup(func() {
+			_ = server.Shutdown(context.Background())
+		})

 		go func() {
 			_ = server.Serve(th.Context)
 		}()
internal/service/frontend/auth/oidc.go (1)

71-107: Nice improvement: OIDC init + retries are now cancellable.
Two follow-ups worth considering: (1) add defensive nil check for ctx in backoff.Retry to prevent panic, (2) wrap obviously-permanent errors (bad issuer URL format, unsupported scheme) with backoff.PermanentError() to avoid 10 unnecessary retry attempts for errors that will never succeed.

internal/common/stringutil/spinner.go (1)

5-5: Consider making SpinnerFrames immutable to prevent accidental modification.

While exported package-level slices are acceptable, they can be reassigned or mutated by consumers. Since these frames are intended to be a constant set of values, consider either:

  1. Using a function that returns a copy: func SpinnerFrames() []string
  2. Documenting that the slice should not be modified
  3. Accepting the current pattern as idiomatic for shared read-only data

The current approach is functional, but documenting the intended immutability would help prevent misuse.

♻️ Option 1: Use a function to return frames
-// SpinnerFrames contains the animation frames for a braille spinner.
-// Used by progress displays throughout the application.
-var SpinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
+var spinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
+
+// SpinnerFrames returns the animation frames for a braille spinner.
+// Used by progress displays throughout the application.
+func SpinnerFrames() []string {
+	return spinnerFrames
+}
♻️ Option 2: Add documentation warning
-// SpinnerFrames contains the animation frames for a braille spinner.
-// Used by progress displays throughout the application.
+// SpinnerFrames contains the animation frames for a braille spinner.
+// Used by progress displays throughout the application.
+// This slice should be treated as read-only and not modified.
 var SpinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
internal/service/coordinator/log_handler.go (1)

243-256: Consider renaming function for clarity.

The function name StreamTypeToExtension suggests it returns just an extension (e.g., "log" or "stdout"), but it actually returns a filename suffix including the .log extension (e.g., "stdout.log", "scheduler.log").

Consider renaming to something like StreamTypeToFileSuffix or updating the return values to be true extensions without .log, then appending .log at the call site.

♻️ Alternative: Return true extensions

If you prefer to keep the function name as-is, you could return actual extensions and append .log at line 231:

-func StreamTypeToExtension(streamType coordinatorv1.LogStreamType) string {
+func StreamTypeToFileExtension(streamType coordinatorv1.LogStreamType) string {
 	switch streamType {
 	case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_STDOUT:
-		return "stdout.log"
+		return "stdout"
 	case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_STDERR:
-		return "stderr.log"
+		return "stderr"
 	case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_SCHEDULER:
-		return "scheduler.log"
+		return "scheduler"
 	case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED:
 		return "log"
 	}
 	return "log"
 }

Then update line 231:

-		filename = fmt.Sprintf("%s.%s", fileutil.SafeName(chunk.StepName), ext)
+		filename = fmt.Sprintf("%s.%s.log", fileutil.SafeName(chunk.StepName), ext)

Note: This would require updating test expectations as well.

internal/persistence/filedagrun/store.go (1)

435-457: Consider adding locking for sub-attempt creation.

The CreateAttempt method (lines 259-310) acquires a lock before creating a new DAG run or attempt to prevent race conditions. Since CreateSubAttempt also performs directory creation and file operations, it should similarly acquire a lock to ensure thread-safety and prevent concurrent creation conflicts.

🔒 Proposed fix to add locking
 func (store *Store) CreateSubAttempt(ctx context.Context, rootRef execution.DAGRunRef, subDAGRunID string) (execution.DAGRunAttempt, error) {
 	if rootRef.ID == "" {
 		return nil, ErrDAGRunIDEmpty
 	}
 
 	root := NewDataRoot(store.baseDir, rootRef.Name)
 	dagRun, err := root.FindByDAGRunID(ctx, rootRef.ID)
 	if err != nil {
 		return nil, fmt.Errorf("failed to find root dag-run: %w", err)
 	}
 
+	lockCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	if err := root.Lock(lockCtx); err != nil {
+		return nil, fmt.Errorf("failed to acquire lock for sub dag-run %s: %w", subDAGRunID, err)
+	}
+	defer func() {
+		if err := root.Unlock(); err != nil {
+			logger.Error(ctx, "Failed to unlock sub dag-run", tag.RunID(subDAGRunID), tag.Error(err))
+		}
+	}()
+
 	// Create the sub-DAG run directory
 	subDAGRun, err := dagRun.CreateSubDAGRun(ctx, subDAGRunID)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create sub dag-run directory: %w", err)
 	}
 
 	// Create an attempt within the sub-DAG run
 	return subDAGRun.CreateAttempt(ctx, execution.NewUTC(time.Now()), store.cache)
 }
internal/runtime/executor/dag_runner.go (1)

535-577: Consider using a timeout context for cancellation requests.

Using context.Background() at line 535 means cancellation requests have no timeout. If the coordinator or database is unresponsive, this could cause the Kill method to hang indefinitely. Consider using a timeout context:

⏱️ Proposed fix to add timeout context
 func (e *SubDAGExecutor) Kill(sig os.Signal) error {
 	e.mu.Lock()
 	defer e.mu.Unlock()
 
 	var errs []error
-	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
 
 	// Cancel distributed runs
 	for runID := range e.distributedRuns {
internal/cmd/retry.go (1)

180-237: Consider reusing waitForDAGCompletionWithProgress and handleDistributedCancellation.

This function shares significant logic with dispatchToCoordinatorAndWait in start.go:

  • Signal-aware context setup (lines 183-185)
  • Progress display initialization (lines 188-202)
  • Status polling with progress updates (line 229)
  • Cancellation handling (lines 232-234)

The only differences are the task creation (using OPERATION_RETRY vs OPERATION_START and including WithPreviousStatus).

♻️ Suggested refactor to reduce duplication

Consider extracting the common orchestration logic into a shared helper:

// dispatchAndWait is a shared helper for dispatching tasks and waiting for completion
func dispatchAndWait(ctx *Context, dag *core.DAG, dagRunID string, coordinatorCli coordinator.Client, task *coordinatorv1.Task) error {
    // Signal-aware context, progress display, dispatch, wait, and cancellation handling
    // ... shared logic here
}

Then both dispatchToCoordinatorAndWait and dispatchRetryToCoordinatorAndWait can delegate to this helper with their respective tasks.

internal/runtime/remote/log_streamer.go (1)

94-169: Consider adding explicit stream closure on error paths.

When stream.Send(chunk) fails on line 143-145 or 162-163, the function returns immediately without calling CloseAndRecv(). While gRPC will eventually clean up the stream, explicitly closing it ensures proper resource release and server-side notification.

♻️ Suggested improvement
 func (s *LogStreamer) StreamSchedulerLog(ctx context.Context, logFilePath string) error {
 	// ... existing code ...
 
 	// Create a stream to the coordinator
 	stream, err := s.client.StreamLogs(ctx)
 	if err != nil {
 		return fmt.Errorf("failed to create log stream: %w", err)
 	}
+	defer func() {
+		// Ensure stream is closed on all paths
+		_, _ = stream.CloseAndRecv()
+	}()
 
 	// Split into chunks if necessary (scheduler logs can be large)
 	var sequence uint64 = 0
 	for len(data) > 0 {
 		// ... existing chunking code ...
 
 		if err := stream.Send(chunk); err != nil {
-			return fmt.Errorf("failed to send scheduler log chunk: %w", err)
+			return fmt.Errorf("failed to send scheduler log chunk: %w", err)
 		}
 	}
 
 	// Send final marker
 	// ... existing code ...
 
-	// Close and get response
-	_, err = stream.CloseAndRecv()
-	return err
+	return nil  // defer handles CloseAndRecv
 }
internal/cmd/start.go (1)

569-626: LGTM with one observation.

The polling logic is robust with:

  • Consecutive error tracking with a reasonable threshold (10 errors = ~10 seconds)
  • Periodic logging when progress display is disabled
  • Proper status checking for completion

The error message on line 622 could be more informative by including the actual error from the status if available (resp.Status.Error).

♻️ Minor improvement for error message
 			if !status.IsActive() {
 				if status.IsSuccess() {
 					logger.Info(ctx, "DAG completed successfully", tag.RunID(dagRunID))
 					return nil
 				}
-				return fmt.Errorf("DAG run failed with status: %s", status)
+				errMsg := resp.Status.Error
+				if errMsg == "" {
+					errMsg = "no error details available"
+				}
+				return fmt.Errorf("DAG run failed with status %s: %s", status, errMsg)
 			}
internal/service/coordinator/handler.go (2)

448-544: Log-path rewrite: handle empty step names (esp. handler nodes) to avoid “.stdout.log”.

transformLogPaths() uses node.Step.Name for filename generation; if that’s empty (common for synthetic/handler nodes depending on how they’re populated), you’ll end up with odd filenames like .stdout.log. Consider providing explicit names for handler nodes (onInit/onExit/…) or falling back when stepName == "".


52-60: Open-attempt + per-run mutex maps can grow unbounded in long-lived coordinators.

You’re intentionally keeping attempts open until coordinator shutdown; combined with runMutexes never being removed, this can become a slow FD/memory leak under many runs. If execution.DAGRunAttempt.Open() holds OS resources, this becomes a production reliability issue. Consider evicting/closing on terminal statuses (with “already-closed” tolerance) and deleting the per-run mutex entry once a run is terminal.

Also applies to: 109-123, 451-477

internal/runtime/agent/agent.go (1)

649-657: Avoid concrete *remote.LogStreamer type-assertion + guard against blocking shutdown.

This couples the agent to one concrete implementation and may silently skip scheduler-log streaming for wrappers/mocks. Also, streaming at the end can delay completion indefinitely if the coordinator is slow/unreachable. Consider asserting on a small interface and applying a timeout (optionally with context.WithoutCancel so it still attempts to flush on cancellation).

Proposed refactor
-	// Stream scheduler log to coordinator if using remote logging (shared-nothing mode)
-	if a.logWriterFactory != nil {
-		if streamer, ok := a.logWriterFactory.(*remote.LogStreamer); ok {
-			if err := streamer.StreamSchedulerLog(ctx, a.logFile); err != nil {
-				logger.Warn(ctx, "Failed to stream scheduler log", tag.Error(err))
-			}
-		}
-	}
+	// Stream scheduler log to coordinator if supported by the configured log writer factory.
+	type schedulerLogStreamer interface {
+		StreamSchedulerLog(context.Context, string) error
+	}
+	if streamer, ok := a.logWriterFactory.(schedulerLogStreamer); ok {
+		streamCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
+		defer cancel()
+		if err := streamer.StreamSchedulerLog(streamCtx, a.logFile); err != nil {
+			logger.Warn(ctx, "Failed to stream scheduler log", tag.Error(err))
+		}
+	}
internal/cmd/progress_remote.go (2)

167-179: Non-TTY mode never prints a “finished” line.

For CI logs / redirected stderr, printHeader() prints “Started: …” once, but printFinal() returns early when !isTTY, so you don’t get an explicit completion line. Consider printing a final one-liner in non-TTY too.

Also applies to: 201-222


182-199: Cap progress percent (and handle completed > total) to avoid >100%.

If the coordinator status ever includes extra nodes (or total is underestimated), percent can exceed 100 and the (completed/total) can look wrong. Consider clamping completedShown := min(p.completed, p.total) and percent := min(percent, 100).

Also applies to: 201-222

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd2b060 and 979120f.

⛔ Files ignored due to path filters (2)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (42)
  • internal/cmd/context.go
  • internal/cmd/progress_remote.go
  • internal/cmd/restart.go
  • internal/cmd/retry.go
  • internal/cmd/server.go
  • internal/cmd/start.go
  • internal/cmd/startall.go
  • internal/cmd/worker.go
  • internal/common/stringutil/spinner.go
  • internal/common/telemetry/collector_test.go
  • internal/core/execution/context.go
  • internal/core/execution/dagrun.go
  • internal/core/execution/noop_attempt.go
  • internal/core/status.go
  • internal/integration/queue_shell_test.go
  • internal/persistence/filedagrun/store.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/dbclient_test.go
  • internal/runtime/agent/progress_simple.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/manager.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/remote/status_pusher_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/coordinator/log_handler.go
  • internal/service/coordinator/log_handler_test.go
  • internal/service/frontend/api/v2/api.go
  • internal/service/frontend/api/v2/dagruns.go
  • internal/service/frontend/auth/oidc.go
  • internal/service/frontend/auth/oidc_test.go
  • internal/service/frontend/server.go
  • internal/service/scheduler/zombie_detector.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • internal/test/server.go
  • proto/coordinator/v1/coordinator.proto
  • ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
  • ui/src/features/dags/components/dag-execution/StepLog.tsx
🧰 Additional context used
📓 Path-based instructions (4)
ui/**/*.{ts,tsx,jsx,js}

📄 CodeRabbit inference engine (ui/CLAUDE.md)

ui/**/*.{ts,tsx,jsx,js}: Use developer-centric UI design with high information density, minimal whitespace, compact components, and no unnecessary decorations
Support both light and dark modes for all UI components using Tailwind CSS class pairs like dark:bg-slate-700
NEVER use full-page loading overlays or LoadingIndicator components that hide content - show stale data while fetching updates instead
Use compact modal design with small headers, minimal padding (p-2 or p-3), tight spacing, and support keyboard navigation (arrows, enter, escape)
Use small heights for form elements: select boxes h-7 or smaller, buttons h-7 or h-8, inputs with compact padding (py-0.5 or py-1)
Minimize row heights in tables and lists while maintaining readability, merge related columns, and always handle long text with whitespace-normal break-words
Use consistent metadata styling with bg-slate-200 dark:bg-slate-700 backgrounds and maintain text hierarchy with primary/secondary/muted text colors
Use flexbox-first layouts with min-h-0 and overflow-hidden to prevent layout breaks, account for fixed elements when setting heights
Maintain keyboard navigation support in all interactive components with appropriate focus indicators and ARIA labels

Files:

  • ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
  • ui/src/features/dags/components/dag-execution/StepLog.tsx
ui/**/*.{ts,tsx}

📄 CodeRabbit inference engine (AGENTS.md)

ui/**/*.{ts,tsx}: The React + TypeScript frontend resides in ui/, with production bundles copied to internal/service/frontend/assets by make ui
UI code follows ESLint + Prettier (2-space indent) and Tailwind utilities; name React components in PascalCase (JobList.tsx) and hooks with use* (useJobs.ts)

Files:

  • ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
  • ui/src/features/dags/components/dag-execution/StepLog.tsx
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/common/stringutil/spinner.go
  • internal/core/execution/context.go
  • internal/service/coordinator/log_handler_test.go
  • internal/common/telemetry/collector_test.go
  • internal/cmd/worker.go
  • internal/service/frontend/auth/oidc_test.go
  • internal/runtime/remote/status_pusher_test.go
  • internal/core/execution/dagrun.go
  • internal/runtime/executor/dag_runner.go
  • internal/cmd/context.go
  • internal/runtime/agent/progress_simple.go
  • internal/cmd/start.go
  • internal/cmd/retry.go
  • internal/service/worker/remote_handler.go
  • internal/persistence/filedagrun/store.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/poller_test.go
  • internal/core/status.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/log_handler.go
  • internal/cmd/progress_remote.go
  • internal/service/frontend/server.go
  • internal/service/coordinator/handler.go
  • internal/service/scheduler/zombie_detector.go
  • internal/core/execution/noop_attempt.go
  • internal/test/server.go
  • internal/cmd/startall.go
  • internal/runtime/remote/log_streamer.go
  • internal/runtime/manager.go
  • internal/service/frontend/auth/oidc.go
  • internal/integration/queue_shell_test.go
  • internal/service/worker/remote_handler_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/dbclient_test.go
  • internal/service/frontend/api/v2/api.go
  • internal/cmd/server.go
  • internal/service/frontend/api/v2/dagruns.go
  • internal/cmd/restart.go
  • internal/service/coordinator/handler_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/service/coordinator/log_handler_test.go
  • internal/common/telemetry/collector_test.go
  • internal/service/frontend/auth/oidc_test.go
  • internal/runtime/remote/status_pusher_test.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/poller_test.go
  • internal/integration/queue_shell_test.go
  • internal/service/worker/remote_handler_test.go
  • internal/runtime/agent/dbclient_test.go
  • internal/service/coordinator/handler_test.go
🧠 Learnings (2)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/cmd/start.go
  • internal/cmd/retry.go
  • internal/cmd/startall.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make run` starts the Go scheduler and serves the compiled UI (fails fast if `ui/dist` is missing)

Applied to files:

  • internal/integration/queue_shell_test.go
🧬 Code graph analysis (32)
internal/core/execution/context.go (1)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/service/coordinator/log_handler_test.go (1)
internal/service/coordinator/log_handler.go (1)
  • StreamTypeToExtension (244-256)
internal/common/telemetry/collector_test.go (2)
internal/cmd/context.go (1)
  • Context (44-59)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/service/frontend/auth/oidc_test.go (2)
internal/service/frontend/auth/oidc.go (1)
  • InitVerifierAndConfig (137-160)
internal/common/config/config.go (1)
  • AuthOIDC (242-256)
internal/runtime/remote/status_pusher_test.go (3)
internal/cmd/context.go (1)
  • Context (44-59)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/core/execution/dagrun.go (2)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/runtime/agent/progress_simple.go (2)
internal/core/status.go (1)
  • Status (4-4)
internal/common/stringutil/spinner.go (1)
  • SpinnerFrames (5-5)
internal/cmd/start.go (4)
internal/core/dag.go (1)
  • DAG (63-178)
internal/service/coordinator/client.go (1)
  • Client (29-59)
internal/cmd/progress_remote.go (2)
  • RemoteProgressDisplay (21-36)
  • NewRemoteProgressDisplay (39-54)
internal/runtime/executor/task.go (1)
  • CreateTask (12-33)
internal/cmd/retry.go (5)
internal/core/dag.go (1)
  • DAG (63-178)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/service/coordinator/client.go (1)
  • Client (29-59)
internal/runtime/executor/task.go (4)
  • TaskOption (36-36)
  • WithWorkerSelector (68-72)
  • WithPreviousStatus (83-89)
  • WithStep (75-79)
proto/coordinator/v1/coordinator.pb.go (1)
  • Operation_OPERATION_RETRY (29-29)
internal/service/worker/remote_handler.go (4)
internal/common/logger/context.go (3)
  • Errorf (75-77)
  • Warn (45-47)
  • WithLogger (12-14)
internal/common/logger/tag/tag.go (1)
  • Target (168-170)
internal/persistence/filedagrun/writer.go (1)
  • Writer (35-42)
internal/common/logger/logger.go (2)
  • NewLogger (93-133)
  • WithWriter (69-73)
internal/persistence/filedagrun/store.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/persistence/filedagrun/dataroot.go (1)
  • NewDataRoot (50-78)
internal/service/scheduler/zombie_detector_test.go (2)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/service/worker/poller_test.go (3)
internal/core/execution/context.go (1)
  • Context (17-29)
internal/runtime/context.go (1)
  • Context (14-14)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/service/coordinator/client.go (2)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
proto/coordinator/v1/coordinator.pb.go (3)
  • RequestCancelRequest (1963-1972)
  • RequestCancelRequest (1985-1985)
  • RequestCancelRequest (2000-2002)
internal/service/coordinator/log_handler.go (2)
proto/coordinator/v1/coordinator.pb.go (7)
  • LogStreamType_LOG_STREAM_TYPE_SCHEDULER (133-133)
  • LogStreamType (127-127)
  • LogStreamType (162-164)
  • LogStreamType (166-168)
  • LogStreamType (175-177)
  • LogStreamType_LOG_STREAM_TYPE_STDOUT (131-131)
  • LogStreamType_LOG_STREAM_TYPE_STDERR (132-132)
api/v2/api.gen.go (1)
  • StepName (1410-1410)
internal/cmd/progress_remote.go (6)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
proto/coordinator/v1/coordinator.pb.go (3)
  • DAGRunStatusProto (1071-1099)
  • DAGRunStatusProto (1112-1112)
  • DAGRunStatusProto (1127-1129)
internal/proto/convert/status.go (1)
  • ProtoToDAGRunStatus (114-153)
internal/core/status.go (3)
  • Status (4-4)
  • Aborted (10-10)
  • Failed (9-9)
internal/output/tree.go (1)
  • NewRenderer (84-86)
internal/common/stringutil/spinner.go (1)
  • SpinnerFrames (5-5)
internal/service/frontend/server.go (1)
internal/service/frontend/auth/oidc.go (1)
  • InitVerifierAndConfig (137-160)
internal/service/coordinator/handler.go (4)
proto/coordinator/v1/coordinator.pb.go (19)
  • DispatchRequest (286-291)
  • DispatchRequest (304-304)
  • DispatchRequest (319-321)
  • DispatchResponse (331-335)
  • DispatchResponse (348-348)
  • DispatchResponse (363-365)
  • Task (368-387)
  • Task (400-400)
  • Task (415-417)
  • Operation (24-24)
  • Operation (56-58)
  • Operation (60-62)
  • Operation (69-71)
  • RequestCancelRequest (1963-1972)
  • RequestCancelRequest (1985-1985)
  • RequestCancelRequest (2000-2002)
  • RequestCancelResponse (2033-2039)
  • RequestCancelResponse (2052-2052)
  • RequestCancelResponse (2067-2069)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/service/coordinator/log_handler.go (1)
  • StreamTypeToExtension (244-256)
internal/service/scheduler/zombie_detector.go (3)
internal/common/logger/context.go (3)
  • Errorf (75-77)
  • Error (50-52)
  • Debug (35-37)
internal/common/logger/tag/tag.go (3)
  • Error (20-22)
  • WorkerID (65-67)
  • Queue (126-128)
internal/persistence/fileproc/procgrp.go (1)
  • ProcGroup (20-27)
internal/core/execution/noop_attempt.go (2)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
internal/test/server.go (2)
internal/service/frontend/server.go (1)
  • NewServer (72-176)
internal/cmd/context.go (1)
  • Context (44-59)
internal/cmd/startall.go (2)
internal/cmd/context.go (1)
  • Context (44-59)
internal/core/execution/context.go (2)
  • Context (17-29)
  • WithContext (286-288)
internal/runtime/remote/log_streamer.go (2)
proto/coordinator/v1/coordinator.pb.go (4)
  • LogChunk (1643-1660)
  • LogChunk (1673-1673)
  • LogChunk (1688-1690)
  • LogStreamType_LOG_STREAM_TYPE_SCHEDULER (133-133)
proto/coordinator/v1/coordinator_grpc.pb.go (1)
  • CoordinatorService_StreamLogsClient (130-130)
internal/runtime/manager.go (1)
internal/common/logger/tag/tag.go (1)
  • WorkerID (65-67)
internal/service/frontend/auth/oidc.go (2)
internal/common/backoff/retry.go (1)
  • Retry (31-83)
internal/common/config/config.go (1)
  • AuthOIDC (242-256)
internal/integration/queue_shell_test.go (1)
internal/cmd/context.go (1)
  • Context (44-59)
internal/service/worker/remote_handler_test.go (1)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/runtime/agent/agent.go (3)
internal/runtime/remote/log_streamer.go (1)
  • LogStreamer (27-35)
internal/core/dag.go (1)
  • HandlerOn (551-558)
internal/core/execution/noop_attempt.go (1)
  • NewNoopDAGRunAttempt (20-22)
internal/runtime/agent/dbclient_test.go (2)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/service/frontend/api/v2/api.go (1)
internal/service/frontend/auth/oidc.go (1)
  • InitVerifierAndConfig (137-160)
internal/cmd/server.go (3)
internal/cmd/context.go (1)
  • Context (44-59)
internal/common/config/config.go (2)
  • Config (10-43)
  • Server (88-135)
internal/runtime/runner.go (1)
  • Config (96-110)
internal/service/coordinator/handler_test.go (3)
internal/core/execution/dagrun.go (1)
  • DAGRunRef (132-135)
internal/core/execution/dagrun_attempt.go (1)
  • DAGRunAttempt (19-52)
internal/core/execution/runstatus.go (1)
  • DAGRunStatus (36-61)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build
  • GitHub Check: Test on ubuntu-latest

Comment thread internal/cmd/restart.go Outdated
Comment thread internal/cmd/worker.go
Comment thread internal/core/exec/noop_attempt.go
Comment thread internal/core/status.go Outdated
Comment thread internal/service/coordinator/handler.go
Comment thread internal/service/coordinator/log_handler_test.go
Comment thread internal/service/frontend/api/v2/dagruns.go Outdated
Comment thread internal/service/frontend/api/v2/dagruns.go
Comment thread internal/service/scheduler/zombie_detector.go
Comment thread ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
@yohamta0 yohamta0 merged commit 1557b14 into main Jan 12, 2026
10 of 11 checks passed
@yohamta0 yohamta0 deleted the fix-worker-feature branch January 12, 2026 15:37
@codecov

codecov Bot commented Jan 12, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 45.93796% with 366 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.79%. Comparing base (a3709d3) to head (6002dc4).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
internal/cmd/progress_remote.go 0.00% 142 Missing ⚠️
internal/cmd/start.go 15.74% 106 Missing and 1 partial ⚠️
internal/core/exec/noop_attempt.go 0.00% 32 Missing ⚠️
internal/runtime/agent/agent.go 57.14% 15 Missing and 3 partials ⚠️
internal/persis/filedagrun/store.go 70.45% 9 Missing and 4 partials ⚠️
internal/runtime/builtin/chat/testing.go 0.00% 7 Missing ⚠️
internal/runtime/builtin/chat/executor.go 66.66% 6 Missing ⚠️
internal/cmd/worker.go 0.00% 5 Missing ⚠️
internal/persis/filequeue/store.go 61.53% 5 Missing ⚠️
internal/persis/filedagrun/dagrun.go 69.23% 3 Missing and 1 partial ⚠️
... and 18 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1573      +/-   ##
==========================================
- Coverage   66.64%   64.79%   -1.85%     
==========================================
  Files         250      255       +5     
  Lines       27678    28380     +702     
==========================================
- Hits        18445    18389      -56     
- Misses       7600     8343     +743     
- Partials     1633     1648      +15     
Files with missing lines Coverage Δ
cmd/main.go 86.95% <ø> (ø)
internal/cmd/agent_executor.go 54.76% <ø> (ø)
internal/cmd/cleanup.go 70.83% <100.00%> (ø)
internal/cmd/coord.go 73.72% <100.00%> (ø)
internal/cmd/dequeue.go 51.31% <100.00%> (ø)
internal/cmd/dry.go 76.81% <100.00%> (+0.34%) ⬆️
internal/cmd/exec.go 60.36% <100.00%> (ø)
internal/cmd/exec_spec.go 62.29% <ø> (ø)
internal/cmd/flags.go 100.00% <ø> (ø)
internal/cmd/migrate.go 54.21% <ø> (ø)
... and 132 more

... and 13 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cd2b060...6002dc4. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant