Skip to content

refactor: polling to SSE for realtime update#1608

Merged
yohamta0 merged 39 commits into
mainfrom
sse-handler
Jan 25, 2026
Merged

refactor: polling to SSE for realtime update#1608
yohamta0 merged 39 commits into
mainfrom
sse-handler

Conversation

@yohamta0

@yohamta0 yohamta0 commented Jan 25, 2026

Copy link
Copy Markdown
Collaborator

Summary by CodeRabbit

Release Notes

  • New Features

    • Added real-time server-sent events (SSE) support for DAG runs, DAGs, logs, steps, and queues, enabling live updates without manual refresh.
    • Live log streaming with automatic fallback to polling when real-time connection is unavailable.
    • Real-time queue monitoring with running and queued item tracking.
  • Improvements

    • Enhanced frontend responsiveness with live data synchronization across multiple dashboards and views.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

coderabbitai Bot commented Jan 25, 2026

Copy link
Copy Markdown

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

The PR introduces a comprehensive Server-Sent Events (SSE) infrastructure for real-time data streaming. Backend changes include new SSE modules (client, hub, handler, watcher, metrics, proxy), public API methods for SSE data retrieval, and server lifecycle integration. Frontend changes add SSE consumption hooks and refactor UI components to use SSE with polling fallback for real-time updates across DAG runs, DAG details, logs, and queue data.

Changes

Cohort / File(s) Summary
SSE Backend Infrastructure
internal/service/frontend/sse/client.go, internal/service/frontend/sse/handler.go, internal/service/frontend/sse/hub.go, internal/service/frontend/sse/metrics.go, internal/service/frontend/sse/proxy.go, internal/service/frontend/sse/types.go, internal/service/frontend/sse/watcher.go
Added complete SSE subsystem with client lifecycle, topic-based hub coordination, event broadcasting, metrics collection, remote proxying, and adaptive polling watchers; includes functional options for hub configuration and thread-safe client/watcher management.
API v2 Data Endpoints
internal/service/frontend/api/v2/dagruns.go, internal/service/frontend/api/v2/dags.go, internal/service/frontend/api/v2/queues.go
Added public data accessor methods (GetDAGRunDetailsData, GetDAGRunLogsData, GetStepLogData, GetDAGRunsListData, GetDAGDetailsData, GetDAGsListData, GetQueuesListData, GetQueueItemsData) with new response types for SSE consumption and helper functions for parameter parsing.
Transformer Export
internal/service/frontend/api/v2/transformer.go
Promoted toDAGRunDetails to public ToDAGRunDetails function; updated all call sites across dagruns.go and dags.go to use the exported version.
Frontend Server Integration
internal/service/frontend/server.go
Added sseHub and metricsRegistry fields to Server; integrated SSE route setup, fetcher registration, and hub lifecycle (start/shutdown) during server initialization; added sanitized request logger for token redaction.
UI SSE Hooks
ui/src/hooks/useSSE.ts, ui/src/hooks/useDAGRunSSE.ts, ui/src/hooks/useDAGRunLogsSSE.ts, ui/src/hooks/useDAGRunsListSSE.ts, ui/src/hooks/useDAGSSE.ts, ui/src/hooks/useDAGsListSSE.ts, ui/src/hooks/useStepLogSSE.ts, ui/src/hooks/useQueueItemsSSE.ts, ui/src/hooks/useQueuesListSSE.ts
Added generic SSE hook (useSSE) with exponential backoff retry, fallback detection, and connection state tracking; added specialized hooks for each data type with endpoint construction and type-safe response interfaces.
UI Component Updates
ui/src/features/dag-runs/components/dag-run-details/DAGRunDetailsPanel.tsx, ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx, ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx, ui/src/features/dags/components/dag-execution/ExecutionLog.tsx, ui/src/features/dags/components/dag-execution/StepLog.tsx
Refactored components to use SSE hooks with polling fallback; updated data source selection to prioritize SSE data, integrated adaptive polling based on SSE availability, enhanced keyboard navigation and fullscreen handling.
UI Pages
ui/src/pages/dag-runs/index.tsx, ui/src/pages/dags/index.tsx, ui/src/pages/index.tsx, ui/src/pages/queues/index.tsx
Integrated SSE data sources (useDAGRunsListSSE, useDAGsListSSE, useQueuesListSSE) with conditional polling fallback; updated data flow to prefer SSE when connected; adjusted metrics and filtering logic; simplified rendering to use merged SSE + polling data sources.
SSE Test Coverage
internal/service/frontend/sse/client_test.go, internal/service/frontend/sse/handler_test.go, internal/service/frontend/sse/hub_test.go, internal/service/frontend/sse/integration_test.go, internal/service/frontend/sse/metrics_test.go, internal/service/frontend/sse/proxy_test.go, internal/service/frontend/sse/types_test.go, internal/service/frontend/sse/watcher_test.go
Added comprehensive test suites covering client lifecycle, handler endpoints, hub subscription/heartbeat mechanics, end-to-end integration, metrics collection, proxy routing, and watcher polling behavior.
Minor Infrastructure Changes
internal/runtime/subcmd.go, internal/service/scheduler/entryreader.go
Updated inline comments in TaskStart argument construction and reduced log verbosity for DAG loading (Info → Debug).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Browser
    participant Handler
    participant Hub
    participant Watcher
    participant FetchFunc
    participant DataStore

    Client->>Browser: Establish SSE Connection
    Browser->>Handler: GET /events/dag-runs/{id}
    Handler->>Hub: Subscribe(client, topic)
    Hub->>Watcher: GetOrCreate(topic)
    Watcher->>FetchFunc: Start polling in background
    
    loop Polling Interval
        Watcher->>FetchFunc: Fetch data for topic
        FetchFunc->>DataStore: Query current state
        DataStore-->>FetchFunc: Return data
        Watcher->>Watcher: Hash data, detect change
        alt Data Changed
            Watcher->>Hub: Broadcast(Event: Data)
            Hub->>Client: Send SSE event
            Client->>Browser: Update UI
        else No Change
            Watcher->>Watcher: Skip broadcast
        end
    end
    
    Browser->>Handler: Close connection
    Handler->>Hub: Unsubscribe(client)
    Hub->>Watcher: Remove client
    alt Last Client
        Hub->>Watcher: Stop()
        Watcher->>Watcher: Cancel polling
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 31.61% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'refactor: polling to SSE for realtime update' clearly describes the main objective of the changeset—migrating from polling-based to Server-Sent Events (SSE) for real-time updates across the application.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

