fix: replace app live polling with event-driven SSE updates #1968
📝 Walkthrough
This pull request introduces DAG-run event-type filtering for bot notifications and replaces the legacy AppLive event system with a new Server-Sent Events (SSE) infrastructure. Configuration now supports filtering notifications by specific DAG-run lifecycle events (queued, running, waiting, succeeded, failed, aborted, rejected), while backend event storage tracks state transitions. The frontend shifts from a polling-based live invalidation manager to an SSE-driven cache synchronization system with on-demand topic refresh modes.
Sequence Diagram(s)
sequenceDiagram
participant Config as Config Loader
participant Bot as Bot Instance
participant Monitor as Monitor
participant EventStore as Event Store
participant Batcher as Notif. Batcher
participant Handler as Message Handler
Config->>Bot: Initialize with InterestedEventTypes
Bot->>Monitor: Create with filtered event types
EventStore->>Monitor: Emit DAGRunQueued
Monitor->>Monitor: Check if Queued in InterestedEventTypes
alt Event Type Interested
Monitor->>Batcher: Enqueue notification
Batcher->>Batcher: Classify as Informational
Batcher->>Handler: Notify user
else Event Type Filtered Out
Monitor->>Monitor: Skip event
end
EventStore->>Monitor: Emit DAGRunSucceeded
Monitor->>Monitor: Check if Succeeded in InterestedEventTypes
alt Event Type Interested
Monitor->>Batcher: Enqueue notification
Batcher->>Batcher: Classify as Success/Failure
Batcher->>Handler: Notify user
end
sequenceDiagram
participant Backend as Event Service
participant SSEInv as SSE Invalidator
participant Mux as Multiplexer
participant UI as UI Component
participant Cache as SWR Cache
Backend->>SSEInv: DAG-run event emitted
SSEInv->>SSEInv: Parse DAGRunSnapshot
SSEInv->>Mux: WakeTopic(dagrun:name/id)
Mux->>Mux: Mark topic as on-demand refresh
Mux->>UI: Notify topic changed
UI->>UI: Trigger data fetch
UI->>Cache: Fetch fresh data
Cache->>UI: Return updated payload
UI->>UI: Update component state
note over Mux,UI: Alternative: Polling Mode
Mux->>Mux: Timer tick for polling topic
Mux->>UI: Notify topic needs refresh
UI->>Cache: Fetch data per polling interval
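The first diagram's filter step can be sketched in Go as follows. This is a minimal illustration, not the PR's code: the `Monitor` type, `NewMonitor`, `Handle`, and the `enqueue` callback are hypothetical names, and only the lifecycle event names (`queued`, `succeeded`, `failed`) come from the walkthrough.

```go
package main

import "fmt"

// EventType is a hypothetical stand-in for the DAG-run lifecycle event type.
type EventType string

const (
	DAGRunQueued    EventType = "queued"
	DAGRunSucceeded EventType = "succeeded"
	DAGRunFailed    EventType = "failed"
)

// Monitor holds the set of event types a bot is interested in.
type Monitor struct {
	interested map[EventType]struct{}
}

// NewMonitor builds a monitor that only forwards the given event types.
func NewMonitor(types ...EventType) *Monitor {
	m := &Monitor{interested: make(map[EventType]struct{}, len(types))}
	for _, t := range types {
		m.interested[t] = struct{}{}
	}
	return m
}

// Handle forwards the event to enqueue when its type is in the interested
// set and reports whether it was forwarded; other events are skipped.
func (m *Monitor) Handle(t EventType, enqueue func(EventType)) bool {
	if _, ok := m.interested[t]; !ok {
		return false
	}
	enqueue(t)
	return true
}

func main() {
	m := NewMonitor(DAGRunSucceeded, DAGRunFailed)
	var queue []EventType
	enqueue := func(t EventType) { queue = append(queue, t) }
	for _, t := range []EventType{DAGRunQueued, DAGRunSucceeded} {
		fmt.Printf("%s forwarded=%v\n", t, m.Handle(t, enqueue))
	}
	fmt.Println("batched:", queue)
}
```

Keeping the check in the monitor means the batcher only ever sees events the bot asked for, which matches the "Skip event" branch in the diagram.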
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ui/src/hooks/SSEManager.ts (1)
1-8: ⚠️ Potential issue | 🟡 Minor
Missing GPL v3 license header.
This TypeScript file is missing the required GPL v3 license header. As per coding guidelines, source files matching `**/*.{go,ts,tsx,js}` must apply GPL v3 license headers, managed via `make addlicense`.
Proposed fix
+// Copyright (C) 2026 Yota Hamada
+// SPDX-License-Identifier: GPL-3.0-or-later
+
 import { getAuthHeaders, getAuthToken } from '@/lib/authHeaders';
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ui/src/hooks/SSEManager.ts` around lines 1 - 8, Add the GPL v3 license header to this TypeScript source (ui/src/hooks/SSEManager.ts) at the top of the file so it matches the project's required header for **/*.{go,ts,tsx,js}; you can do this manually or run the repository tool to apply it consistently: ensure the header precedes the existing imports (e.g., before getAuthHeaders/getAuthToken) and then run make addlicense to verify/apply the standard GPL v3 header across the file.
internal/service/frontend/sse/types.go (1)
55-80: ⚠️ Potential issue | 🟠 Major
Do not collapse query-backed topics across `remoteNode`.
Lines 62, 64, 65, and 67 describe list topics whose identity is the sanitized query string, but Lines 133-136 delete `remoteNode` before that key is built. That makes `page=1&remoteNode=node-a` and `page=1&remoteNode=node-b` share the same multiplexed topic, so multi-node clients can receive and cache the wrong node's data.
Suggested fix
 // Remove sensitive parameters that should not be part of topic identity
 values.Del("token")
-values.Del("remoteNode")
As per coding guidelines, "All API calls MUST include the remoteNode parameter to route requests to the correct node in multi-node Dagu deployments".
Also applies to: 133-136
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/sse/types.go` around lines 55 - 80, The bug is that list/query-backed topics (TopicTypeDAGRuns, TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree) are built using a sanitized query string that currently strips out remoteNode, causing different remote nodes to collapse into one multiplexed topic; preserve the remoteNode parameter instead of deleting it when building the topic key (i.e., stop removing remoteNode in the code that sanitizes/normalizes the query before constructing the topic identifier) so the final topic key for those types includes remoteNode and multi-node routing remains correct.
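The collision described in this finding can be reproduced with the standard `net/url` package. The sketch below is illustrative only: `topicKey` and its `dropRemoteNode` flag are hypothetical and are not the functions in `types.go`. It assumes the topic identity is the encoded, sanitized query string, as the finding states.

```go
package main

import (
	"fmt"
	"net/url"
)

// topicKey is a hypothetical helper that turns a raw query string into a
// topic identity. The token is always dropped because it is sensitive.
// Setting dropRemoteNode reproduces the behavior the review flags.
func topicKey(rawQuery string, dropRemoteNode bool) (string, error) {
	values, err := url.ParseQuery(rawQuery)
	if err != nil {
		return "", err
	}
	values.Del("token")
	if dropRemoteNode {
		values.Del("remoteNode")
	}
	// Encode sorts by key, so equivalent queries map to one identity.
	return values.Encode(), nil
}

func main() {
	for _, drop := range []bool{true, false} {
		a, _ := topicKey("page=1&remoteNode=node-a&token=secret", drop)
		b, _ := topicKey("page=1&remoteNode=node-b&token=secret", drop)
		fmt.Printf("dropRemoteNode=%v collide=%v\n", drop, a == b)
	}
}
```

With `remoteNode` kept in the key, the two nodes get distinct topics while the token still stays out of the identity.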
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/config/loader.go`:
- Around line 1179-1181: The code unconditionally overwrites
cfg.Bots.Telegram.InterestedEventTypes from the config value and therefore
clobbers any earlier env-derived value; change the assignment in the loader so
you only set cfg.Bots.Telegram.InterestedEventTypes from
parseInterestedEventTypes(l.v.GetString("bots.telegram.interested_event_types"))
if the config file actually provided the key and the struct does not already
contain a value from env (e.g. check
l.v.InConfig("bots.telegram.interested_event_types") or that
cfg.Bots.Telegram.InterestedEventTypes is nil/empty before assigning). Apply the
same fix pattern to the other similar blocks (the ones around the other noted
ranges) so env-over-file precedence is preserved and env can be used to clear
the list.
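One way to read the precedence rule in the finding above is sketched below using only the standard library. The names `resolveInterested` and `parseList`, the example variable name, and the comma-separated format are assumptions for illustration; the real loader uses its own config library and `parseInterestedEventTypes`.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// parseList splits a comma-separated value and drops empty entries.
// The comma-separated format is an assumption made for this sketch.
func parseList(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

// resolveInterested applies env-over-file precedence. lookupEnv is injected
// (os.LookupEnv in real use) so the rule can be exercised in isolation.
// An env variable that is set but empty clears the list.
func resolveInterested(lookupEnv func(string) (string, bool), envKey, fileVal string, fileHasKey bool) []string {
	if raw, ok := lookupEnv(envKey); ok {
		return parseList(raw)
	}
	if fileHasKey {
		return parseList(fileVal)
	}
	return nil
}

func main() {
	const key = "EXAMPLE_INTERESTED_EVENT_TYPES" // hypothetical variable name
	if err := os.Setenv(key, ""); err != nil {
		fmt.Println("setenv:", err)
		return
	}
	got := resolveInterested(os.LookupEnv, key, "failed,aborted", true)
	fmt.Println("entries after env override:", len(got))
}
```

The point of the sketch is the order of the checks: the file value is consulted only when the environment did not provide the key at all.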
In `@internal/persis/fileeventstore/notifications.go`:
- Around line 91-97: The code advances the inbox cursor before ensuring
s.readInboxDAGRunEvent(name) succeeded, causing transient read/parse errors to
be permanently skipped; update the loop so that cursor.LastInboxFile is only
advanced after a successful read and processing of the DAG-run event (i.e., call
or assign to cursor.LastInboxFile after s.readInboxDAGRunEvent returns nil), and
if a read/parse error occurs do not advance the cursor — instead quarantine the
bad file (e.g., rename/move using s.inboxDir and name to a ".bad" or failed
bucket) or surface a persistent failure so it can be retried, keeping the
existing slog.Warn but removing the skip that advances the cursor on error.
In `@internal/service/chatbridge/notifications.go`:
- Around line 397-406: NotificationClassForEvent currently returns
(NotificationClassUnknown, false) for eventstore.TypeDAGRunAborted and
eventstore.TypeDAGRunRejected so those persisted notification states are never
batched/delivered; change the mapping in NotificationClassForEvent (the switch
handling eventstore.TypeDAGRunAborted and eventstore.TypeDAGRunRejected) to
return a valid class (e.g., NotificationClassUrgent) and true so these terminal
DAG run events are enqueued by NotificationBatcher.Enqueue and not dropped,
preventing stale pending entries; update only the return for those cases in
NotificationClassForEvent.
In `@internal/service/eventstore/notifications.go`:
- Around line 244-251: DAGRunSnapshotFromEvent currently returns the decoded
snapshot from dagRunStatusSnapshotFromData without backfilling legacy top-level
dag_file; modify DAGRunSnapshotFromEvent to call
dagRunFileNameFromData(event.Data) after decoding and, if snapshot.DAGFile is
empty and dagRunFileNameFromData returns a non-empty value, set snapshot.DAGFile
to that value before returning; apply the same backfill logic to the other
event-to-snapshot function(s) in this file (the block around lines 264-305) so
legacy events carrying notification_status + top-level dag_file populate DAGFile
for downstream consumers.
In `@internal/service/frontend/api/v1/dagruns.go`:
- Around line 2905-2911: The current error branch that always returns
fmt.Errorf("sub dag-run ID %s not found for DAG %s", parts[2], parts[0]) is
masking context timeout/cancel errors; update the handler to first detect and
preserve cancellation/timeout (use errors.Is(err, context.Canceled) and
errors.Is(err, context.DeadlineExceeded) and return err directly) and only
wrap/transform other non-nil errors into the not-found message (i.e., if the
error is not a context cancel/timeout, return the formatted not-found error);
apply this change where the lookup error is handled (the branch referencing
parts[2] and parts[0]).
In `@internal/service/frontend/server.go`:
- Line 1149: The call to StartDAGRunEventInvalidation is being started even when
no wake source is configured (event_store.enabled false), which leaves topics
without invalidation or polling; guard the call by checking the configured wake
source / event service before starting it (e.g., verify srv.eventService is
non-nil or the event_store.enabled config flag is true) and only invoke
sse.StartDAGRunEventInvalidation(srv.sseMultiplexer.Context(), srv.eventService,
srv.sseMultiplexer, slog.Default(), time.Second) when that check passes; apply
the same guard to the other similar starts in the surrounding block (the calls
around the StartDAGRunEventInvalidation region) so topics are not switched to
on-demand mode when no wake source exists.
In `@ui/src/features/dag-runs/hooks/__tests__/useBoundedDAGRunDetails.test.tsx`:
- Around line 1-3: The new test file that imports useBoundedDAGRunDetails is
missing the repository's GPL v3 license header; add the standard GPL header to
this source file by running the repository's license tool (run make addlicense)
or manually insert the project's GPL v3 header at the top of the file so the
test file that imports useBoundedDAGRunDetails includes the required license
notice.
In `@ui/src/features/dag-runs/hooks/dagRunPagination.ts`:
- Around line 77-80: The hook fetchAllDAGRuns (and the other API-call sites
noted around lines 90-95 and 333-343) must ensure a remoteNode is always present
on requests: derive the correct remoteNode inside the hook (e.g., from the
client, context, or a helper that returns current remoteNode) and merge it into
the outgoing query object before calling the API so you never forward the
caller's raw query; update fetchAllDAGRuns to create a new params object = {
...query, remoteNode: derivedRemoteNode } (and do the same at the other
referenced call sites) so every /dag-runs request includes remoteNode per the
ui/* coding guideline.
In `@ui/src/features/dag-runs/hooks/useBoundedDAGRunDetails.ts`:
- Around line 55-65: The SSE subscriptions are missing the remote node routing
so pass target?.remoteNode into both useDAGRunSSE and useSubDAGRunSSE calls;
update the calls to include the remoteNode argument (e.g.,
useDAGRunSSE(target?.name ?? '', target?.dagRunId ?? '', enabled && target !=
null && !isSubDAGRunTarget, target?.remoteNode) and similarly for
useSubDAGRunSSE with target?.parentName, target?.parentDAGRunId,
target?.subDAGRunId, the same enabled condition, and target?.remoteNode) so the
stream is correctly scoped to the selected remote node.
In `@ui/src/hooks/useSubDAGRunSSE.ts`:
- Around line 1-3: Add the GPL v3 license header block to the top of the new
TypeScript source file useSubDAGRunSSE.ts (the file that imports "components"
and imports SSEState and useSSE) so it conforms with the repository policy for
"*.{go,ts,tsx,js}" files; insert the standard GPL v3 header comment as used by
make addlicense above the existing imports (before the lines referencing
components, SSEState, and useSSE).
- Around line 16-19: The SSE endpoint string in useSubDAGRunSSE (the endpoint
constant passed to useSSE<SubDAGRunSSEResponse>) is missing the remoteNode
parameter; update the endpoint construction to include remoteNode (from the hook
args or scope) either as part of the path or as a query parameter so the request
is routed to the correct node in multi-node deployments—ensure you reference the
existing variables name, dagRunId, subDAGRunId and add remoteNode into the
encoded URL before calling useSSE.
In `@ui/src/pages/dag-runs/index.tsx`:
- Around line 98-120: The auto-pagination can become permanently unreachable
after a transient failure or when IntersectionObserver is unavailable; update
the code so there is always a manual recovery path: restore an explicit control
that calls handleLoadMore (e.g., render a "Load more" button that invokes
handleLoadMore) and ensure useAutoLoadMore (the hook taking sentinelRef,
enabled, onLoadMore) does not permanently disable loading on
loadMoreError—either accept a separate flag (e.g., forceManualFallback) or
change the enabled logic so loadMoreError only stops automatic retries but still
allows manual handleLoadMore invocations; also keep the IntersectionObserver
feature fallback: when IntersectionObserver is undefined, render the manual
"Load more" control so remaining pages remain reachable.
---
Outside diff comments:
In `@internal/service/frontend/sse/types.go`:
- Around line 55-80: The bug is that list/query-backed topics (TopicTypeDAGRuns,
TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree) are built using a
sanitized query string that currently strips out remoteNode, causing different
remote nodes to collapse into one multiplexed topic; preserve the remoteNode
parameter instead of deleting it when building the topic key (i.e., stop
removing remoteNode in the code that sanitizes/normalizes the query before
constructing the topic identifier) so the final topic key for those types
includes remoteNode and multi-node routing remains correct.
In `@ui/src/hooks/SSEManager.ts`:
- Around line 1-8: Add the GPL v3 license header to this TypeScript source
(ui/src/hooks/SSEManager.ts) at the top of the file so it matches the project's
required header for **/*.{go,ts,tsx,js}; you can do this manually or run the
repository tool to apply it consistently: ensure the header precedes the
existing imports (e.g., before getAuthHeaders/getAuthToken) and then run make
addlicense to verify/apply the standard GPL v3 header across the file.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b16dea4e-5216-41fa-a47a-ee2c730e6ba2
📒 Files selected for processing (53)
- internal/cmd/server.go
- internal/cmd/startall.go
- internal/cmn/config/config.go
- internal/cmn/config/definition.go
- internal/cmn/config/loader.go
- internal/cmn/config/loader_test.go
- internal/cmn/schema/config.schema.json
- internal/persis/filedagrun/attempt.go
- internal/persis/filedagrun/attempt_test.go
- internal/persis/fileeventstore/notifications.go
- internal/service/chatbridge/monitor.go
- internal/service/chatbridge/monitor_test.go
- internal/service/chatbridge/notifications.go
- internal/service/chatbridge/notifications_test.go
- internal/service/eventstore/context.go
- internal/service/eventstore/eventstore.go
- internal/service/eventstore/eventstore_test.go
- internal/service/eventstore/notifications.go
- internal/service/frontend/api/v1/dagruns.go
- internal/service/frontend/server.go
- internal/service/frontend/sse/dagrun_invalidator.go
- internal/service/frontend/sse/multiplex.go
- internal/service/frontend/sse/multiplex_test.go
- internal/service/frontend/sse/topic_parse.go
- internal/service/frontend/sse/types.go
- internal/service/slack/bot.go
- internal/service/slack/monitor.go
- internal/service/telegram/bot.go
- internal/service/telegram/monitor.go
- ui/src/features/cockpit/hooks/useDateKanbanData.ts
- ui/src/features/dag-runs/hooks/__tests__/useBoundedDAGRunDetails.test.tsx
- ui/src/features/dag-runs/hooks/__tests__/useExactDAGRuns.test.tsx
- ui/src/features/dag-runs/hooks/dagRunPagination.ts
- ui/src/features/dag-runs/hooks/useBoundedDAGRunDetails.ts
- ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx
- ui/src/features/dags/components/dag-details/DAGDetailsSidePanel.tsx
- ui/src/features/dags/components/dag-details/__tests__/DAGDetailsSidePanel.test.tsx
- ui/src/features/dags/components/dag-editor/DAGSpec.tsx
- ui/src/features/dags/components/dag-execution/DAGExecutionHistory.tsx
- ui/src/hooks/AppLiveManager.ts
- ui/src/hooks/SSEManager.ts
- ui/src/hooks/__tests__/SSEManager.test.ts
- ui/src/hooks/useAppLive.ts
- ui/src/hooks/useSSECacheSync.ts
- ui/src/hooks/useSubDAGRunSSE.ts
- ui/src/pages/dag-runs/index.tsx
- ui/src/pages/dags/dag/index.tsx
- ui/src/pages/dags/index.tsx
- ui/src/pages/docs/components/DocEditor.tsx
- ui/src/pages/docs/index.tsx
- ui/src/pages/event-logs/index.tsx
- ui/src/pages/index.tsx
- ui/src/pages/queues/index.tsx
💤 Files with no reviewable changes (3)
- ui/src/features/cockpit/hooks/useDateKanbanData.ts
- ui/src/hooks/AppLiveManager.ts
- ui/src/hooks/useAppLive.ts
Summary
dagrunswake publishes and harden notification event handling and batching
Root cause
fsnotify/kqueue, which consumed one file descriptor per watched path and eventually exhausted the process FD table
Impact
Validation
- `make fmt`
- `go test ./internal/service/eventstore ./internal/persis/fileeventstore ./internal/persis/filedagrun ./internal/service/chatbridge ./internal/cmn/config ./internal/service/frontend/sse ./internal/service/frontend`
- `go test ./internal/service/slack ./internal/service/telegram ./internal/cmd/...`
- `go test ./internal/service/chatbridge ./internal/service/eventstore ./internal/service/frontend/sse ./internal/service/frontend`
- `pnpm exec vitest run src/hooks/__tests__/SSEManager.test.ts src/features/dag-runs/hooks/__tests__/useExactDAGRuns.test.tsx src/features/dags/components/dag-details/__tests__/DAGDetailsSidePanel.test.tsx`
- `pnpm exec vitest run src/pages/event-logs/__tests__/index.test.tsx`
- `pnpm exec vitest run src/features/dag-runs/hooks/__tests__/useBoundedDAGRunDetails.test.tsx src/features/dag-runs/hooks/__tests__/useExactDAGRuns.test.tsx src/features/dag-runs/components/dag-run-details/__tests__/DAGRunDetailsPanel.test.tsx`
- `pnpm exec tsc --noEmit`
Summary by CodeRabbit
Release Notes
New Features
Improvements