worker: fix problems in shared nothing worker #1573
📝 Walkthrough
This PR introduces distributed DAG execution support by implementing coordinator-based task dispatch, remote progress tracking with animation, signal-aware context management for graceful shutdown, scheduler log streaming, cancellation request handling, and shared-nothing execution mode using no-op attempts.
Sequence Diagram
sequenceDiagram
participant User
participant CLI as CLI (start/retry)
participant Coordinator
participant Worker
participant DAGStore
participant ProgressUI as Progress Display
User->>CLI: Execute DAG with WorkerSelector
CLI->>CLI: Detect WorkerSelector
CLI->>ProgressUI: Initialize (if enabled)
ProgressUI->>ProgressUI: Start animation loop
CLI->>Coordinator: Dispatch task
Coordinator->>DAGStore: Create/open attempt
DAGStore-->>Coordinator: Attempt ready
Coordinator-->>CLI: Dispatch accepted
CLI->>Coordinator: Poll for status
loop Every poll interval
Coordinator->>Worker: Fetch status
Worker-->>Coordinator: Current status
Coordinator-->>CLI: Status update
CLI->>ProgressUI: Update progress
ProgressUI->>ProgressUI: Render spinner & counts
end
Worker->>Coordinator: Stream scheduler logs
Coordinator-->>CLI: Log chunks
Worker->>Coordinator: Report completion
CLI->>ProgressUI: Stop animation
ProgressUI->>ProgressUI: Print final summary
ProgressUI-->>User: Display result
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
internal/runtime/manager.go (1)
429-458: Remove dead code: execWithRecovery in manager.go is unused and duplicated in agent.go.
The function at lines 429-458 is never invoked. There's an identical implementation in internal/runtime/agent/agent.go:1325 that is actively used (5 calls across the codebase). Remove the unused version in manager.go to eliminate duplication and dead code.
internal/service/frontend/api/v2/api.go (1)
296-317: handleError: execution.Err... cases don’t set HTTP status (can return 500 for not-found/conflict).
Right now you only change code/message for execution.ErrDAGNotFound/ErrDAGRunIDNotFound/ErrDAGAlreadyExists, leaving httpStatusCode at 500 unless the error is already *Error. That can break clients and also mis-log expected errors as “Internal server error”.
Proposed fix
 switch {
 case errors.Is(err, execution.ErrDAGNotFound):
     code = api.ErrorCodeNotFound
     message = "DAG not found"
+    httpStatusCode = http.StatusNotFound
 case errors.Is(err, execution.ErrDAGRunIDNotFound):
     code = api.ErrorCodeNotFound
     message = "dag-run ID not found"
+    httpStatusCode = http.StatusNotFound
 case errors.Is(err, execution.ErrDAGAlreadyExists):
     code = api.ErrorCodeAlreadyExists
     message = "DAG already exists"
+    httpStatusCode = http.StatusConflict
 }
🤖 Fix all issues with AI agents
In @internal/cmd/restart.go:
- Around line 131-139: The distributed execution branch in executeDAGWithRunID
skips the context tagging and restart initiation log by returning directly from
dispatchToCoordinatorAndWait; before calling dispatchToCoordinatorAndWait (or
immediately after obtaining coordinatorCli), add the same context tag population
and a restart-initiated log entry using ctx (matching the local path behavior),
ensure the tag includes the dag.ID and dagRunID, and propagate the tagged
context into dispatchToCoordinatorAndWait if that function accepts a context; if
not, add a short wrapper that logs the restart initiation with the same message
used for local restarts and then calls dispatchToCoordinatorAndWait.
In @internal/cmd/worker.go:
- Around line 6-7: os.Hostname() error is ignored which can produce a workerID
like "@<pid>"; change cmd/worker.go so you check the error and hostname before
composing workerID: call hostname, err := os.Hostname(); if err != nil ||
hostname == "" then leave workerID empty (so the fallback in
service/worker/worker.go can generate a proper ID), otherwise set workerID =
fmt.Sprintf("%s@%d", hostname, os.Getpid()); no label mutation changes needed.
In @internal/core/status.go:
- Around line 86-93: The IsDone() method on NodeStatus currently omits
NodeRejected and has a doc comment that doesn't match the returned set; update
the function to treat NodeRejected as terminal by adding a comparison for
NodeRejected in NodeStatus.IsDone(), and update the comment above IsDone to list
all terminal states (including "partially succeeded" and "rejected") so the docs
and behavior align; refer to the NodeStatus type and constants NodeSucceeded,
NodeFailed, NodeSkipped, NodeAborted, NodePartiallySucceeded, and NodeRejected
when making the change.
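The requested change could look roughly like the sketch below. The NodeStatus type and its constant values here are stand-ins declared only so the example compiles; the real definitions in internal/core/status.go may differ in order and value.

```go
package main

import "fmt"

// NodeStatus is a stand-in for the real type; the numeric values are
// illustrative assumptions, only the constant names come from the review.
type NodeStatus int

const (
	NodeNotStarted NodeStatus = iota
	NodeRunning
	NodeFailed
	NodeAborted
	NodeSucceeded
	NodeSkipped
	NodePartiallySucceeded
	NodeRejected
)

// IsDone reports whether the node has reached a terminal state: succeeded,
// failed, skipped, aborted, partially succeeded, or rejected.
func (s NodeStatus) IsDone() bool {
	switch s {
	case NodeSucceeded, NodeFailed, NodeSkipped, NodeAborted,
		NodePartiallySucceeded, NodeRejected:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(NodeRejected.IsDone(), NodeRunning.IsDone()) // prints "true false"
}
```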
In @internal/service/coordinator/handler.go:
- Around line 183-207: Dispatch may call createAttemptForTask or
createSubAttemptForTask concurrently for the same task.DagRunId causing
duplicate/open-attempt leaks and h.openAttempts to be overwritten; fix by
serializing attempt creation per run: in both createAttemptForTask and
createSubAttemptForTask acquire the existing per-run mutex (the same mutex used
elsewhere), re-check h.openAttempts[task.DagRunId] before creating a new attempt
to avoid double-creation, ensure that if opening a new attempt fails you
close/cleanup the partially opened attempt and release the mutex, and apply the
identical pattern where Dispatch branches for root vs sub-DAG runs so concurrent
Dispatch calls cannot create duplicate attempts.
- Around line 853-899: RequestCancel is missing input validation before looking
up attempts; ensure req.DagName and req.DagRunId are non-empty and, when
isSubDAG (RootDagRunId != "" and != DagRunId), validate both req.RootDagRunName
and req.RootDagRunId are provided. Add checks at the start of
Handler.RequestCancel that return a status.Error(codes.InvalidArgument or
codes.FailedPrecondition) with a clear message when required fields are empty,
and only compute isSubDAG and call dagRunStore.FindSubAttempt / FindAttempt
after those validations succeed.
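For the first handler.go item above (serializing attempt creation per run), a possible shape is a per-run mutex plus a re-check of the open-attempt map before creating anything. The handler and attempt types below are simplified stand-ins, and getOrCreateAttempt is a hypothetical name; only the runMutexes/openAttempts idea comes from the review.

```go
package main

import (
	"fmt"
	"sync"
)

// attempt stands in for an opened DAG-run attempt.
type attempt struct{ runID string }

// handler is a simplified stand-in for the coordinator handler.
type handler struct {
	mu           sync.Mutex             // guards the two maps and the counter
	runMutexes   map[string]*sync.Mutex // one mutex per dag-run ID
	openAttempts map[string]*attempt
	created      int // number of attempts actually created
}

func newHandler() *handler {
	return &handler{
		runMutexes:   make(map[string]*sync.Mutex),
		openAttempts: make(map[string]*attempt),
	}
}

// runMutex returns the mutex for runID, creating it on first use.
func (h *handler) runMutex(runID string) *sync.Mutex {
	h.mu.Lock()
	defer h.mu.Unlock()
	m, ok := h.runMutexes[runID]
	if !ok {
		m = &sync.Mutex{}
		h.runMutexes[runID] = m
	}
	return m
}

// getOrCreateAttempt serializes creation per run and re-checks the map
// while holding the per-run lock, so concurrent callers share one attempt.
func (h *handler) getOrCreateAttempt(runID string) *attempt {
	m := h.runMutex(runID)
	m.Lock()
	defer m.Unlock()

	h.mu.Lock()
	existing, ok := h.openAttempts[runID]
	h.mu.Unlock()
	if ok {
		return existing
	}

	a := &attempt{runID: runID} // stands in for creating and opening an attempt
	h.mu.Lock()
	h.openAttempts[runID] = a
	h.created++
	h.mu.Unlock()
	return a
}

func main() {
	h := newHandler()
	var wg sync.WaitGroup
	for i := 0; i < 50; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			h.getOrCreateAttempt("run-1")
		}()
	}
	wg.Wait()
	fmt.Println("attempts created:", h.created)
}
```

A real implementation would also need to close a partially opened attempt when opening fails, which this sketch omits.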
In @internal/service/coordinator/log_handler_test.go:
- Around line 39-45: The tests for StreamTypeToExtension are missing the
SCHEDULER case; update the test table in log_handler_test.go (the tests slice
iterated in the t.Run loop) to include a test entry with name like "scheduler",
streamType set to LOG_STREAM_TYPE_SCHEDULER, and expected "scheduler.log" so
StreamTypeToExtension is fully covered alongside STDOUT, STDERR, and
UNSPECIFIED.
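A table covering the scheduler case might be laid out as below. This is a self-contained sketch: logStreamType and streamTypeToExtension are local stand-ins for the generated enum and the real StreamTypeToExtension, with the suffixes taken from the values quoted elsewhere in this review. In the repository the same table would live in a t.Run loop using require.

```go
package main

import "fmt"

// logStreamType is a local stand-in for the generated LogStreamType enum.
type logStreamType int

const (
	streamUnspecified logStreamType = iota
	streamStdout
	streamStderr
	streamScheduler
)

// streamTypeToExtension mirrors the mapping described in the review.
func streamTypeToExtension(t logStreamType) string {
	switch t {
	case streamStdout:
		return "stdout.log"
	case streamStderr:
		return "stderr.log"
	case streamScheduler:
		return "scheduler.log"
	default:
		return "log"
	}
}

type extensionCase struct {
	name       string
	streamType logStreamType
	expected   string
}

// extensionCases is the test table, including the previously missing
// scheduler entry.
func extensionCases() []extensionCase {
	return []extensionCase{
		{"stdout", streamStdout, "stdout.log"},
		{"stderr", streamStderr, "stderr.log"},
		{"scheduler", streamScheduler, "scheduler.log"},
		{"unspecified", streamUnspecified, "log"},
	}
}

func main() {
	for _, tc := range extensionCases() {
		got := streamTypeToExtension(tc.streamType)
		fmt.Printf("%-12s ok=%v\n", tc.name, got == tc.expected)
	}
}
```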
In @internal/service/frontend/api/v2/dagruns.go:
- Around line 1118-1131: The code calls a.coordinatorCli.RequestCancel(...) when
savedStatus.WorkerID != "" without checking that a.coordinatorCli is non-nil;
add a nil-check for a.coordinatorCli before invoking RequestCancel and return an
Error (HTTP 500 or BadRequest consistent with other methods) or wrap fmt.Errorf
indicating coordinator client unconfigured if nil; update the block around
savedStatus.WorkerID to first if a.coordinatorCli == nil { return nil,
&Error{HTTPStatus: http.StatusInternalServerError, Code: api.ErrorCodeInternal,
Message: "coordinator client not configured"} } else call
a.coordinatorCli.RequestCancel(ctx, request.Name, request.DagRunId, nil) and
handle/wrap any error as before.
- Around line 357-362: The code currently detects missing log files by
string-matching err.Error(), which is brittle; change
internal/common/fileutil/logutil.go's ReadLogLines so it returns a wrapped
os.ErrNotExist when the file is missing (e.g., return nil, fmt.Errorf("file not
found: %w", os.ErrNotExist)), and then update the callers in
internal/service/frontend/api/v2/dagruns.go that call fileutil.ReadLogLines to
use errors.Is(err, os.ErrNotExist) instead of strings.Contains(err.Error(),
"file not found") to reliably detect missing files.
In @internal/service/scheduler/zombie_detector.go:
- Around line 144-151: The distributed-run guard using st.WorkerID is currently
checked after calling FindAttempt/ReadDAG, which risks those calls failing for
non-local workers; move the check for st.WorkerID != "" && st.WorkerID !=
"local" to the very start of the function (before any FindAttempt or ReadDAG
calls) and return early, and update the logger.Debug call used in that
early-return to log the skip without including queue-specific data (remove
tag.Queue(dag.ProcGroup()) or similar) so it never attempts to access DAG
artifacts; apply the same change to the other occurrence mentioned (the block
around lines 153-161).
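The ordering the comment asks for, with the guard first and artifact lookups second, might look like this. The runStatus struct, checkRun, and the lookup callback are illustrative assumptions; lookup merely stands in for the FindAttempt/ReadDAG calls.

```go
package main

import "fmt"

// runStatus is a stand-in carrying only the field the guard needs.
type runStatus struct{ WorkerID string }

// isDistributedRun applies the condition quoted in the review.
func isDistributedRun(st runStatus) bool {
	return st.WorkerID != "" && st.WorkerID != "local"
}

// checkRun returns early for distributed runs before touching any DAG
// artifacts; lookup is a hypothetical stand-in for FindAttempt/ReadDAG.
func checkRun(st runStatus, lookup func() error) (skipped bool, err error) {
	if isDistributedRun(st) {
		return true, nil
	}
	return false, lookup()
}

func main() {
	calls := 0
	lookup := func() error { calls++; return nil }
	for _, id := range []string{"", "local", "worker-1@123"} {
		skipped, _ := checkRun(runStatus{WorkerID: id}, lookup)
		fmt.Printf("workerID=%q skipped=%v\n", id, skipped)
	}
	fmt.Println("local lookups:", calls)
}
```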
In @ui/src/features/dags/components/dag-execution/ExecutionLog.tsx:
- Around line 277-289: The current 404 detection in the ExecutionLog component
relies on case-sensitive substring matching of error.message which can miss
variations; update the error check to first look for an HTTP 404 via
error.response?.status === 404 (or error.status === 404) and fall back to a
case-insensitive string match like error.message?.toLowerCase().includes('not
found'); modify the conditional around error and !logData in ExecutionLog.tsx so
it uses these checks (referencing the existing error and logData variables)
before deciding to render the generic error UI.
🧹 Nitpick comments (18)
internal/service/scheduler/zombie_detector.go (2)
16-22: Move panicToError into the recover block or enrich it (stack / “panic:” prefix).
Current helper is fine, but it drops stack context; consider attaching a stack (or at least prefixing the message) so panics are actionable in logs.
69-73: Recover logging: consider emitting stack trace (and/or recovered value separately).
Right now you only log an error, which makes panics hard to debug in prod. If you already have a standard for stack logging elsewhere, reuse it here.
internal/service/frontend/server.go (2)
70-73: NewServer(ctx, ...) API change is good; consider nil-ctx guard to prevent panics.
Go convention is “context must not be nil”; a defensive fallback avoids surprising crashes in older call sites/tests.
Proposed guard
 func NewServer(ctx context.Context, cfg *config.Config, dr execution.DAGStore, drs execution.DAGRunStore, qs execution.QueueStore, ps execution.ProcStore, drm runtime.Manager, cc coordinator.Client, sr execution.ServiceRegistry, mr *prometheus.Registry, collector *telemetry.Collector, rs *resource.Service) (*Server, error) {
+    if ctx == nil {
+        ctx = context.Background()
+    }
     var remoteNodes []string
134-146: Good: builtin OIDC init now respects startup cancellation.
Minor: the “OIDC enabled…” log uses context.Background() (Line 146); if you want consistent structured tags/cancellation semantics, prefer ctx.
internal/integration/queue_shell_test.go (1)
40-58: Verify context cancellation already handles cleanup, but explicit server shutdown is still recommended.
The test's th.Context is reliably cancelled during test teardown (via helper.Cleanup() which calls Cancel()), so the server goroutine will exit. However, cancelling the context alone doesn't guarantee the HTTP listener is fully released. Adding explicit server.Shutdown() follows Go HTTP server best practices and ensures proper resource cleanup without relying on context cancellation timing.
Suggested cleanup
 server, err := frontend.NewServer(
     th.Context,
     th.Config,
     th.DAGStore,
     th.DAGRunStore,
     th.QueueStore,
     th.ProcStore,
     th.DAGRunMgr,
     nil, // no coordinator client for local execution
     th.ServiceRegistry,
     nil, // no metrics registry
     nil, // no telemetry collector
     nil, // no resource service
 )
 require.NoError(t, err, "failed to create server")
+
+t.Cleanup(func() {
+    _ = server.Shutdown(context.Background())
+})
 go func() {
     _ = server.Serve(th.Context)
 }()
internal/service/frontend/auth/oidc.go (1)
71-107: Nice improvement: OIDC init + retries are now cancellable.
Two follow-ups worth considering: (1) add defensive nil check for ctx in backoff.Retry to prevent panic, (2) wrap obviously-permanent errors (bad issuer URL format, unsupported scheme) with backoff.PermanentError() to avoid 10 unnecessary retry attempts for errors that will never succeed.
internal/common/stringutil/spinner.go (1)
5-5: Consider making SpinnerFrames immutable to prevent accidental modification.
While exported package-level slices are acceptable, they can be reassigned or mutated by consumers. Since these frames are intended to be a constant set of values, consider either:
- Using a function that returns a copy: func SpinnerFrames() []string
- Documenting that the slice should not be modified
- Accepting the current pattern as idiomatic for shared read-only data
The current approach is functional, but documenting the intended immutability would help prevent misuse.
♻️ Option 1: Use a function to return frames
-// SpinnerFrames contains the animation frames for a braille spinner.
-// Used by progress displays throughout the application.
-var SpinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
+var spinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
+
+// SpinnerFrames returns the animation frames for a braille spinner.
+// Used by progress displays throughout the application.
+func SpinnerFrames() []string {
+    return spinnerFrames
+}
♻️ Option 2: Add documentation warning
-// SpinnerFrames contains the animation frames for a braille spinner.
-// Used by progress displays throughout the application.
+// SpinnerFrames contains the animation frames for a braille spinner.
+// Used by progress displays throughout the application.
+// This slice should be treated as read-only and not modified.
 var SpinnerFrames = []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"}
internal/service/coordinator/log_handler.go (1)
243-256: Consider renaming function for clarity.
The function name StreamTypeToExtension suggests it returns just an extension (e.g., "log" or "stdout"), but it actually returns a filename suffix including the .log extension (e.g., "stdout.log", "scheduler.log").
Consider renaming to something like StreamTypeToFileSuffix or updating the return values to be true extensions without .log, then appending .log at the call site.
♻️ Alternative: Return true extensions
If you prefer to keep the function name as-is, you could return actual extensions and append .log at line 231:
-func StreamTypeToExtension(streamType coordinatorv1.LogStreamType) string {
+func StreamTypeToFileExtension(streamType coordinatorv1.LogStreamType) string {
     switch streamType {
     case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_STDOUT:
-        return "stdout.log"
+        return "stdout"
     case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_STDERR:
-        return "stderr.log"
+        return "stderr"
     case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_SCHEDULER:
-        return "scheduler.log"
+        return "scheduler"
     case coordinatorv1.LogStreamType_LOG_STREAM_TYPE_UNSPECIFIED:
         return "log"
     }
     return "log"
 }
Then update line 231:
-    filename = fmt.Sprintf("%s.%s", fileutil.SafeName(chunk.StepName), ext)
+    filename = fmt.Sprintf("%s.%s.log", fileutil.SafeName(chunk.StepName), ext)
Note: This would require updating test expectations as well.
internal/persistence/filedagrun/store.go (1)
435-457: Consider adding locking for sub-attempt creation.
The CreateAttempt method (lines 259-310) acquires a lock before creating a new DAG run or attempt to prevent race conditions. Since CreateSubAttempt also performs directory creation and file operations, it should similarly acquire a lock to ensure thread-safety and prevent concurrent creation conflicts.
🔒 Proposed fix to add locking
 func (store *Store) CreateSubAttempt(ctx context.Context, rootRef execution.DAGRunRef, subDAGRunID string) (execution.DAGRunAttempt, error) {
     if rootRef.ID == "" {
         return nil, ErrDAGRunIDEmpty
     }
     root := NewDataRoot(store.baseDir, rootRef.Name)
     dagRun, err := root.FindByDAGRunID(ctx, rootRef.ID)
     if err != nil {
         return nil, fmt.Errorf("failed to find root dag-run: %w", err)
     }
+    lockCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+    defer cancel()
+
+    if err := root.Lock(lockCtx); err != nil {
+        return nil, fmt.Errorf("failed to acquire lock for sub dag-run %s: %w", subDAGRunID, err)
+    }
+    defer func() {
+        if err := root.Unlock(); err != nil {
+            logger.Error(ctx, "Failed to unlock sub dag-run", tag.RunID(subDAGRunID), tag.Error(err))
+        }
+    }()
+
     // Create the sub-DAG run directory
     subDAGRun, err := dagRun.CreateSubDAGRun(ctx, subDAGRunID)
     if err != nil {
         return nil, fmt.Errorf("failed to create sub dag-run directory: %w", err)
     }
     // Create an attempt within the sub-DAG run
     return subDAGRun.CreateAttempt(ctx, execution.NewUTC(time.Now()), store.cache)
 }
internal/runtime/executor/dag_runner.go (1)
535-577: Consider using a timeout context for cancellation requests.
Using context.Background() at line 535 means cancellation requests have no timeout. If the coordinator or database is unresponsive, this could cause the Kill method to hang indefinitely. Consider using a timeout context:
⏱️ Proposed fix to add timeout context
 func (e *SubDAGExecutor) Kill(sig os.Signal) error {
     e.mu.Lock()
     defer e.mu.Unlock()
     var errs []error
-    ctx := context.Background()
+    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+    defer cancel()
     // Cancel distributed runs
     for runID := range e.distributedRuns {
internal/cmd/retry.go (1)
180-237: Consider reusing waitForDAGCompletionWithProgress and handleDistributedCancellation.
This function shares significant logic with dispatchToCoordinatorAndWait in start.go:
- Signal-aware context setup (lines 183-185)
- Progress display initialization (lines 188-202)
- Status polling with progress updates (line 229)
- Cancellation handling (lines 232-234)
The only differences are the task creation (using OPERATION_RETRY vs OPERATION_START and including WithPreviousStatus).
♻️ Suggested refactor to reduce duplication
Consider extracting the common orchestration logic into a shared helper:
// dispatchAndWait is a shared helper for dispatching tasks and waiting for completion
func dispatchAndWait(ctx *Context, dag *core.DAG, dagRunID string, coordinatorCli coordinator.Client, task *coordinatorv1.Task) error {
    // Signal-aware context, progress display, dispatch, wait, and cancellation handling
    // ... shared logic here
}
Then both dispatchToCoordinatorAndWait and dispatchRetryToCoordinatorAndWait can delegate to this helper with their respective tasks.
internal/runtime/remote/log_streamer.go (1)
94-169: Consider adding explicit stream closure on error paths.
When stream.Send(chunk) fails on line 143-145 or 162-163, the function returns immediately without calling CloseAndRecv(). While gRPC will eventually clean up the stream, explicitly closing it ensures proper resource release and server-side notification.
♻️ Suggested improvement
 func (s *LogStreamer) StreamSchedulerLog(ctx context.Context, logFilePath string) error {
     // ... existing code ...
     // Create a stream to the coordinator
     stream, err := s.client.StreamLogs(ctx)
     if err != nil {
         return fmt.Errorf("failed to create log stream: %w", err)
     }
+    defer func() {
+        // Ensure stream is closed on all paths
+        _, _ = stream.CloseAndRecv()
+    }()
     // Split into chunks if necessary (scheduler logs can be large)
     var sequence uint64 = 0
     for len(data) > 0 {
         // ... existing chunking code ...
         if err := stream.Send(chunk); err != nil {
-            return fmt.Errorf("failed to send scheduler log chunk: %w", err)
+            return fmt.Errorf("failed to send scheduler log chunk: %w", err)
         }
     }
     // Send final marker
     // ... existing code ...
-    // Close and get response
-    _, err = stream.CloseAndRecv()
-    return err
+    return nil // defer handles CloseAndRecv
 }
internal/cmd/start.go (1)
569-626: LGTM with one observation.
The polling logic is robust with:
- Consecutive error tracking with a reasonable threshold (10 errors = ~10 seconds)
- Periodic logging when progress display is disabled
- Proper status checking for completion
The error message on line 622 could be more informative by including the actual error from the status if available (resp.Status.Error).
♻️ Minor improvement for error message
 if !status.IsActive() {
     if status.IsSuccess() {
         logger.Info(ctx, "DAG completed successfully", tag.RunID(dagRunID))
         return nil
     }
-    return fmt.Errorf("DAG run failed with status: %s", status)
+    errMsg := resp.Status.Error
+    if errMsg == "" {
+        errMsg = "no error details available"
+    }
+    return fmt.Errorf("DAG run failed with status %s: %s", status, errMsg)
 }
internal/service/coordinator/handler.go (2)
448-544: Log-path rewrite: handle empty step names (esp. handler nodes) to avoid “.stdout.log”.
transformLogPaths() uses node.Step.Name for filename generation; if that’s empty (common for synthetic/handler nodes depending on how they’re populated), you’ll end up with odd filenames like .stdout.log. Consider providing explicit names for handler nodes (onInit/onExit/…) or falling back when stepName == "".
52-60: Open-attempt + per-run mutex maps can grow unbounded in long-lived coordinators.
You’re intentionally keeping attempts open until coordinator shutdown; combined with runMutexes never being removed, this can become a slow FD/memory leak under many runs. If execution.DAGRunAttempt.Open() holds OS resources, this becomes a production reliability issue. Consider evicting/closing on terminal statuses (with “already-closed” tolerance) and deleting the per-run mutex entry once a run is terminal.
Also applies to: 109-123, 451-477
internal/runtime/agent/agent.go (1)
649-657: Avoid concrete *remote.LogStreamer type-assertion + guard against blocking shutdown.
This couples the agent to one concrete implementation and may silently skip scheduler-log streaming for wrappers/mocks. Also, streaming at the end can delay completion indefinitely if the coordinator is slow/unreachable. Consider asserting on a small interface and applying a timeout (optionally with context.WithoutCancel so it still attempts to flush on cancellation).
Proposed refactor
-    // Stream scheduler log to coordinator if using remote logging (shared-nothing mode)
-    if a.logWriterFactory != nil {
-        if streamer, ok := a.logWriterFactory.(*remote.LogStreamer); ok {
-            if err := streamer.StreamSchedulerLog(ctx, a.logFile); err != nil {
-                logger.Warn(ctx, "Failed to stream scheduler log", tag.Error(err))
-            }
-        }
-    }
+    // Stream scheduler log to coordinator if supported by the configured log writer factory.
+    type schedulerLogStreamer interface {
+        StreamSchedulerLog(context.Context, string) error
+    }
+    if streamer, ok := a.logWriterFactory.(schedulerLogStreamer); ok {
+        streamCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
+        defer cancel()
+        if err := streamer.StreamSchedulerLog(streamCtx, a.logFile); err != nil {
+            logger.Warn(ctx, "Failed to stream scheduler log", tag.Error(err))
+        }
+    }
internal/cmd/progress_remote.go (2)
167-179: Non-TTY mode never prints a “finished” line.
For CI logs / redirected stderr, printHeader() prints “Started: …” once, but printFinal() returns early when !isTTY, so you don’t get an explicit completion line. Consider printing a final one-liner in non-TTY too.
Also applies to: 201-222
182-199: Cap progress percent (and handle completed > total) to avoid >100%.
If the coordinator status ever includes extra nodes (or total is underestimated), percent can exceed 100 and the (completed/total) can look wrong. Consider clamping completedShown := min(p.completed, p.total) and percent := min(percent, 100).
Also applies to: 201-222
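The clamping suggested above could be sketched as a small pure function. The name clampProgress and its signature are assumptions made for illustration; the real display keeps these counters on a struct.

```go
package main

import "fmt"

// clampProgress is a hypothetical helper: it caps the completed count at
// total and derives a percentage that never exceeds 100. A non-positive
// total yields (0, 0) to avoid division by zero.
func clampProgress(completed, total int) (shown, percent int) {
	if total <= 0 {
		return 0, 0
	}
	if completed < 0 {
		completed = 0
	}
	shown = completed
	if shown > total {
		shown = total
	}
	percent = shown * 100 / total
	return shown, percent
}

func main() {
	for _, c := range [][2]int{{3, 10}, {12, 10}, {0, 0}} {
		shown, percent := clampProgress(c[0], c[1])
		fmt.Printf("completed=%d total=%d -> (%d/%d) %d%%\n", c[0], c[1], shown, c[1], percent)
	}
}
```

Explicit comparisons are used instead of the min builtin so the sketch also compiles on Go versions older than 1.21.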
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
proto/coordinator/v1/coordinator_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (42)
internal/cmd/context.go
internal/cmd/progress_remote.go
internal/cmd/restart.go
internal/cmd/retry.go
internal/cmd/server.go
internal/cmd/start.go
internal/cmd/startall.go
internal/cmd/worker.go
internal/common/stringutil/spinner.go
internal/common/telemetry/collector_test.go
internal/core/execution/context.go
internal/core/execution/dagrun.go
internal/core/execution/noop_attempt.go
internal/core/status.go
internal/integration/queue_shell_test.go
internal/persistence/filedagrun/store.go
internal/runtime/agent/agent.go
internal/runtime/agent/dbclient_test.go
internal/runtime/agent/progress_simple.go
internal/runtime/executor/dag_runner.go
internal/runtime/manager.go
internal/runtime/remote/log_streamer.go
internal/runtime/remote/status_pusher_test.go
internal/service/coordinator/client.go
internal/service/coordinator/handler.go
internal/service/coordinator/handler_test.go
internal/service/coordinator/log_handler.go
internal/service/coordinator/log_handler_test.go
internal/service/frontend/api/v2/api.go
internal/service/frontend/api/v2/dagruns.go
internal/service/frontend/auth/oidc.go
internal/service/frontend/auth/oidc_test.go
internal/service/frontend/server.go
internal/service/scheduler/zombie_detector.go
internal/service/scheduler/zombie_detector_test.go
internal/service/worker/poller_test.go
internal/service/worker/remote_handler.go
internal/service/worker/remote_handler_test.go
internal/test/server.go
proto/coordinator/v1/coordinator.proto
ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
ui/src/features/dags/components/dag-execution/StepLog.tsx
🧰 Additional context used
📓 Path-based instructions (4)
ui/**/*.{ts,tsx,jsx,js}
📄 CodeRabbit inference engine (ui/CLAUDE.md)
ui/**/*.{ts,tsx,jsx,js}: Use developer-centric UI design with high information density, minimal whitespace, compact components, and no unnecessary decorations
Support both light and dark modes for all UI components using Tailwind CSS class pairs like dark:bg-slate-700
NEVER use full-page loading overlays or LoadingIndicator components that hide content - show stale data while fetching updates instead
Use compact modal design with small headers, minimal padding (p-2 or p-3), tight spacing, and support keyboard navigation (arrows, enter, escape)
Use small heights for form elements: select boxes h-7 or smaller, buttons h-7 or h-8, inputs with compact padding (py-0.5 or py-1)
Minimize row heights in tables and lists while maintaining readability, merge related columns, and always handle long text with whitespace-normal break-words
Use consistent metadata styling with bg-slate-200 dark:bg-slate-700 backgrounds and maintain text hierarchy with primary/secondary/muted text colors
Use flexbox-first layouts with min-h-0 and overflow-hidden to prevent layout breaks, account for fixed elements when setting heights
Maintain keyboard navigation support in all interactive components with appropriate focus indicators and ARIA labels
Files:
ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
ui/src/features/dags/components/dag-execution/StepLog.tsx
ui/**/*.{ts,tsx}
📄 CodeRabbit inference engine (AGENTS.md)
ui/**/*.{ts,tsx}: The React + TypeScript frontend resides in ui/, with production bundles copied to internal/service/frontend/assets by make ui
UI code follows ESLint + Prettier (2-space indent) and Tailwind utilities; name React components in PascalCase (JobList.tsx) and hooks with use* (useJobs.ts)
Files:
ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
ui/src/features/dags/components/dag-execution/StepLog.tsx
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common
Files:
internal/common/stringutil/spinner.go
internal/core/execution/context.go
internal/service/coordinator/log_handler_test.go
internal/common/telemetry/collector_test.go
internal/cmd/worker.go
internal/service/frontend/auth/oidc_test.go
internal/runtime/remote/status_pusher_test.go
internal/core/execution/dagrun.go
internal/runtime/executor/dag_runner.go
internal/cmd/context.go
internal/runtime/agent/progress_simple.go
internal/cmd/start.go
internal/cmd/retry.go
internal/service/worker/remote_handler.go
internal/persistence/filedagrun/store.go
internal/service/scheduler/zombie_detector_test.go
internal/service/worker/poller_test.go
internal/core/status.go
internal/service/coordinator/client.go
internal/service/coordinator/log_handler.go
internal/cmd/progress_remote.go
internal/service/frontend/server.go
internal/service/coordinator/handler.go
internal/service/scheduler/zombie_detector.go
internal/core/execution/noop_attempt.go
internal/test/server.go
internal/cmd/startall.go
internal/runtime/remote/log_streamer.go
internal/runtime/manager.go
internal/service/frontend/auth/oidc.go
internal/integration/queue_shell_test.go
internal/service/worker/remote_handler_test.go
internal/runtime/agent/agent.go
internal/runtime/agent/dbclient_test.go
internal/service/frontend/api/v2/api.go
internal/cmd/server.go
internal/service/frontend/api/v2/dagruns.go
internal/cmd/restart.go
internal/service/coordinator/handler_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks
Files:
internal/service/coordinator/log_handler_test.go
internal/common/telemetry/collector_test.go
internal/service/frontend/auth/oidc_test.go
internal/runtime/remote/status_pusher_test.go
internal/service/scheduler/zombie_detector_test.go
internal/service/worker/poller_test.go
internal/integration/queue_shell_test.go
internal/service/worker/remote_handler_test.go
internal/runtime/agent/dbclient_test.go
internal/service/coordinator/handler_test.go
🧠 Learnings (2)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/cmd/start.go
internal/cmd/retry.go
internal/cmd/startall.go
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: `make run` starts the Go scheduler and serves the compiled UI (fails fast if `ui/dist` is missing)
Applied to files:
internal/integration/queue_shell_test.go
🧬 Code graph analysis (32)
internal/core/execution/context.go (1)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/service/coordinator/log_handler_test.go (1)
internal/service/coordinator/log_handler.go (1)
StreamTypeToExtension(244-256)
internal/common/telemetry/collector_test.go (2)
internal/cmd/context.go (1)
Context(44-59)internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/service/frontend/auth/oidc_test.go (2)
internal/service/frontend/auth/oidc.go (1)
InitVerifierAndConfig(137-160)internal/common/config/config.go (1)
AuthOIDC(242-256)
internal/runtime/remote/status_pusher_test.go (3)
internal/cmd/context.go (1)
Context(44-59)internal/core/execution/context.go (1)
Context(17-29)internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/core/execution/dagrun.go (2)
internal/core/execution/context.go (1)
Context(17-29)internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/runtime/agent/progress_simple.go (2)
internal/core/status.go (1)
Status(4-4)internal/common/stringutil/spinner.go (1)
SpinnerFrames(5-5)
internal/cmd/start.go (4)
internal/core/dag.go (1)
DAG(63-178)
internal/service/coordinator/client.go (1)
Client(29-59)
internal/cmd/progress_remote.go (2)
RemoteProgressDisplay(21-36), NewRemoteProgressDisplay(39-54)
internal/runtime/executor/task.go (1)
CreateTask(12-33)
internal/cmd/retry.go (5)
internal/core/dag.go (1)
DAG(63-178)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)
internal/service/coordinator/client.go (1)
Client(29-59)
internal/runtime/executor/task.go (4)
TaskOption(36-36), WithWorkerSelector(68-72), WithPreviousStatus(83-89), WithStep(75-79)
proto/coordinator/v1/coordinator.pb.go (1)
Operation_OPERATION_RETRY(29-29)
internal/service/worker/remote_handler.go (4)
internal/common/logger/context.go (3)
Errorf(75-77), Warn(45-47), WithLogger(12-14)
internal/common/logger/tag/tag.go (1)
Target(168-170)
internal/persistence/filedagrun/writer.go (1)
Writer(35-42)
internal/common/logger/logger.go (2)
NewLogger(93-133), WithWriter(69-73)
internal/persistence/filedagrun/store.go (3)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/persistence/filedagrun/dataroot.go (1)
NewDataRoot(50-78)
internal/service/scheduler/zombie_detector_test.go (2)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/service/worker/poller_test.go (3)
internal/core/execution/context.go (1)
Context(17-29)
internal/runtime/context.go (1)
Context(14-14)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/service/coordinator/client.go (2)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
proto/coordinator/v1/coordinator.pb.go (3)
RequestCancelRequest(1963-1972), RequestCancelRequest(1985-1985), RequestCancelRequest(2000-2002)
internal/service/coordinator/log_handler.go (2)
proto/coordinator/v1/coordinator.pb.go (7)
LogStreamType_LOG_STREAM_TYPE_SCHEDULER(133-133), LogStreamType(127-127), LogStreamType(162-164), LogStreamType(166-168), LogStreamType(175-177), LogStreamType_LOG_STREAM_TYPE_STDOUT(131-131), LogStreamType_LOG_STREAM_TYPE_STDERR(132-132)
api/v2/api.gen.go (1)
StepName(1410-1410)
internal/cmd/progress_remote.go (6)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)
proto/coordinator/v1/coordinator.pb.go (3)
DAGRunStatusProto(1071-1099), DAGRunStatusProto(1112-1112), DAGRunStatusProto(1127-1129)
internal/proto/convert/status.go (1)
ProtoToDAGRunStatus(114-153)
internal/core/status.go (3)
Status(4-4), Aborted(10-10), Failed(9-9)
internal/output/tree.go (1)
NewRenderer(84-86)
internal/common/stringutil/spinner.go (1)
SpinnerFrames(5-5)
internal/service/frontend/server.go (1)
internal/service/frontend/auth/oidc.go (1)
InitVerifierAndConfig(137-160)
internal/service/coordinator/handler.go (4)
proto/coordinator/v1/coordinator.pb.go (19)
DispatchRequest(286-291), DispatchRequest(304-304), DispatchRequest(319-321), DispatchResponse(331-335), DispatchResponse(348-348), DispatchResponse(363-365), Task(368-387), Task(400-400), Task(415-417), Operation(24-24), Operation(56-58), Operation(60-62), Operation(69-71), RequestCancelRequest(1963-1972), RequestCancelRequest(1985-1985), RequestCancelRequest(2000-2002), RequestCancelResponse(2033-2039), RequestCancelResponse(2052-2052), RequestCancelResponse(2067-2069)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)
internal/service/coordinator/log_handler.go (1)
StreamTypeToExtension(244-256)
internal/service/scheduler/zombie_detector.go (3)
internal/common/logger/context.go (3)
Errorf(75-77), Error(50-52), Debug(35-37)
internal/common/logger/tag/tag.go (3)
Error(20-22), WorkerID(65-67), Queue(126-128)
internal/persistence/fileproc/procgrp.go (1)
ProcGroup(20-27)
internal/core/execution/noop_attempt.go (2)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)
internal/test/server.go (2)
internal/service/frontend/server.go (1)
NewServer(72-176)
internal/cmd/context.go (1)
Context(44-59)
internal/cmd/startall.go (2)
internal/cmd/context.go (1)
Context(44-59)
internal/core/execution/context.go (2)
Context(17-29), WithContext(286-288)
internal/runtime/remote/log_streamer.go (2)
proto/coordinator/v1/coordinator.pb.go (4)
LogChunk(1643-1660), LogChunk(1673-1673), LogChunk(1688-1690), LogStreamType_LOG_STREAM_TYPE_SCHEDULER(133-133)
proto/coordinator/v1/coordinator_grpc.pb.go (1)
CoordinatorService_StreamLogsClient(130-130)
internal/runtime/manager.go (1)
internal/common/logger/tag/tag.go (1)
WorkerID(65-67)
internal/service/frontend/auth/oidc.go (2)
internal/common/backoff/retry.go (1)
Retry(31-83)
internal/common/config/config.go (1)
AuthOIDC(242-256)
internal/integration/queue_shell_test.go (1)
internal/cmd/context.go (1)
Context(44-59)
internal/service/worker/remote_handler_test.go (1)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/runtime/agent/agent.go (3)
internal/runtime/remote/log_streamer.go (1)
LogStreamer(27-35)
internal/core/dag.go (1)
HandlerOn(551-558)
internal/core/execution/noop_attempt.go (1)
NewNoopDAGRunAttempt(20-22)
internal/runtime/agent/dbclient_test.go (2)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/service/frontend/api/v2/api.go (1)
internal/service/frontend/auth/oidc.go (1)
InitVerifierAndConfig(137-160)
internal/cmd/server.go (3)
internal/cmd/context.go (1)
Context(44-59)
internal/common/config/config.go (2)
Config(10-43), Server(88-135)
internal/runtime/runner.go (1)
Config(96-110)
internal/service/coordinator/handler_test.go (3)
internal/core/execution/dagrun.go (1)
DAGRunRef(132-135)
internal/core/execution/dagrun_attempt.go (1)
DAGRunAttempt(19-52)
internal/core/execution/runstatus.go (1)
DAGRunStatus(36-61)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Build
- GitHub Check: Test on ubuntu-latest
55a61e4 to c1b5c9a
231a6c9 to bff6cf5
3b8752e to 6002dc4
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1573      +/-   ##
==========================================
- Coverage   66.64%   64.79%   -1.85%
==========================================
  Files         250      255       +5
  Lines       27678    28380     +702
==========================================
- Hits        18445    18389      -56
- Misses       7600     8343     +743
- Partials     1633     1648      +15
... and 13 files with indirect coverage changes
Continue to review full report in Codecov by Sentry.
|
Summary by CodeRabbit
Release Notes
New Features
Improvements