🤖 Fix all issues with AI agents
In `@internal/service/frontend/api/v2/sse_data.go`:
- Around line 96-171: GetDAGRunLogsData currently hardcodes
fileutil.LogReadOptions{Tail: 500} which causes SSE payloads to always return
500 lines regardless of the UI pageSize; modify GetDAGRunLogsData to accept a
tail parameter from the SSE identifier/query (e.g., parse an optional tail value
from identifier or read from the SSE request query) and use that value when
constructing fileutil.LogReadOptions.Tail (fallback to existing default like 500
or ui default 1000 if missing), validate/clamp the tail to reasonable bounds,
and ensure the same identifier parsing logic around strings.Cut(ref :=
exec.NewDAGRunRef(...)) continues to work so SSE and REST return consistent line
counts.

In `@internal/service/frontend/sse/proxy.go`:
- Around line 32-38: The TLS config for the HTTP client (the client variable
using http.Transport with TLSClientConfig and node.SkipTLSVerify) must set a
secure minimum TLS version and the http.Client should have a request timeout to
avoid goroutine leaks; update the tls.Config to include MinVersion (e.g.,
tls.VersionTLS12 or higher) and set a sensible Timeout on the http.Client, and
ensure the "time" package is imported for the timeout value.

In `@internal/service/frontend/sse/watcher.go`:
- Around line 117-129: The broadcast method on Watcher calls
w.metrics.MessageSent(...) without checking for a nil w.metrics, which can
panic; update Watcher.broadcast to guard the metrics call (e.g., if w.metrics !=
nil) before invoking MessageSent, mirroring the nil-check pattern used in poll
so that metrics are only recorded when w.metrics is non-nil while preserving the
existing Send/Close logic for clients.
- Around line 83-98: The poll method in Watcher calls w.metrics.FetchError(...)
without checking if w.metrics is nil, which will panic; add a nil check before
invoking FetchError (e.g., if w.metrics != nil {
w.metrics.FetchError(string(w.topicType)) }) so metrics calls are only made when
w.metrics is non-nil; keep the existing backoff, broadcast(&Event{Type:
EventTypeError, Data: err.Error()}) and errorBackoff.Next logic intact.
- Around line 53-70: The Watcher currently never emits lifecycle metrics; call
the Metrics methods when the watcher starts and stops: invoke
w.metrics.WatcherStarted() at the beginning of Watcher.Start (after creating the
ticker / after initial poll) and ensure w.metrics.WatcherStopped() is invoked
whenever Start returns (on ctx.Done(), w.stopCh or any early exit), and also add
the stop metric call inside Watcher.Stop (or wherever w.stopCh is closed) so
both Watcher.Start and Watcher.Stop consistently emit
WatcherStarted()/WatcherStopped() using the existing w.metrics field.

