fix: repair stale distributed sub-dag leases #2143
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.
📝 Walkthrough
The PR introduces a new
Changes
Attempt-scoped Compare-And-Swap API
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
The PR changes a core store API across multiple components (storage, handlers, tests) with moderate complexity. The storage implementation adds sub-DAG lookup logic and dual-field identity validation. All test mocks must be updated consistently to match the new signature. There are no novel algorithms or tricky control flow, but the broad surface area and several interdependent layers require careful verification of signature updates, delegations, and test coverage.
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
🧹 Nitpick comments (2)
internal/service/coordinator/handler.go (1)
2787-2793: 💤 Low value
Add parentheses for clarity around mixed `||`/`&&` precedence.
The condition relies on Go's `&&`-binds-tighter-than-`||` precedence rule. It currently evaluates as `attemptID != ref.AttemptID || (!IsActive() && status != NotStarted)`, which is the intended semantics, but at a glance it is easy to misread as `(attemptID != ref.AttemptID || !IsActive()) && status != NotStarted`. Explicit parentheses would make the intent obvious and resilient to future edits.
♻️ Proposed clarification
- 	if status.AttemptID != ref.AttemptID || !status.Status.IsActive() && status.Status != core.NotStarted {
+ 	if status.AttemptID != ref.AttemptID || (!status.Status.IsActive() && status.Status != core.NotStarted) {
  		h.deleteDistributedTracking(ctx, storeCtx, ref.DAGRun, ref.AttemptKey,
  			"Failed to delete superseded distributed lease",
  			"Failed to delete superseded active distributed run",
  		)
  		return
  	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/service/coordinator/handler.go` around lines 2787 - 2793, The boolean expression using status.AttemptID, ref.AttemptID, status.Status.IsActive(), and core.NotStarted is correct but ambiguous; update the if condition to add explicit parentheses so the intent (status.AttemptID != ref.AttemptID || (!status.Status.IsActive() && status.Status != core.NotStarted)) is clear and not misread, then keep the existing h.deleteDistributedTracking(...) call and return unchanged.
internal/service/coordinator/handler_test.go (1)
1917-1977: ⚡ Quick win
Consider asserting lease cleanup to mirror the root-level regression test.
The analogous root-level test `DetectStaleLeasesFailsLeasedRunWhenFreshWorkerHeartbeatDropsAttempt` (lines 2109–2168) additionally asserts that the stale lease is deleted (`leaseStore.Get(...)` → `exec.ErrDAGRunLeaseNotFound`). Since this PR's stated objective is to repair sub-DAG lease identity in the failure path, asserting lease cleanup here would tighten the regression coverage and catch a future regression where the sub-DAG path fails the attempt but leaks the lease.
♻️ Suggested addition
  	require.Equal(t, staleDistributedLeaseReason("worker-1"), status.Error)
  	require.Equal(t, core.NodeFailed, status.Nodes[0].Status)
  	require.Equal(t, staleDistributedLeaseReason("worker-1"), status.Nodes[0].Error)
+
+ 	_, err = leaseStore.Get(ctx, attemptKey)
+ 	assert.ErrorIs(t, err, exec.ErrDAGRunLeaseNotFound)
  })
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/service/coordinator/handler_test.go` around lines 1917 - 1977, The test DetectStaleLeasesFailsSubDAGLeasedRunWhenFreshWorkerHeartbeatDropsAttempt should also assert the stale lease was removed: after calling h.detectStaleLeases(ctx) and verifying the attempt status, call leaseStore.Get(ctx, attemptKey) (or the store's equivalent lookup used elsewhere) and require it returns exec.ErrDAGRunLeaseNotFound to ensure the sub-DAG lease cleanup mirrors the root-level test DetectStaleLeasesFailsLeasedRunWhenFreshWorkerHeartbeatDropsAttempt and prevents lease leaks for the sub-run path.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/core/exec/dagrun.go`:
- Around line 301-304: The IsSubDAG method can misclassify when Root.ID equals
DAGRun.ID but the DAG names differ; update DAGRunAttemptRef.IsSubDAG to treat
that case as a sub-DAG by also comparing the root DAG name to the current DAG
run name (i.e., return true when Root is non-zero and either Root.ID !=
DAGRun.ID or Root.DAGName != DAGRun.Name); use the struct fields referenced in
this function (DAGRunAttemptRef, Root, Root.Zero(), Root.ID, Root.DAGName,
r.DAGRun.ID, r.DAGRun.Name) to implement the additional DAG-name comparison.
---
Nitpick comments:
In `@internal/service/coordinator/handler_test.go`:
- Around line 1917-1977: The test
DetectStaleLeasesFailsSubDAGLeasedRunWhenFreshWorkerHeartbeatDropsAttempt should
also assert the stale lease was removed: after calling h.detectStaleLeases(ctx)
and verifying the attempt status, call leaseStore.Get(ctx, attemptKey) (or the
store's equivalent lookup used elsewhere) and require it returns
exec.ErrDAGRunLeaseNotFound to ensure the sub-DAG lease cleanup mirrors the
root-level test
DetectStaleLeasesFailsLeasedRunWhenFreshWorkerHeartbeatDropsAttempt and prevents
lease leaks for the sub-run path.
In `@internal/service/coordinator/handler.go`:
- Around line 2787-2793: The boolean expression using status.AttemptID,
ref.AttemptID, status.Status.IsActive(), and core.NotStarted is correct but
ambiguous; update the if condition to add explicit parentheses so the intent
(status.AttemptID != ref.AttemptID || (!status.Status.IsActive() &&
status.Status != core.NotStarted)) is clear and not misread, then keep the
existing h.deleteDistributedTracking(...) call and return unchanged.
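The precedence point in the `handler.go` nitpick can be checked mechanically. Below is a small sketch with illustrative names only: `a` stands for the attempt-ID mismatch, `b` for `IsActive()`, and `c` for `status != NotStarted`. It compares the unparenthesized form, the proposed parenthesized form, and the grouping a reader might wrongly assume, over all eight inputs.

```go
package main

import "fmt"

// implicit mirrors the shape of the current condition: a || !b && c.
// In Go, && binds tighter than ||, so this parses as a || (!b && c).
func implicit(a, b, c bool) bool { return a || !b && c }

// explicit is the parenthesized form proposed in the nitpick.
func explicit(a, b, c bool) bool { return a || (!b && c) }

// misread is the grouping a reader could wrongly assume at a glance.
func misread(a, b, c bool) bool { return (a || !b) && c }

func main() {
	vals := []bool{false, true}
	for _, a := range vals {
		for _, b := range vals {
			for _, c := range vals {
				fmt.Printf("a=%-5v b=%-5v c=%-5v implicit=%-5v explicit=%-5v misread=%v\n",
					a, b, c, implicit(a, b, c), explicit(a, b, c), misread(a, b, c))
			}
		}
	}
}
```

The implicit and explicit forms agree on every input, so adding parentheses is behavior-preserving. The misread grouping differs whenever `a` is true and `c` is false, which in the terms above is a mismatched attempt ID on a run whose status is `NotStarted`.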
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1e2960db-1d42-4004-b49d-64e2074b587e
📒 Files selected for processing (15)
- internal/agent/dag_manage_test.go
- internal/cmd/enqueue_internal_test.go
- internal/cmn/telemetry/collector_test.go
- internal/core/exec/dagrun.go
- internal/core/exec/enqueue_retry_test.go
- internal/persis/filedagrun/store.go
- internal/persis/filedagrun/store_test.go
- internal/runtime/agent/dbclient_test.go
- internal/runtime/distributed_stale_run.go
- internal/service/coordinator/handler.go
- internal/service/coordinator/handler_test.go
- internal/service/frontend/api/v1/dagruns_internal_test.go
- internal/service/scheduler/retry_scanner_test.go
- internal/service/scheduler/zombie_detector_test.go
- internal/service/worker/remote_handler_test.go
// IsSubDAG returns true when the attempt is stored under a different root run.
func (r DAGRunAttemptRef) IsSubDAG() bool {
	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
}
IsSubDAG can misclassify sub-runs when root and child share the same run ID.
Line 303 only compares Root.ID vs DAGRun.ID. If IDs match but DAG names differ, this returns false and can route sub-DAG attempts through root-run paths.
Proposed fix
func (r DAGRunAttemptRef) IsSubDAG() bool {
- return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
+ if r.Root.Zero() || r.Root.ID == "" {
+ return false
+ }
+ return r.Root.ID != r.DAGRun.ID || r.Root.Name != r.DAGRun.Name
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/core/exec/dagrun.go` around lines 301 - 304, The IsSubDAG method can
misclassify when Root.ID equals DAGRun.ID but the DAG names differ; update
DAGRunAttemptRef.IsSubDAG to treat that case as a sub-DAG by also comparing the
root DAG name to the current DAG run name (i.e., return true when Root is
non-zero and either Root.ID != DAGRun.ID or Root.DAGName != DAGRun.Name); use
the struct fields referenced in this function (DAGRunAttemptRef, Root,
Root.Zero(), Root.ID, Root.DAGName, r.DAGRun.ID, r.DAGRun.Name) to implement the
additional DAG-name comparison.
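The difference between the ID-only check and the proposed ID-or-name check can be seen in a standalone sketch. The types here are hypothetical simplifications: `runRef` and `attemptRef` stand in for the repository's reference types, and the `Name` and `ID` field names are assumptions, since the review text refers to both `Root.Name` and `Root.DAGName`.

```go
package main

import "fmt"

// runRef is a hypothetical stand-in for a DAG-run reference (field names assumed).
type runRef struct {
	Name string
	ID   string
}

// Zero reports whether the reference is unset.
func (r runRef) Zero() bool { return r == runRef{} }

// attemptRef is a hypothetical stand-in for DAGRunAttemptRef.
type attemptRef struct {
	Root   runRef
	DAGRun runRef
}

// isSubDAGByID mirrors the current check: only the run IDs are compared.
func (r attemptRef) isSubDAGByID() bool {
	return !r.Root.Zero() && r.Root.ID != "" && r.Root.ID != r.DAGRun.ID
}

// isSubDAGByIDOrName mirrors the proposed check: a differing DAG name also
// marks the attempt as a sub-DAG, even when the run IDs are equal.
func (r attemptRef) isSubDAGByIDOrName() bool {
	if r.Root.Zero() || r.Root.ID == "" {
		return false
	}
	return r.Root.ID != r.DAGRun.ID || r.Root.Name != r.DAGRun.Name
}

func main() {
	cases := map[string]attemptRef{
		"root run":       {Root: runRef{"parent", "run-1"}, DAGRun: runRef{"parent", "run-1"}},
		"distinct IDs":   {Root: runRef{"parent", "run-1"}, DAGRun: runRef{"child", "run-2"}},
		"shared run ID":  {Root: runRef{"parent", "run-1"}, DAGRun: runRef{"child", "run-1"}},
		"no root recorded": {DAGRun: runRef{"parent", "run-1"}},
	}
	for name, ref := range cases {
		fmt.Printf("%-17s byID=%-5v byIDOrName=%v\n", name, ref.isSubDAGByID(), ref.isSubDAGByIDOrName())
	}
}
```

Only the shared-run-ID case separates the two checks: the ID-only version reports false and the ID-or-name version reports true. Whether a child run can actually share its root's run ID in this codebase is the question the review comment asks the author to verify.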
Summary
Fixes #2136
Testing
Summary by CodeRabbit