Add agent DAG management tools and run watches#2113
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (6)
✅ Files skipped from review due to trivial changes (1)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 WalkthroughWalkthroughAdds dag_def_manage and dag_run_manage agent tools, an in-memory session-local DAG run watch registry, context/ToolConfig/API/Session/runtime wiring for DAG stores/watchers, UI/prompt updates, and tests covering management and watch behaviors. ChangesDAG Management Agent Tools
Sequence Diagram(s)sequenceDiagram
actor User
participant Agent as Agent LLM
participant SessionMgr as Session Manager
participant Tool as dag_run_manage Tool
participant DAGRunStore
participant Logger
User->>Agent: Query DAG run status
Agent->>SessionMgr: Invoke dag_run_manage (action: "list")
SessionMgr->>Tool: Call Run() with JSON input
Tool->>DAGRunStore: List runs with filters
DAGRunStore-->>Tool: Return paginated runs
Tool->>Logger: Read step logs & messages
Logger-->>Tool: Return log content
Tool-->>SessionMgr: Return JSON response (runs, logs, cursor)
SessionMgr-->>Agent: Tool output
Agent-->>User: Formatted DAG status report
sequenceDiagram
actor User
participant Agent as Agent LLM
participant SessionMgr as Session Manager
participant Tool as dag_run_manage Tool
participant Registry as Watch Registry
participant DAGRunStore
participant Notifier
User->>Agent: Monitor DAG run (action: "watch")
Agent->>SessionMgr: Invoke dag_run_manage
SessionMgr->>Tool: Call Run() with watch request
Tool->>Registry: Watch(dagName, runId, notifyOn)
Registry->>DAGRunStore: Read current run status
DAGRunStore-->>Registry: Return status
Registry->>Registry: Start polling goroutine
Registry-->>Tool: Return watchId
Tool-->>Agent: {"watch": "started", "watchId": "..."}
loop Poll interval
Registry->>DAGRunStore: Check run status
alt Terminal state detected
Registry->>Registry: Mark completed
Registry->>Notifier: formatDAGRunWatchNotification()
Notifier->>SessionMgr: AppendExternalMessage()
SessionMgr-->>Agent: External notification
else Still running
Registry->>Registry: Update cached status
end
end
User->>Agent: Check watch status
Agent->>SessionMgr: Invoke dag_run_manage (action: "watch_status")
SessionMgr->>Registry: Status(watchId)
Registry-->>Agent: Return latest status & timestamps
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx (1)
1-5:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winMissing GPL v3 license header.
The file is missing the required GPL v3 license header. As per coding guidelines (
**/*.{go,ts,tsx,js}): apply GPL v3 license headers on source files, managed viamake addlicense.Run
make addlicenseto add the license header automatically.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx` around lines 1 - 5, The file ChatHistoryTab.tsx is missing the required GPL v3 license header; run the repository license tool (make addlicense) to insert the standard header into this source file, or manually add the GPL v3 header comment at the top of ChatHistoryTab.tsx so all exported identifiers (e.g., the ChatHistoryTab component, imports like StepMessagesTable, isActiveNodeStatus, and any top-level functions) are covered by the license; ensure the header matches the project's standard used for .ts/.tsx files.
🧹 Nitpick comments (1)
ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx (1)
45-54: ⚡ Quick win
useEffectresets user selection on everydefaultStepchange.The effect unconditionally overwrites
selectedStepwheneverdefaultStepchanges (e.g., any time a node finishes). A user who manually picked a step mid-run will have their selection silently replaced. Consider tracking whether the user has explicitly chosen a step and skipping the reset in that case.♻️ Suggested approach
+ const [userSelected, setUserSelected] = useState(false); const [selectedStep, setSelectedStep] = useState<string | undefined>(defaultStep); useEffect(() => { - if (defaultStep) { - setSelectedStep(defaultStep); - } + if (defaultStep && !userSelected) { + setSelectedStep(defaultStep); + } }, [defaultStep, userSelected]); // in the onChange handler: - onChange={(e) => setSelectedStep(e.target.value)} + onChange={(e) => { setUserSelected(true); setSelectedStep(e.target.value); }}You may also want to reset
userSelectedwhen thedagRun.dagRunIdchanges (new run opened).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx` around lines 45 - 54, The current useEffect unconditionally sets selectedStep from defaultStep on every change, which overwrites a user's manual choice; add a boolean state (e.g., userSelected) and set it to true when the user calls setSelectedStep manually, update the effect in ChatHistoryTab to only copy defaultStep into selectedStep when userSelected is false (or when the active run changes), and reset userSelected to false when dagRun.dagRunId (or whichever run identifier is used) changes so defaultStep can apply for a new run.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/agent/api.go`:
- Around line 191-193: The shared DAGRunWatchRegistry created in
api.dagRunWatcher via newDAGRunWatchRegistry(api.dagRunStore,
api.notifyDAGRunWatch, api.logger) lacks teardown handling so canceled/cleaned
sessions keep their watches; add explicit cleanup hooks to unregister watches
when a session ends (e.g., add a RemoveSessionWatches(sessionID) or
Clear(sessionID) method on DAGRunWatchRegistry and call it from the session
teardown/Cancel/Close path), ensure notifyDAGRunWatch or session lifecycle code
invokes that removal, and update DAGRunWatchRegistry to stop any background
pollers for that session to prevent stale polling against dagRunStore.
In `@internal/agent/dag_def_manage.go`:
- Around line 159-161: The loop over result.Items can panic because it
dereferences dag.Name without checking for nil; modify the for loop that builds
items to guard each dag: for each element in result.Items, first check if dag ==
nil and either skip it or construct a safe fallback entry, then call
summarizeDAGDefinition and store.IsSuspended only when dag is non-nil (using
dag.Name). Ensure the references to summarizeDAGDefinition and
store.IsSuspended(dag.Name) are moved inside the nil-check so nil items are not
dereferenced.
In `@internal/agent/dag_run_watch.go`:
- Around line 315-323: The polling loop currently calls readStatus(...) and
complete(...) with context.Background(), which prevents cancellation and can
leak poll goroutines; change both calls to use a cancellable/timeout context
derived from the watch's parent context (e.g., use the ctx variable for the
goroutine or create one with context.WithTimeout(parentCtx, timeout) and defer
cancel()) so store reads and completion respect cancellation/deadline; update
the calls to readStatus(ctx, req.DAGName, ...) and complete(ctx, watchID,
status) and ensure you cancel the derived context to avoid leaks.
---
Outside diff comments:
In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx`:
- Around line 1-5: The file ChatHistoryTab.tsx is missing the required GPL v3
license header; run the repository license tool (make addlicense) to insert the
standard header into this source file, or manually add the GPL v3 header comment
at the top of ChatHistoryTab.tsx so all exported identifiers (e.g., the
ChatHistoryTab component, imports like StepMessagesTable, isActiveNodeStatus,
and any top-level functions) are covered by the license; ensure the header
matches the project's standard used for .ts/.tsx files.
---
Nitpick comments:
In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx`:
- Around line 45-54: The current useEffect unconditionally sets selectedStep
from defaultStep on every change, which overwrites a user's manual choice; add a
boolean state (e.g., userSelected) and set it to true when the user calls
setSelectedStep manually, update the effect in ChatHistoryTab to only copy
defaultStep into selectedStep when userSelected is false (or when the active run
changes), and reset userSelected to false when dagRun.dagRunId (or whichever run
identifier is used) changes so defaultStep can apply for a new run.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a8621d50-51e2-44b4-a1fd-c6d597bda508
📒 Files selected for processing (15)
internal/agent/api.gointernal/agent/contextkeys.gointernal/agent/dag_def_manage.gointernal/agent/dag_manage_test.gointernal/agent/dag_run_manage.gointernal/agent/dag_run_watch.gointernal/agent/session.gointernal/agent/system_prompt.txtinternal/agent/tool_registry.gointernal/agent/tool_registry_test.gointernal/cmd/agent.gointernal/runtime/agent/agent.gointernal/runtime/builtin/agentstep/executor.gointernal/service/frontend/server.goui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
|
@coderabbitai review |
|
@coderabbitai full review |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
internal/agent/dag_run_manage.go (1)
287-307: ⚡ Quick winAdd validation to reject conflicting
lastandfromparameters.When both
lastandfromare supplied, the handler appends twoexec.WithFrom()options where the second (fromargs.From) silently overrides the first (fromargs.Last). The codebase already validates this incmd/history.go(line 207), rejecting the combination with:cannot use --last with --from or --to (conflicting time range specifications). Follow the same pattern indagRunManageList()for consistency and to prevent LLM agents from experiencing silent override on semantically conflicting parameters.♻️ Suggested validation
+ if args.Last != "" && args.From != "" { + return toolError("specify either last or from, not both") + } if args.Last != "" { from, err := parseDAGManageRelativeTime(args.Last, time.Now()) if err != nil { return toolError("%v", err) } opts = append(opts, exec.WithFrom(exec.NewUTC(from))) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/agent/dag_run_manage.go` around lines 287 - 307, In dagRunManageList(), add a pre-validation that rejects conflicting time flags by returning toolError when args.Last is set together with args.From or args.To (e.g., if args.Last != "" && (args.From != "" || args.To != "")), mirroring cmd/history.go’s message "cannot use --last with --from or --to (conflicting time range specifications)"; perform this check before any parseDAGManageRelativeTime/parseDAGManageAbsoluteTime calls so you avoid appending multiple exec.WithFrom() options and silent overrides.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@internal/agent/dag_run_manage.go`:
- Around line 287-307: In dagRunManageList(), add a pre-validation that rejects
conflicting time flags by returning toolError when args.Last is set together
with args.From or args.To (e.g., if args.Last != "" && (args.From != "" ||
args.To != "")), mirroring cmd/history.go’s message "cannot use --last with
--from or --to (conflicting time range specifications)"; perform this check
before any parseDAGManageRelativeTime/parseDAGManageAbsoluteTime calls so you
avoid appending multiple exec.WithFrom() options and silent overrides.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1c68eeaf-fa9f-4d91-8184-36571099f44f
📒 Files selected for processing (6)
internal/agent/api.gointernal/agent/dag_def_manage.gointernal/agent/dag_manage_test.gointernal/agent/dag_run_manage.gointernal/agent/dag_run_watch.goui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
🚧 Files skipped from review as they are similar to previous changes (4)
- ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
- internal/agent/dag_manage_test.go
- internal/agent/dag_run_watch.go
- internal/agent/dag_def_manage.go
✅ Actions performedReview triggered.
|
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx (1)
28-28:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winStale comment — update to reflect agent steps
The comment says "Determine default selected step: last finished chat step" but the step pool now includes
agentnodes too.✏️ Proposed fix
- // Determine default selected step: last finished chat step + // Determine default selected step: last finished chat/agent step🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx` at line 28, Update the stale comment in ChatHistoryTab.tsx to reflect that the step pool includes agent nodes now—replace "Determine default selected step: last finished chat step" with a clearer note such as "Determine default selected step: last finished step (includes chat and agent steps)" so the comment accurately documents the selection logic for the step pool used by ChatHistoryTab.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/agent/dag_run_watch.go`:
- Around line 320-335: The notifyCtx created in poll is derived from the watch's
ctx so when complete calls entry.cancel() it cancels notifyCtx before r.notify
runs; change poll to create an independent timeout context for notification
(e.g., use context.WithTimeout(context.Background(), r.pollTimeout()) instead of
deriving from ctx) so r.notify(notifyCtx, ...) and AppendExternalMessage run
with a non-canceled root, or alternatively modify complete to delay calling
entry.cancel() until after r.notify returns; update the code around poll,
notifyCtx, complete, entry.cancel and the r.notify call accordingly.
In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx`:
- Around line 68-69: The selectedStep can become stale when historySteps
changes; ensure you guard against a missing selection by computing a
resolvedStep (e.g., check whether historySteps contains a step with name ===
selectedStep and if not fall back to defaultStep) or by resetting state (call
setSelectedStep(defaultStep) inside a useEffect when the selected name is
absent); use resolvedStep instead of selectedStep for the select value, for
deriving selectedNode (find by resolvedStep), for computing isSelectedActive
(use isActiveNodeStatus on the resolved node), and when rendering
StepMessagesTable so you never pass a non-existent step name downstream.
---
Outside diff comments:
In `@ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx`:
- Line 28: Update the stale comment in ChatHistoryTab.tsx to reflect that the
step pool includes agent nodes now—replace "Determine default selected step:
last finished chat step" with a clearer note such as "Determine default selected
step: last finished step (includes chat and agent steps)" so the comment
accurately documents the selection logic for the step pool used by
ChatHistoryTab.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d5f617d3-42eb-452e-b6a1-affbac9f5d78
📒 Files selected for processing (15)
internal/agent/api.gointernal/agent/contextkeys.gointernal/agent/dag_def_manage.gointernal/agent/dag_manage_test.gointernal/agent/dag_run_manage.gointernal/agent/dag_run_watch.gointernal/agent/session.gointernal/agent/system_prompt.txtinternal/agent/tool_registry.gointernal/agent/tool_registry_test.gointernal/cmd/agent.gointernal/runtime/agent/agent.gointernal/runtime/builtin/agentstep/executor.gointernal/service/frontend/server.goui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/agent/dag_run_manage.go`:
- Around line 379-386: The diagnose path currently only falls back to
node.ChatMessages when attempt.ReadStepMessages returns an empty slice; change
both call sites (the one around ReadStepMessages at the top of the snippet and
the second site around lines 431-435) so that any non-nil error from
attempt.ReadStepMessages also triggers the same fallback: if ReadStepMessages
returns an error or an empty slice, try status.NodeByName(args.StepName) and
assign node.ChatMessages to messages when available; update dagRunManageDiagnose
and the surrounding logic that uses messages to preserve chat history on read
failures.
- Around line 349-352: The code currently calls
normalizeDAGRunLogStream(args.Stream) which defaults an empty stream to
"scheduler", so a read_log request with a stepName but no stream will
incorrectly return the run/scheduler log; change the guard to fail fast when a
step-specific read is requested without an explicit stream: check the raw
args.Stream (not the normalized value) and if args.StepName is non-empty and
args.Stream is empty, return a toolError indicating that a stream is required
for step logs (e.g., in the same block that currently calls
normalizeDAGRunLogStream), otherwise continue to normalize and validate the
stream as before.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 441905df-2abd-4a46-b4aa-d87d0fd7d873
📒 Files selected for processing (4)
internal/agent/dag_manage_test.gointernal/agent/dag_run_manage.gointernal/agent/dag_run_watch.goui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
🚧 Files skipped from review as they are similar to previous changes (2)
- ui/src/features/dags/components/chat-history/ChatHistoryTab.tsx
- internal/agent/dag_run_watch.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/agent/dag_manage_test.go`:
- Around line 182-195: The test reuses the map variable `got` across two
json.Unmarshal calls which can leave stale keys (like "messages") from the first
payload and yield false positives; fix it by resetting or re-declaring `got`
before the second Unmarshal (e.g., assign got = nil or declare a new var) so the
second call to json.Unmarshal into `got` (used after the second runJSONTool(...)
call) cannot see leftover keys from the prior parse; update the test in
internal/agent/dag_manage_test.go around the second json.Unmarshal to
clear/recreate `got`.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 59b31f07-c6f9-4fc9-a5d1-f5d0406eb17d
📒 Files selected for processing (2)
internal/agent/dag_manage_test.gointernal/agent/dag_run_manage.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/agent/dag_run_manage.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Summary
Validation
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Documentation