In `@ui/src/features/dags/components/dag-execution/ExecutionLog.tsx`:
- Around line 68-69: isLiveMode is only initialized from isRunning and never
updated, so when dagRun.status stops being Status.Running the UI can hide the
live toggle while polling/SSE remain active; add a useEffect in ExecutionLog
that watches isRunning (derived from dagRun?.status === Status.Running) and
calls setIsLiveMode(isRunning) or sets false when isRunning becomes false to
keep isLiveMode in sync with dagRun state (use the existing isLiveMode,
setIsLiveMode, isRunning, and Status.Running symbols).
- Around line 238-247: The highlight uses the removed Tailwind v4
`bg-opacity-20`; in the scroll-to-line highlight block inside ExecutionLog.tsx
(the loop that finds elements by '[data-line-number]' and calls scrollIntoView),
replace the two-class approach htmlElement.classList.add('bg-primary/100',
'bg-opacity-20') and the corresponding classList.remove with a single
slash-opacity color class (e.g., 'bg-primary/100/20' or if your palette uses
hyphen shades 'bg-primary-100/20' or simply 'bg-primary/20'); remove any use of
'bg-opacity-20' so add/remove use the new single class name.

In `@ui/src/hooks/useDAGRunsListSSE.ts`:
- Around line 10-17: The DAGRunsListParams.interface currently types status as
string but the API uses a numeric Status enum; change the status property to
number (status?: number) in the DAGRunsListParams definition and update any call
sites in useDAGRunsListSSE (or functions that build the request/polling params)
to stop treating it as a string—remove parseInt() conversions and ensure values
passed to the polling endpoint are numeric so types and runtime behavior align
with the API.

In `@ui/src/hooks/useSSE.ts`:
- Around line 70-74: The SSE listener registered via
eventSource.addEventListener('data', ...) currently calls
JSON.parse(messageEvent.data) without protection; wrap the parse in a try-catch
inside that listener so malformed JSON doesn't throw and break the SSE flow, log
or handle the parse error (e.g., processLogger/warn or set an error state) and
avoid calling setState with invalid parsed data; locate the listener in useSSE
(the handler that casts event as MessageEvent and assigns to parsed) and enclose
JSON.parse(...) in a try block with a catch that handles/logs the error and
returns early.
- Around line 93-98: In useSSE's retry logic, increment retryCountRef.current
before scheduling the setTimeout so the retry attempt uses the updated count
(preventing an extra attempt); specifically, inside the block that checks
retryCountRef.current < MAX_RETRIES, move the retryCountRef.current++ to execute
immediately (before calling setTimeout stored in retryTimeoutRef) and then
compute the delay via calculateRetryDelay(retryCountRef.current) and call
connect() from the timeout—this ensures retries 1..MAX_RETRIES occur instead of
0..MAX_RETRIES-1.
🧹 Nitpick comments (10)
ui/src/hooks/useDAGRunSSE.ts (1)

10-17: Consider simplifying the return type annotation for consistency.

Other SSE hooks in this PR use SSEState<T> directly as the return type. Using ReturnType<typeof useSSE<DAGRunSSEResponse>> achieves the same result but is more verbose and inconsistent with sibling hooks.

♻️ Suggested simplification
 export function useDAGRunSSE(
   name: string,
   dagRunId: string,
   enabled: boolean = true
-): ReturnType<typeof useSSE<DAGRunSSEResponse>> {
+): SSEState<DAGRunSSEResponse> {
   const endpoint = `/events/dag-runs/${encodeURIComponent(name)}/${encodeURIComponent(dagRunId)}`;
   return useSSE<DAGRunSSEResponse>(endpoint, enabled);
 }
internal/service/frontend/sse/proxy.go (1)

139-154: Consider logging errors for debugging proxy issues.

The streamResponse function silently returns on errors without logging. While this prevents error spam, it can make debugging proxy connectivity issues difficult in production.

