fix: repair stale distributed sub-dag leases#2143

Merged
yohamta0 merged 4 commits into main from codex/fix-distributed-subdag-lease-repair
May 12, 2026

@yohamta0 yohamta0 commented May 12, 2026

Collaborator

Summary

  • add a hierarchy-aware DAG run attempt reference for root and sub-DAG status mutation
  • update stale distributed lease repair to preserve root/sub-DAG identity when failing stale attempts
  • add regression coverage for stale distributed sub-DAG leases and file-store sub-attempt CAS

Fixes #2136

Testing

  • go test ./internal/service/coordinator ./internal/persis/filedagrun ./internal/runtime ./internal/service/frontend/api/v1
  • go test ./internal/core/exec ./internal/service/scheduler ./internal/service/worker
  • go test ./internal/... -run '^$'
  • go test ./internal/intg/distr -run 'TestSubDAG|TestCancellation/cancelPropagatesToSubDAGOnWorker'
  • git diff --check

Summary by CodeRabbit

  • Bug Fixes
    • Improved atomic status operations for DAG run attempts to enhance concurrency safety during status transitions and execution reliability across distributed systems
    • Enhanced sub-DAG attempt identification and lifecycle tracking for more accurate state management and recovery
    • Strengthened stale run detection mechanisms with more precise attempt-level comparison logic for robust error handling

coderabbitai Bot commented May 12, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 39e81893-69e6-48a3-9549-af88e3be297c

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The PR introduces a new DAGRunAttemptRef type and corresponding CompareAndSwapAttemptStatus interface method to enable precise, attempt-keyed atomic status updates on DAG runs, replacing the prior "latest attempt" semantics. This supports both root and sub-DAG attempts by including identity, root, and key fields. Storage, usage sites, and all test mocks are updated consistently.

Changes

Attempt-scoped Compare-And-Swap API

| Layer / File(s) | Summary |
| --- | --- |
| Core API type and interface definition (internal/core/exec/dagrun.go) | New exported DAGRunAttemptRef struct stores DAGRun, Root, AttemptID, and AttemptKey; helper methods IsSubDAG() and RootOrDAGRun() support root/sub-DAG routing. New CompareAndSwapAttemptStatus method added to DAGRunStore interface, accepting an attempt ref and expected status with a mutation callback. |
| File storage layer for attempt-scoped CAS (internal/persis/filedagrun/store.go, internal/persis/filedagrun/store_test.go) | CompareAndSwapAttemptStatus implementation resolves root and sub-DAG runs via RootOrDAGRun() and FindSubDAGRun(), validates attempt identity against both AttemptID and AttemptKey, and applies the mutation. Legacy CompareAndSwapLatestAttemptStatus now delegates to the new method. New test CompareAndSwapAttemptStatusUpdatesSubAttempt verifies sub-attempt CAS transitions with node status updates. |
| Stale distributed-run repair using attempt-scoped CAS (internal/runtime/distributed_stale_run.go, internal/service/coordinator/handler.go, internal/service/coordinator/handler_test.go) | ConfirmAndRepairStaleDistributedRun and repairStaleLeaseFailureFromRunHeartbeat switch to CompareAndSwapAttemptStatus with fully-populated DAGRunAttemptRef (DAGRun, Root, AttemptID, AttemptKey). Internal helper failDistributedAttemptIfCurrent refactored to accept DAGRunAttemptRef instead of separate identifier parameters, with updated CAS loop and post-swap cleanup using the ref. Mock implementation updated to route by IsSubDAG() and validate identifiers. New test DetectStaleLeasesFailsSubDAGLeasedRunWhenFreshWorkerHeartbeatDropsAttempt exercises sub-DAG lease stale-handling. |
| Test mock implementations of attempt-scoped CAS (internal/agent/dag_manage_test.go, internal/cmd/enqueue_internal_test.go, internal/cmn/telemetry/collector_test.go, internal/core/exec/enqueue_retry_test.go, internal/runtime/agent/dbclient_test.go, internal/service/frontend/api/v1/dagruns_internal_test.go, internal/service/scheduler/retry_scanner_test.go, internal/service/scheduler/zombie_detector_test.go, internal/service/worker/remote_handler_test.go) | All test fixture stores updated to implement CompareAndSwapAttemptStatus(ctx, DAGRunAttemptRef, expectedStatus, mutate) instead of the legacy "latest attempt" signature. Implementations range from panic stubs, to delegating wrappers (e.g., mapping ref fields to legacy methods), to full mock call tracking with sub-DAG routing. |
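
To make the attempt-scoped CAS described in the table concrete, here is a minimal in-memory sketch. It is not the file-store implementation from this PR: `memStore`, `Status`, `ErrAttemptMismatch`, the string-typed states, and the exact method signature (no context, `(bool, error)` return) are assumptions made for illustration, and `RootOrDAGRun` is written the way the walkthrough describes it rather than copied from the source.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// DAGRunRef identifies a run by DAG name and run ID (simplified stand-in).
type DAGRunRef struct {
	Name string
	ID   string
}

// Zero reports whether the reference is unset.
func (r DAGRunRef) Zero() bool { return r == DAGRunRef{} }

// DAGRunAttemptRef carries the four fields named in the walkthrough.
type DAGRunAttemptRef struct {
	DAGRun     DAGRunRef
	Root       DAGRunRef
	AttemptID  string
	AttemptKey string
}

// IsSubDAG mirrors the predicate quoted in the review thread.
func (r DAGRunAttemptRef) IsSubDAG() bool {
	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
}

// RootOrDAGRun returns the root for sub-DAG attempts, else the run itself
// (assumed behavior, inferred from the method name).
func (r DAGRunAttemptRef) RootOrDAGRun() DAGRunRef {
	if r.IsSubDAG() {
		return r.Root
	}
	return r.DAGRun
}

// Status is a hypothetical, trimmed-down run status.
type Status struct {
	AttemptID  string
	AttemptKey string
	State      string
	Error      string
}

// ErrAttemptMismatch is a hypothetical sentinel for a superseded attempt.
var ErrAttemptMismatch = errors.New("attempt does not match stored status")

// memStore keys statuses by root run, then by run ID under that root.
type memStore struct {
	mu   sync.Mutex
	runs map[DAGRunRef]map[string]*Status
}

// CompareAndSwapAttemptStatus applies mutate only when the stored attempt
// matches ref and is in the expected state. It reports whether it swapped.
func (s *memStore) CompareAndSwapAttemptStatus(ref DAGRunAttemptRef, expected string, mutate func(*Status) error) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	st, ok := s.runs[ref.RootOrDAGRun()][ref.DAGRun.ID]
	if !ok {
		return false, fmt.Errorf("dag-run %q not found", ref.DAGRun.ID)
	}
	if st.AttemptID != ref.AttemptID || st.AttemptKey != ref.AttemptKey {
		return false, ErrAttemptMismatch
	}
	if st.State != expected {
		return false, nil // someone else already moved the status
	}
	next := *st // mutate a copy so a failing callback leaves the store untouched
	if err := mutate(&next); err != nil {
		return false, err
	}
	*st = next
	return true, nil
}

func main() {
	root := DAGRunRef{Name: "parent", ID: "run-1"}
	store := &memStore{runs: map[DAGRunRef]map[string]*Status{
		root: {"sub-1": {AttemptID: "att-1", AttemptKey: "key-1", State: "running"}},
	}}
	ref := DAGRunAttemptRef{
		DAGRun:     DAGRunRef{Name: "child", ID: "sub-1"},
		Root:       root,
		AttemptID:  "att-1",
		AttemptKey: "key-1",
	}
	swapped, err := store.CompareAndSwapAttemptStatus(ref, "running", func(s *Status) error {
		s.State, s.Error = "failed", "stale distributed lease"
		return nil
	})
	fmt.Println(swapped, err)
}
```

The point of keying on the full ref is that a sub-DAG attempt is looked up under its root, so a repair path that only knows the child run ID no longer falls through to a root-level lookup that cannot find it.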

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

The PR changes a core store API across multiple components (storage, handlers, tests) with moderate complexity. The storage implementation adds sub-DAG lookup logic and dual-field identity validation. All test mocks must be updated consistently to match the new signature. No novel algorithms or tricky control flow, but broad surface area and several interdependent layers require careful verification of signature updates, delegations, and test coverage.

Possibly related PRs

  • dagucloud/dagu#2041: Updates stale distributed-run repair flow (runtime/ConfirmAndRepairStaleDistributedRun) and the DAGRunStore CAS API directly; the retrieved PR implements worker-heartbeat–backed repair while this PR refactors the API.
  • dagucloud/dagu#2113: Adds DAG run management tooling and test updates in internal/agent/dag_manage_test.go that complement this PR's test mock updates across the codebase.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'fix: repair stale distributed sub-dag leases' directly and specifically describes the main objective of the changeset—adding support for sub-DAG hierarchy awareness in lease repair. |
| Description check | ✅ Passed | The PR description provides a clear summary, lists key changes, references a linked issue (#2136), and documents testing methodology, mostly aligning with the repository template. |
| Linked Issues check | ✅ Passed | The PR introduces DAGRunAttemptRef for hierarchy-aware sub-DAG status mutations and updates distributed lease repair to preserve root/sub-DAG identity, directly addressing issue #2136's core objectives of fixing sub-DAG lease repair and preventing 'dag-run ID not found' errors. |
| Out of Scope Changes check | ✅ Passed | All changes are scoped to implementing CompareAndSwapAttemptStatus method across test/store implementations and refactoring distributed lease repair to use the new hierarchy-aware attempt reference, all directly supporting the stated objectives. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal/service/coordinator/handler.go (1)

2787-2793: 💤 Low value

Add parentheses for clarity around mixed ||/&& precedence.

The condition relies on Go's rule that && binds tighter than ||. It currently evaluates as status.AttemptID != ref.AttemptID || (!status.Status.IsActive() && status.Status != core.NotStarted), which is the intended semantics, but at a glance it is easy to misread as (status.AttemptID != ref.AttemptID || !status.Status.IsActive()) && status.Status != core.NotStarted. Explicit parentheses would make the intent obvious and resilient to future edits.

♻️ Proposed clarification
-	if status.AttemptID != ref.AttemptID || !status.Status.IsActive() && status.Status != core.NotStarted {
+	if status.AttemptID != ref.AttemptID || (!status.Status.IsActive() && status.Status != core.NotStarted) {
 		h.deleteDistributedTracking(ctx, storeCtx, ref.DAGRun, ref.AttemptKey,
 			"Failed to delete superseded distributed lease",
 			"Failed to delete superseded active distributed run",
 		)
 		return
 	}
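
The precedence point can be checked in isolation. The sketch below replaces the real operands with plain booleans (`attemptChanged`, `active`, `notStarted` are stand-in names, not identifiers from handler.go) and compares the unparenthesized form, the proposed form, and the grouping a reader might wrongly assume.

```go
package main

import "fmt"

// superseded has the same shape as the reviewed condition:
// A || !B && !C, which Go parses as A || (!B && !C).
func superseded(attemptChanged, active, notStarted bool) bool {
	return attemptChanged || !active && !notStarted
}

// supersededExplicit adds the parentheses proposed in the review.
func supersededExplicit(attemptChanged, active, notStarted bool) bool {
	return attemptChanged || (!active && !notStarted)
}

// misread is the grouping a reader might assume by mistake.
func misread(attemptChanged, active, notStarted bool) bool {
	return (attemptChanged || !active) && !notStarted
}

func main() {
	bools := []bool{false, true}
	for _, a := range bools {
		for _, b := range bools {
			for _, c := range bools {
				if superseded(a, b, c) != supersededExplicit(a, b, c) {
					fmt.Println("unexpected: parentheses changed the result")
				}
				if superseded(a, b, c) != misread(a, b, c) {
					fmt.Printf("misreading differs: attemptChanged=%v active=%v notStarted=%v\n", a, b, c)
				}
			}
		}
	}
}
```

In this boolean model the two explicit groupings disagree only when the attempt has changed and the status is NotStarted, which is the superseded-attempt case the cleanup branch exists for, so the parentheses are cosmetic today but guard against a costly misedit.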
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/handler.go` around lines 2787 - 2793, The
boolean expression using status.AttemptID, ref.AttemptID,
status.Status.IsActive(), and core.NotStarted is correct but ambiguous; update
the if condition to add explicit parentheses so the intent (status.AttemptID !=
ref.AttemptID || (!status.Status.IsActive() && status.Status !=
core.NotStarted)) is clear and not misread, then keep the existing
h.deleteDistributedTracking(...) call and return unchanged.
internal/service/coordinator/handler_test.go (1)

1917-1977: ⚡ Quick win

Consider asserting lease cleanup to mirror the root-level regression test.

The analogous root-level test DetectStaleLeasesFailsLeasedRunWhenFreshWorkerHeartbeatDropsAttempt (lines 2109–2168) additionally asserts that the stale lease is deleted (leaseStore.Get(...) returns exec.ErrDAGRunLeaseNotFound). Since this PR's stated objective is to repair sub-DAG lease identity in the failure path, asserting lease cleanup here would tighten the regression coverage and catch a future regression where the sub-DAG path fails the attempt but leaks the lease.

♻️ Suggested addition
 		require.Equal(t, staleDistributedLeaseReason("worker-1"), status.Error)
 		require.Equal(t, core.NodeFailed, status.Nodes[0].Status)
 		require.Equal(t, staleDistributedLeaseReason("worker-1"), status.Nodes[0].Error)
+
+		_, err = leaseStore.Get(ctx, attemptKey)
+		assert.ErrorIs(t, err, exec.ErrDAGRunLeaseNotFound)
 	})
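
As a standalone illustration of what the suggested assertion checks, the sketch below models a lease store whose `Get` returns a wrapped not-found sentinel once the lease is gone. Everything here is hypothetical: `leaseStore`, `repairStaleLease`, and `leaseReleased` are illustrative names, and the local `ErrDAGRunLeaseNotFound` only stands in for `exec.ErrDAGRunLeaseNotFound`.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrDAGRunLeaseNotFound stands in for exec.ErrDAGRunLeaseNotFound.
var ErrDAGRunLeaseNotFound = errors.New("dag-run lease not found")

type lease struct{ WorkerID string }

// leaseStore is a hypothetical in-memory lease store keyed by attempt key.
type leaseStore struct{ leases map[string]lease }

// Get returns the lease or a wrapped ErrDAGRunLeaseNotFound.
func (s *leaseStore) Get(attemptKey string) (lease, error) {
	l, ok := s.leases[attemptKey]
	if !ok {
		return lease{}, fmt.Errorf("get lease %q: %w", attemptKey, ErrDAGRunLeaseNotFound)
	}
	return l, nil
}

// Delete removes the lease; deleting a missing key is a no-op.
func (s *leaseStore) Delete(attemptKey string) { delete(s.leases, attemptKey) }

// repairStaleLease marks the attempt failed and then releases its lease.
// Skipping the Delete call reproduces the leak the review warns about.
func repairStaleLease(s *leaseStore, status map[string]string, attemptKey string) {
	status[attemptKey] = "failed"
	s.Delete(attemptKey)
}

// leaseReleased is the check the suggested assertion performs: the lookup
// must fail with the not-found sentinel, matched through any wrapping.
func leaseReleased(s *leaseStore, attemptKey string) bool {
	_, err := s.Get(attemptKey)
	return errors.Is(err, ErrDAGRunLeaseNotFound)
}

func main() {
	store := &leaseStore{leases: map[string]lease{"sub-attempt-key": {WorkerID: "worker-1"}}}
	status := map[string]string{"sub-attempt-key": "running"}

	repairStaleLease(store, status, "sub-attempt-key")

	fmt.Println("status:", status["sub-attempt-key"])
	fmt.Println("lease released:", leaseReleased(store, "sub-attempt-key"))
}
```

Checking status and lease together matters because the two are updated in separate steps; a test that only inspects the status would still pass if the cleanup step were dropped.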
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/handler_test.go` around lines 1917 - 1977, The
test DetectStaleLeasesFailsSubDAGLeasedRunWhenFreshWorkerHeartbeatDropsAttempt
should also assert the stale lease was removed: after calling
h.detectStaleLeases(ctx) and verifying the attempt status, call
leaseStore.Get(ctx, attemptKey) (or the store's equivalent lookup used
elsewhere) and require it returns exec.ErrDAGRunLeaseNotFound to ensure the
sub-DAG lease cleanup mirrors the root-level test
DetectStaleLeasesFailsLeasedRunWhenFreshWorkerHeartbeatDropsAttempt and prevents
lease leaks for the sub-run path.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1e2960db-1d42-4004-b49d-64e2074b587e

📥 Commits

Reviewing files that changed from the base of the PR and between dfc5cb5 and 86629d3.

📒 Files selected for processing (15)
  • internal/agent/dag_manage_test.go
  • internal/cmd/enqueue_internal_test.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/exec/dagrun.go
  • internal/core/exec/enqueue_retry_test.go
  • internal/persis/filedagrun/store.go
  • internal/persis/filedagrun/store_test.go
  • internal/runtime/agent/dbclient_test.go
  • internal/runtime/distributed_stale_run.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/frontend/api/v1/dagruns_internal_test.go
  • internal/service/scheduler/retry_scanner_test.go
  • internal/service/scheduler/zombie_detector_test.go
  • internal/service/worker/remote_handler_test.go

Comment thread internal/core/exec/dagrun.go Outdated
Comment on lines +301 to +304
// IsSubDAG returns true when the attempt is stored under a different root run.
func (r DAGRunAttemptRef) IsSubDAG() bool {
	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
}

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

IsSubDAG can misclassify sub-runs when root and child share the same run ID.

Line 303 only compares Root.ID vs DAGRun.ID. If IDs match but DAG names differ, this returns false and can route sub-DAG attempts through root-run paths.

Proposed fix
 func (r DAGRunAttemptRef) IsSubDAG() bool {
-	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
+	if r.Root.Zero() || r.Root.ID == "" {
+		return false
+	}
+	return r.Root.ID != r.DAGRun.ID || r.Root.Name != r.DAGRun.Name
 }
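
To see the difference between the two predicates, the sketch below runs both against three refs. The `DAGRunRef` type is a simplified stand-in that assumes only the `Name` and `ID` fields and the `Zero()` method used in the snippets above; the function names `isSubDAGByID` and `isSubDAGByIDOrName` are illustrative. Whether a child run can actually share its root's run ID in this codebase is not established by this thread.

```go
package main

import "fmt"

// DAGRunRef is a simplified stand-in for the run reference type.
type DAGRunRef struct {
	Name string
	ID   string
}

// Zero reports whether the reference is unset.
func (r DAGRunRef) Zero() bool { return r == DAGRunRef{} }

// DAGRunAttemptRef keeps only the fields the predicate reads.
type DAGRunAttemptRef struct {
	DAGRun DAGRunRef
	Root   DAGRunRef
}

// isSubDAGByID is the predicate as submitted: run IDs only.
func isSubDAGByID(r DAGRunAttemptRef) bool {
	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
}

// isSubDAGByIDOrName is the predicate from the proposed fix.
func isSubDAGByIDOrName(r DAGRunAttemptRef) bool {
	if r.Root.Zero() || r.Root.ID == "" {
		return false
	}
	return r.Root.ID != r.DAGRun.ID || r.Root.Name != r.DAGRun.Name
}

func main() {
	root := DAGRunRef{Name: "parent", ID: "run-1"}
	cases := []struct {
		label string
		ref   DAGRunAttemptRef
	}{
		{"root run", DAGRunAttemptRef{DAGRun: root, Root: root}},
		{"sub-DAG, distinct ID", DAGRunAttemptRef{DAGRun: DAGRunRef{Name: "child", ID: "sub-1"}, Root: root}},
		{"sub-DAG, shared ID", DAGRunAttemptRef{DAGRun: DAGRunRef{Name: "child", ID: "run-1"}, Root: root}},
	}
	for _, c := range cases {
		fmt.Printf("%-22s byID=%v byIDOrName=%v\n", c.label, isSubDAGByID(c.ref), isSubDAGByIDOrName(c.ref))
	}
}
```

The predicates agree on the first two cases and diverge only on the shared-ID case, where the ID-only version reports a root run and the name-aware version reports a sub-DAG.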
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/core/exec/dagrun.go` around lines 301 - 304, The IsSubDAG method can
misclassify when Root.ID equals DAGRun.ID but the DAG names differ; update
DAGRunAttemptRef.IsSubDAG to treat that case as a sub-DAG by also comparing the
root DAG name to the current DAG run name (i.e., return true when Root is
non-zero and either Root.ID != DAGRun.ID or Root.Name != DAGRun.Name); use
the struct fields referenced in this function (DAGRunAttemptRef, Root,
Root.Zero(), Root.ID, Root.Name, r.DAGRun.ID, r.DAGRun.Name) to implement the
additional DAG-name comparison.

@yohamta0 yohamta0 merged commit c49ccd3 into main May 12, 2026
10 checks passed
@yohamta0 yohamta0 deleted the codex/fix-distributed-subdag-lease-repair branch May 12, 2026 15:21
Development

Successfully merging this pull request may close these issues.

bug: distributed run lease expired: worker ... accepted the task claim but stopped reporting to the owner coordinator