♻️ Optional: Add debug logging
+import "log/slog"

 // streamResponse copies data from the response body to the client.
 func streamResponse(w http.ResponseWriter, flusher http.Flusher, body io.Reader) {
 	buf := make([]byte, 4096)
 	for {
 		n, readErr := body.Read(buf)
 		if n > 0 {
 			if _, writeErr := w.Write(buf[:n]); writeErr != nil {
+				slog.Debug("SSE proxy write error", "error", writeErr)
 				return
 			}
 			flusher.Flush()
 		}
 		if readErr != nil {
+			if readErr != io.EOF {
+				slog.Debug("SSE proxy read error", "error", readErr)
+			}
 			return
 		}
 	}
 }
ui/src/hooks/useDAGsListSSE.ts (1)

7-11: Consider making DAGsListSSEResponse interface exported directly.

The interface is currently declared without export and only re-exported at the end of the file (line 39). For consistency with StepLogSSEResponse in useStepLogSSE.ts and DAGRunLogsSSEResponse in useDAGRunLogsSSE.ts, consider exporting the interface directly at declaration.

Suggested change
-interface DAGsListSSEResponse {
+export interface DAGsListSSEResponse {
   dags: DAGFile[];
   errors: string[];
   pagination: Pagination;
 }
ui/src/hooks/useSSE.ts (1)

109-117: Consider resetting state when dependencies change to avoid stale data.

When endpoint or remoteNode changes, the effect reconnects but doesn't reset the state. This could leave stale data from a previous connection visible while connecting to a new endpoint.

Proposed enhancement
   useEffect(() => {
+    // Reset state when connection parameters change
+    setState(INITIAL_STATE as SSEState<T>);
+    retryCountRef.current = 0;
     connect();
     return () => {
       if (retryTimeoutRef.current) {
         clearTimeout(retryTimeoutRef.current);
       }
       eventSourceRef.current?.close();
     };
   }, [connect]);
ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx (2)

45-46: Hardcoded dagRunId and stepName values may limit component reusability.

These are hardcoded as 'latest' and null but passed to DAGDetailsContent. If the component should support viewing specific DAG runs, consider accepting these as props or deriving them from route/state.


149-166: formatDuration has no dependencies but is wrapped in useCallback.

Since formatDuration doesn't depend on any reactive values, it could be moved outside the component or declared as a plain function inside. The empty dependency array works but the useCallback wrapper adds unnecessary overhead.

Move outside component
+function formatDuration(startDate: string, endDate: string): string {
+  if (!startDate || !endDate) {
+    return '--';
+  }
+  const duration = dayjs.duration(dayjs(endDate).diff(dayjs(startDate)));
+  const hours = Math.floor(duration.asHours());
+  const minutes = duration.minutes();
+  const seconds = duration.seconds();
+
+  if (hours > 0) {
+    return `${hours}h ${minutes}m ${seconds}s`;
+  }
+  if (minutes > 0) {
+    return `${minutes}m ${seconds}s`;
+  }
+  return `${seconds}s`;
+}
+
 function DAGDetailsPanel({ fileName, onClose, onNavigate }: Props): React.ReactElement | null {
   // ... rest of component
-  const formatDuration = useCallback((startDate: string, endDate: string): string => {
-    // ...
-  }, []);
ui/src/features/dags/components/dag-execution/StepLog.tsx (2)

266-269: Accessibility: Use accessible class toggling instead of direct class manipulation.

Directly adding/removing classes with classList for visual feedback may not work well with Tailwind's purge mechanism and could cause issues. Consider using React state for the highlight effect.

Alternative using React state
const [highlightedLine, setHighlightedLine] = useState<number | null>(null);

// In handleJumpToLine:
setHighlightedLine(lineNum);
setTimeout(() => setHighlightedLine(null), 2000);

// In render:
<span
  className={`... ${highlightedLine === getLineNumber(index) ? 'bg-primary/20' : ''}`}
  data-line-number={getLineNumber(index)}
>

187-211: Effect has potential stale closure issue with cachedData.

The effect checks cachedData in its logic but also depends on it, which could cause unnecessary re-runs. Consider using a ref for the isInitialLoad check instead of including cachedData in the dependency array condition.

ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (1)

70-115: Consider memoizing handleFullscreenClick with useCallback.

Unlike DAGRunDetailsPanel.tsx which wraps this function in useCallback, here it's recreated every render. This causes the useEffect to unnecessarily re-run and re-attach event listeners.

♻️ Suggested improvement
- const handleFullscreenClick = (e?: React.MouseEvent) => {
+ const handleFullscreenClick = React.useCallback((e?: React.MouseEvent) => {
    const url =
      activeTab === 'status'
        ? `/dags/${fileName}`
        : `/dags/${fileName}/${activeTab}`;

    if (e?.metaKey || e?.ctrlKey) {
      window.open(url, '_blank');
    } else {
      navigate(url);
    }
- };
+ }, [activeTab, fileName, navigate]);
internal/service/frontend/sse/handler.go (1)

107-118: Move SetSSEHeaders after client creation to ensure proper error responses.

Currently, SetSSEHeaders is called before NewClient, which sets Content-Type: text/event-stream. If client creation or subscription fails, the subsequent http.Error call will write an error message with the SSE content type, which may confuse clients.

♻️ Suggested fix
- SetSSEHeaders(w)
- 
  client, err := NewClient(w)
  if err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
  }

+ SetSSEHeaders(w)
+
  if err := h.hub.Subscribe(client, topic); err != nil {

Comment thread internal/service/frontend/api/v2/sse_data.go Outdated
Comment thread internal/service/frontend/sse/proxy.go
Comment thread internal/service/frontend/sse/watcher.go
Comment thread internal/service/frontend/sse/watcher.go
Comment thread internal/service/frontend/sse/watcher.go
Comment thread ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
Comment thread ui/src/features/dags/components/dag-execution/ExecutionLog.tsx
Comment thread ui/src/hooks/useDAGRunsListSSE.ts
Comment thread ui/src/hooks/useSSE.ts
Comment thread ui/src/hooks/useSSE.ts
@codecov

codecov Bot commented Jan 25, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 68.94%. Comparing base (6c63c31) to head (24fc9fc).
⚠️ Report is 1 commits behind head on main.

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1608      +/-   ##
==========================================
+ Coverage   68.88%   68.94%   +0.06%     
==========================================
  Files         291      291              
  Lines       33052    33120      +68     
==========================================
+ Hits        22767    22835      +68     
- Misses       8316     8319       +3     
+ Partials     1969     1966       -3     
Files with missing lines Coverage Δ
internal/runtime/subcmd.go 80.89% <ø> (-9.96%) ⬇️
internal/service/scheduler/entryreader.go 62.09% <100.00%> (ø)

... and 13 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 6c63c31...24fc9fc. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@yohamta0

Copy link
Copy Markdown
Collaborator Author

@coderabbitai review again

@coderabbitai

coderabbitai Bot commented Jan 25, 2026

Copy link
Copy Markdown

@yottahmd, I'll perform a comprehensive review of all the changes in this PR.

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
ui/src/pages/queues/index.tsx (1)

102-109: Use a remoteNode constant for useQuery.
Guidelines require the const remoteNode = appBarContext.selectedRemoteNode || 'local'; pattern when wiring useQuery params.

♻️ Proposed adjustment
-  const remoteKey = appBarContext.selectedRemoteNode || 'local';
+  const remoteNode = appBarContext.selectedRemoteNode || 'local';
+  const remoteKey = remoteNode;
...
-          remoteNode: appBarContext.selectedRemoteNode || 'local',
+          remoteNode,

As per coding guidelines, use the remoteNode constant pattern for useQuery hooks.

ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (2)

85-110: Avoid handling Ctrl/Cmd+F as fullscreen.

Right now F triggers fullscreen even when the user presses Ctrl/Cmd+F to search.

♻️ Suggested tweak
-        case 'f':
-        case 'F':
-          handleFullscreenClick();
-          break;
+        case 'f':
+        case 'F':
+          if (event.metaKey || event.ctrlKey) return;
+          handleFullscreenClick();
+          break;

127-135: Loading gate may block DAGs with no runs.

If latestDAGRun can be null for a brand-new DAG, the modal will stay in a loading state even though data.dag is present. Consider gating on data?.dag instead.

🐛 Suggested fix
-  const hasData = data && data.latestDAGRun;
+  const hasData = !!data?.dag;
🤖 Fix all issues with AI agents
In `@internal/service/frontend/api/v2/dags.go`:
- Around line 350-391: getDAGDetailsData currently treats any error from
a.dagRunMgr.GetLatestStatus as fatal which hides valid DAGs with no runs; update
the GetLatestStatus error handling in getDAGDetailsData so that if the error is
exec.ErrNoStatusData (or the equivalent sentinel from the exec package) you
treat there being no latest run (e.g., leave dagStatus nil/zero and set
LatestDAGRun to an empty/nil value via ToDAGRunDetails), but for other errors
return the formatted error as before; adjust the call site around
a.dagRunMgr.GetLatestStatus and ToDAGRunDetails to handle the absent-status case
without failing the whole response.

In `@internal/service/frontend/api/v2/queues.go`:
- Around line 299-353: GetQueueItemsData duplicates DAG run summary fetching and
leaves running/queued as nil slices; refactor to call the existing helper
fetchDAGRunSummary for both the running loop (currently iterating runningRefs
from procStore.ListAllAlive) and the queued loop (iterating queuedItems from
queueStore.List) to remove duplicated logic, and initialize running and queued
with make([]api.DAGRunSummary, 0) so QueueItemsResponse always marshals to empty
arrays instead of null.

In `@internal/service/frontend/sse/handler_test.go`:
- Around line 329-366: The test TestHandleSSEMaxClients uses a fixed time.Sleep
to wait for the first client to connect which can cause flakiness; replace the
sleep with a deterministic wait that polls hub.ClientCount() (or a small loop
with timeout) after starting the goroutine for handler.HandleDAGEvents so the
test proceeds only once NewHub(WithMaxClients(1)) reports the first client is
registered, then run the second request and assert StatusServiceUnavailable and
"max clients reached"; ensure you reference TestHandleSSEMaxClients,
NewHub/WithMaxClients, hub.ClientCount(), and Handler.HandleDAGEvents when
implementing the polling wait.

In `@internal/service/frontend/sse/handler.go`:
- Around line 151-154: The code sends an Event with Data built by fmt.Sprintf
containing the raw topic, which allows JSON injection if topic contains
quotes/backslashes; update the construction of the Data field in the client.Send
call inside the handler so the topic is JSON-encoded (e.g., build a small struct
or value for {"topic": topic} and json.Marshal it) before assigning to
Event.Data, ensuring Event, EventTypeConnected and the client.Send call use the
marshaled bytes or string instead of fmt.Sprintf interpolation.

In `@ui/src/features/dag-runs/components/dag-run-details/DAGRunDetailsPanel.tsx`:
- Around line 115-140: The handler handleKeyDown is hijacking browser search
because pressing Ctrl/Cmd+F still triggers handleFullscreenClick; update
handleKeyDown (or specifically the 'f'/'F' branch) to ignore the key when
event.ctrlKey or event.metaKey (or other modifiers like altKey/shiftKey if
desired) are pressed, i.e., only trigger handleFullscreenClick when no modifier
keys are active and shouldIgnoreKeyboardShortcuts() is false.

In `@ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx`:
- Around line 74-82: The current effect sets notFound for any error; change it
to only setNotFound(true) when the error represents a 404 AND there is no
existing data (i.e., when error?.status or error?.response?.status === 404
depending on your useQuery error shape), otherwise do not flip notFound and keep
lastValidData as-is (so transient/network errors won’t stop polling). Update the
useEffect that references error, data, setNotFound, and setLastValidData to
perform a 404-specific check before setting notFound and only clear notFound
(setNotFound(false) and setLastValidData(data as DAGDetailsData) ) when valid
data is present.
- Around line 84-91: When fileName changes the component resets notFound and
activeTab but does not clear the cached DAG, so lastValidData can briefly show
the previous DAG; update the existing useEffect that depends on fileName to also
clear the cached data and current run by calling setLastValidData(null) (or
undefined per codebase convention) and setCurrentRun(null/undefined) alongside
setNotFound(false) and setActiveTab('status') so the panel doesn't display stale
DAG details while a new DAG loads.
- Around line 52-72: SSE is not including the selected remote node so sseResult
targets local while polling uses remoteNode; update the DAGDetailsPanel call to
pass remoteNode into useDAGSSE and modify useDAGSSE to append remoteNode as a
query param (or otherwise route the SSE URL) so SSE and polling use the same
remoteNode; ensure the signature change to useDAGSSE accepts (fileName: string,
enabled: boolean, remoteNode?: string) (or similar) and that sseResult.data
remains compatible with existing code path that falls back to pollingData.

In `@ui/src/features/dags/components/dag-execution/StepLog.tsx`:
- Around line 247-274: The guard in handleJumpToLine wrongly returns when
cachedData?.totalLines is undefined; compute a safe total (e.g., const
totalLines = cachedData?.totalLines ?? lineCount ?? 0) and use that in the
validation instead of cachedData?.totalLines so jumpToLine, pageSize,
setCurrentPage and setViewMode still work when totalLines is missing; keep the
rest of the logic (calculating targetPage, scrolling to data-line-number, and
the highlight timeout) unchanged.
- Around line 97-105: The SSE hook call is missing the selected remote node,
causing streams to target the wrong instance; update the call to useStepLogSSE
to pass remoteNode (e.g., useStepLogSSE(dagName, dagRunId, stepName,
shouldUseSSE, remoteNode)) and then modify the useStepLogSSE implementation to
include remoteNode in the SSE endpoint construction and any reconnection/polling
logic so sseResult, sseIsActive and fallback behavior respect the selected
remoteNode.

In `@ui/src/pages/dag-runs/index.tsx`:
- Around line 414-425: SSE parameter object sseParams currently omits the remote
node, causing wrong data when a remote node is selected; update sseParams to
include remoteNode (e.g., remoteNode: apiRemoteNode || undefined) and then
update useDAGRunsListSSE and the backend endpoint builder to accept and append
remoteNode to the SSE request, or if adding the param is not supported, change
the logic around useDAGRunsListSSE/shouldUsePolling to disable SSE whenever a
remote node is selected (use apiRemoteNode to gate SSE). Ensure the same
remoteNode key is used consistently across sseParams, useDAGRunsListSSE, and the
endpoint builder.

In `@ui/src/pages/dags/index.tsx`:
- Around line 195-209: The queryParams passed to useDAGsListSSE omit remoteNode,
causing cross-node data; update the queryParams object used by useDAGsListSSE to
include remoteNode (e.g., remoteNode: remoteNode || undefined) and add
remoteNode to the React.useMemo dependency array so it regenerates correctly,
and also update the SSE endpoint builder to accept and propagate remoteNode;
alternatively, if the SSE endpoint cannot support remoteNode, modify the logic
around sseResult/usePolling so SSE is disabled for remote selections (e.g., set
usePolling = true when remoteNode is set) to ensure DAG API calls always include
the remoteNode context.
🧹 Nitpick comments (10)
internal/service/frontend/sse/types_test.go (1)

3-20: Prefer require for test assertions.

Using require aligns with project testing guidelines and fails fast on header mismatches. As per coding guidelines, prefer require in Go tests.

♻️ Proposed change
-import (
-	"net/http/httptest"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
+import (
+	"net/http/httptest"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
@@
-	assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type"))
-	assert.Equal(t, "no-cache", w.Header().Get("Cache-Control"))
-	assert.Equal(t, "keep-alive", w.Header().Get("Connection"))
-	assert.Equal(t, "no", w.Header().Get("X-Accel-Buffering"))
+	require.Equal(t, "text/event-stream", w.Header().Get("Content-Type"))
+	require.Equal(t, "no-cache", w.Header().Get("Cache-Control"))
+	require.Equal(t, "keep-alive", w.Header().Get("Connection"))
+	require.Equal(t, "no", w.Header().Get("X-Accel-Buffering"))
ui/src/pages/index.tsx (1)

22-23: Use a dedicated remoteNode constant for query params.

The query currently inlines appBarContext.selectedRemoteNode || 'local'; define it once (and reuse for remoteKey) to keep the pattern consistent and avoid drift.

♻️ Suggested tweak
-  const remoteKey = appBarContext.selectedRemoteNode || 'local';
+  const remoteNode = appBarContext.selectedRemoteNode || 'local';
+  const remoteKey = remoteNode;
...
-          remoteNode: appBarContext.selectedRemoteNode || 'local',
+          remoteNode,
As per coding guidelines, keep the `remoteNode` access pattern consistent.

Also applies to: 175-206

ui/src/features/dag-runs/components/dag-run-details/DAGRunDetailsPanel.tsx (1)

100-111: Encode path segments in the fullscreen URL.

name or dagRunId may contain special characters; encoding avoids broken routes.

♻️ Suggested tweak
-      const url = `/dag-runs/${name}/${dagRunId}`;
+      const url = `/dag-runs/${encodeURIComponent(name)}/${encodeURIComponent(dagRunId)}`;
ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (1)

66-77: Encode fileName/activeTab in the fullscreen URL.

Protects against special characters breaking routes.

♻️ Suggested tweak
-    const url =
-      activeTab === 'status'
-        ? `/dags/${fileName}`
-        : `/dags/${fileName}/${activeTab}`;
+    const encodedFile = encodeURIComponent(fileName);
+    const encodedTab = encodeURIComponent(activeTab);
+    const url =
+      activeTab === 'status'
+        ? `/dags/${encodedFile}`
+        : `/dags/${encodedFile}/${encodedTab}`;
internal/service/frontend/sse/watcher_test.go (1)

17-70: Consider centralizing SSE test helpers in internal/test.

These helpers are used across multiple SSE test files (watcher_test.go, integration_test.go, handler_test.go, hub_test.go). Moving mockFetchFunc, countingFetchFunc, changingFetchFunc, and newTestClient to a shared fixture file (e.g., internal/test/sse.go) will improve discoverability and align with the pattern of centralizing shared test utilities.

internal/service/frontend/sse/watcher.go (1)

221-240: Consider removing clients on send failure to avoid retaining closed connections.

If Send fails (buffer full/closed), the client is closed but left in the map; over time this can accumulate. Removing here avoids repeated work if a slow client is dropped.

♻️ Suggested cleanup
 	for _, client := range clients {
 		if !client.Send(event) {
 			client.Close()
+			w.RemoveClient(client)
 			continue
 		}
 		if w.metrics != nil {
 			w.metrics.MessageSent(event.Type)
internal/service/frontend/sse/client.go (1)

41-59: Defer c.Close() in WritePump to ensure cleanup on all exit paths.

WritePump can exit via context cancellation, errors during write, or client closure, but currently never closes the client if an error occurs. Without explicit Close() in the handler or WritePump itself, the done channel never closes and queued events may block senders. The defer c.Close() suggestion is valid—Close() is idempotent and safely handles multiple calls.

♻️ Suggested change
 func (c *Client) WritePump(ctx context.Context) {
+	defer c.Close()
 	for {
 		select {
 		case <-ctx.Done():
 			return
internal/service/frontend/sse/handler.go (1)

99-120: Truncation may produce invalid query strings.

Truncating the encoded query at maxQueryLength could cut mid-key or mid-value, potentially creating malformed topic identifiers. While this is unlikely with 4096 chars, consider truncating at a safe boundary or rejecting excessively long queries instead.

💡 Suggested approach
 	result := values.Encode()
 	if len(result) > maxQueryLength {
-		return result[:maxQueryLength]
+		// Reject excessively long queries rather than truncating unsafely
+		return ""
 	}
 	return result

Alternatively, truncate at the last complete & boundary before the limit.

internal/service/frontend/api/v2/dagruns.go (1)

2180-2188: Consider consistent error handling for stderr.

Stderr read errors are silently ignored while stdout errors are propagated. If this is intentional (stderr is less critical), consider adding a brief comment explaining the rationale. Otherwise, consider logging the error for debugging purposes.

internal/service/frontend/sse/hub.go (1)

203-216: Consider batching failed client cleanup.

When Send fails, Unsubscribe is called immediately while iterating, acquiring the write lock for each failed client. If many clients fail simultaneously (e.g., network partition), this could cause lock contention. Consider collecting failed clients and unsubscribing them in a batch after the loop.

💡 Suggested improvement
 func (h *Hub) sendHeartbeats() {
 	clients := h.collectClients()
 	heartbeat := &Event{Type: EventTypeHeartbeat, Data: "{}"}
+	var failed []*Client

 	for _, client := range clients {
 		if !client.Send(heartbeat) {
 			client.Close()
-			h.Unsubscribe(client)
+			failed = append(failed, client)
 		} else if h.metrics != nil {
 			h.metrics.MessageSent(EventTypeHeartbeat)
 		}
 	}
+
+	for _, client := range failed {
+		h.Unsubscribe(client)
+	}
 }

Comment thread internal/service/frontend/api/v2/dags.go
Comment thread internal/service/frontend/api/v2/queues.go
Comment thread internal/service/frontend/sse/handler_test.go
Comment thread internal/service/frontend/sse/handler.go
Comment thread ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx
Comment thread ui/src/features/dags/components/dag-execution/StepLog.tsx
Comment thread ui/src/features/dags/components/dag-execution/StepLog.tsx
Comment thread ui/src/pages/dag-runs/index.tsx
Comment thread ui/src/pages/dags/index.tsx
@yohamta0 yohamta0 merged commit 9e4d54a into main Jan 25, 2026
4 of 5 checks passed
@yohamta0 yohamta0 deleted the sse-handler branch January 25, 2026 16:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant