refactor: polling to SSE for realtime update#1608
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the 📝 WalkthroughWalkthroughThe PR introduces a comprehensive Server-Sent Events (SSE) infrastructure for real-time data streaming. Backend changes include new SSE modules (client, hub, handler, watcher, metrics, proxy), public API methods for SSE data retrieval, and server lifecycle integration. Frontend changes add SSE consumption hooks and refactor UI components to use SSE with polling fallback for real-time updates across DAG runs, DAG details, logs, and queue data. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Browser
participant Handler
participant Hub
participant Watcher
participant FetchFunc
participant DataStore
Client->>Browser: Establish SSE Connection
Browser->>Handler: GET /events/dag-runs/{id}
Handler->>Hub: Subscribe(client, topic)
Hub->>Watcher: GetOrCreate(topic)
Watcher->>FetchFunc: Start polling in background
loop Polling Interval
Watcher->>FetchFunc: Fetch data for topic
FetchFunc->>DataStore: Query current state
DataStore-->>FetchFunc: Return data
Watcher->>Watcher: Hash data, detect change
alt Data Changed
Watcher->>Hub: Broadcast(Event: Data)
Hub->>Client: Send SSE event
Client->>Browser: Update UI
else No Change
Watcher->>Watcher: Skip broadcast
end
end
Browser->>Handler: Close connection
Handler->>Hub: Unsubscribe(client)
Hub->>Watcher: Remove client
alt Last Client
Hub->>Watcher: Stop()
Watcher->>Watcher: Cancel polling
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 10
🤖 Fix all issues with AI agents
In `@internal/service/frontend/api/v2/sse_data.go`:
- Around line 96-171: GetDAGRunLogsData currently hardcodes
fileutil.LogReadOptions{Tail: 500} which causes SSE payloads to always return
500 lines regardless of the UI pageSize; modify GetDAGRunLogsData to accept a
tail parameter from the SSE identifier/query (e.g., parse an optional tail value
from identifier or read from the SSE request query) and use that value when
constructing fileutil.LogReadOptions.Tail (fallback to existing default like 500
or ui default 1000 if missing), validate/clamp the tail to reasonable bounds,
and ensure the same identifier parsing logic around strings.Cut(ref :=
exec.NewDAGRunRef(...)) continues to work so SSE and REST return consistent line
counts.
In `@internal/service/frontend/sse/proxy.go`:
- Around line 32-38: The TLS config for the HTTP client (the client variable
using http.Transport with TLSClientConfig and node.SkipTLSVerify) must set a
secure minimum TLS version and the http.Client should have a request timeout to
avoid goroutine leaks; update the tls.Config to include MinVersion (e.g.,
tls.VersionTLS12 or higher) and set a sensible Timeout on the http.Client, and
ensure the "time" package is imported for the timeout value.
In `@internal/service/frontend/sse/watcher.go`:
- Around line 117-129: The broadcast method on Watcher calls
w.metrics.MessageSent(...) without checking for a nil w.metrics, which can
panic; update Watcher.broadcast to guard the metrics call (e.g., if w.metrics !=
nil) before invoking MessageSent, mirroring the nil-check pattern used in poll
so that metrics are only recorded when w.metrics is non-nil while preserving the
existing Send/Close logic for clients.
- Around line 83-98: The poll method in Watcher calls w.metrics.FetchError(...)
without checking if w.metrics is nil, which will panic; add a nil check before
invoking FetchError (e.g., if w.metrics != nil {
w.metrics.FetchError(string(w.topicType)) }) so metrics calls are only made when
w.metrics is non-nil; keep the existing backoff, broadcast(&Event{Type:
EventTypeError, Data: err.Error()}) and errorBackoff.Next logic intact.
- Around line 53-70: The Watcher currently never emits lifecycle metrics; call
the Metrics methods when the watcher starts and stops: invoke
w.metrics.WatcherStarted() at the beginning of Watcher.Start (after creating the
ticker / after initial poll) and ensure w.metrics.WatcherStopped() is invoked
whenever Start returns (on ctx.Done(), w.stopCh or any early exit), and also add
the stop metric call inside Watcher.Stop (or wherever w.stopCh is closed) so
both Watcher.Start and Watcher.Stop consistently emit
WatcherStarted()/WatcherStopped() using the existing w.metrics field.
In `@ui/src/features/dags/components/dag-execution/ExecutionLog.tsx`:
- Around line 68-69: isLiveMode is only initialized from isRunning and never
updated, so when dagRun.status stops being Status.Running the UI can hide the
live toggle while polling/SSE remain active; add a useEffect in ExecutionLog
that watches isRunning (derived from dagRun?.status === Status.Running) and
calls setIsLiveMode(isRunning) or sets false when isRunning becomes false to
keep isLiveMode in sync with dagRun state (use the existing isLiveMode,
setIsLiveMode, isRunning, and Status.Running symbols).
- Around line 238-247: The highlight uses the removed Tailwind v4
`bg-opacity-20`; in the scroll-to-line highlight block inside ExecutionLog.tsx
(the loop that finds elements by '[data-line-number]' and calls scrollIntoView),
replace the two-class approach htmlElement.classList.add('bg-primary/100',
'bg-opacity-20') and the corresponding classList.remove with a single
slash-opacity color class (e.g., 'bg-primary/100/20' or if your palette uses
hyphen shades 'bg-primary-100/20' or simply 'bg-primary/20'); remove any use of
'bg-opacity-20' so add/remove use the new single class name.
In `@ui/src/hooks/useDAGRunsListSSE.ts`:
- Around line 10-17: The DAGRunsListParams.interface currently types status as
string but the API uses a numeric Status enum; change the status property to
number (status?: number) in the DAGRunsListParams definition and update any call
sites in useDAGRunsListSSE (or functions that build the request/polling params)
to stop treating it as a string—remove parseInt() conversions and ensure values
passed to the polling endpoint are numeric so types and runtime behavior align
with the API.
In `@ui/src/hooks/useSSE.ts`:
- Around line 70-74: The SSE listener registered via
eventSource.addEventListener('data', ...) currently calls
JSON.parse(messageEvent.data) without protection; wrap the parse in a try-catch
inside that listener so malformed JSON doesn't throw and break the SSE flow, log
or handle the parse error (e.g., processLogger/warn or set an error state) and
avoid calling setState with invalid parsed data; locate the listener in useSSE
(the handler that casts event as MessageEvent and assigns to parsed) and enclose
JSON.parse(...) in a try block with a catch that handles/logs the error and
returns early.
- Around line 93-98: In useSSE's retry logic, increment retryCountRef.current
before scheduling the setTimeout so the retry attempt uses the updated count
(preventing an extra attempt); specifically, inside the block that checks
retryCountRef.current < MAX_RETRIES, move the retryCountRef.current++ to execute
immediately (before calling setTimeout stored in retryTimeoutRef) and then
compute the delay via calculateRetryDelay(retryCountRef.current) and call
connect() from the timeout—this ensures retries 1..MAX_RETRIES occur instead of
0..MAX_RETRIES-1.
🧹 Nitpick comments (10)
ui/src/hooks/useDAGRunSSE.ts (1)
10-17: Consider simplifying the return type annotation for consistency.Other SSE hooks in this PR use
SSEState<T>directly as the return type. UsingReturnType<typeof useSSE<DAGRunSSEResponse>>achieves the same result but is more verbose and inconsistent with sibling hooks.♻️ Suggested simplification
export function useDAGRunSSE( name: string, dagRunId: string, enabled: boolean = true -): ReturnType<typeof useSSE<DAGRunSSEResponse>> { +): SSEState<DAGRunSSEResponse> { const endpoint = `/events/dag-runs/${encodeURIComponent(name)}/${encodeURIComponent(dagRunId)}`; return useSSE<DAGRunSSEResponse>(endpoint, enabled); }internal/service/frontend/sse/proxy.go (1)
139-154: Consider logging errors for debugging proxy issues.The
streamResponsefunction silently returns on errors without logging. While this prevents error spam, it can make debugging proxy connectivity issues difficult in production.♻️ Optional: Add debug logging
+import "log/slog" // streamResponse copies data from the response body to the client. func streamResponse(w http.ResponseWriter, flusher http.Flusher, body io.Reader) { buf := make([]byte, 4096) for { n, readErr := body.Read(buf) if n > 0 { if _, writeErr := w.Write(buf[:n]); writeErr != nil { + slog.Debug("SSE proxy write error", "error", writeErr) return } flusher.Flush() } if readErr != nil { + if readErr != io.EOF { + slog.Debug("SSE proxy read error", "error", readErr) + } return } } }ui/src/hooks/useDAGsListSSE.ts (1)
7-11: Consider makingDAGsListSSEResponseinterface exported directly.The interface is currently declared without
exportand only re-exported at the end of the file (line 39). For consistency withStepLogSSEResponseinuseStepLogSSE.tsandDAGRunLogsSSEResponseinuseDAGRunLogsSSE.ts, consider exporting the interface directly at declaration.Suggested change
-interface DAGsListSSEResponse { +export interface DAGsListSSEResponse { dags: DAGFile[]; errors: string[]; pagination: Pagination; }ui/src/hooks/useSSE.ts (1)
109-117: Consider resetting state when dependencies change to avoid stale data.When
endpointorremoteNodechanges, the effect reconnects but doesn't reset the state. This could leave staledatafrom a previous connection visible while connecting to a new endpoint.Proposed enhancement
useEffect(() => { + // Reset state when connection parameters change + setState(INITIAL_STATE as SSEState<T>); + retryCountRef.current = 0; connect(); return () => { if (retryTimeoutRef.current) { clearTimeout(retryTimeoutRef.current); } eventSourceRef.current?.close(); }; }, [connect]);ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx (2)
45-46: HardcodeddagRunIdandstepNamevalues may limit component reusability.These are hardcoded as
'latest'andnullbut passed toDAGDetailsContent. If the component should support viewing specific DAG runs, consider accepting these as props or deriving them from route/state.
149-166:formatDurationhas no dependencies but is wrapped in useCallback.Since
formatDurationdoesn't depend on any reactive values, it could be moved outside the component or declared as a plain function inside. The empty dependency array works but theuseCallbackwrapper adds unnecessary overhead.Move outside component
+function formatDuration(startDate: string, endDate: string): string { + if (!startDate || !endDate) { + return '--'; + } + const duration = dayjs.duration(dayjs(endDate).diff(dayjs(startDate))); + const hours = Math.floor(duration.asHours()); + const minutes = duration.minutes(); + const seconds = duration.seconds(); + + if (hours > 0) { + return `${hours}h ${minutes}m ${seconds}s`; + } + if (minutes > 0) { + return `${minutes}m ${seconds}s`; + } + return `${seconds}s`; +} + function DAGDetailsPanel({ fileName, onClose, onNavigate }: Props): React.ReactElement | null { // ... rest of component - const formatDuration = useCallback((startDate: string, endDate: string): string => { - // ... - }, []);ui/src/features/dags/components/dag-execution/StepLog.tsx (2)
266-269: Accessibility: Use accessible class toggling instead of direct class manipulation.Directly adding/removing classes with
classListfor visual feedback may not work well with Tailwind's purge mechanism and could cause issues. Consider using React state for the highlight effect.Alternative using React state
const [highlightedLine, setHighlightedLine] = useState<number | null>(null); // In handleJumpToLine: setHighlightedLine(lineNum); setTimeout(() => setHighlightedLine(null), 2000); // In render: <span className={`... ${highlightedLine === getLineNumber(index) ? 'bg-primary/20' : ''}`} data-line-number={getLineNumber(index)} >
187-211: Effect has potential stale closure issue withcachedData.The effect checks
cachedDatain its logic but also depends on it, which could cause unnecessary re-runs. Consider using a ref for theisInitialLoadcheck instead of includingcachedDatain the dependency array condition.ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (1)
70-115: Consider memoizinghandleFullscreenClickwithuseCallback.Unlike
DAGRunDetailsPanel.tsxwhich wraps this function inuseCallback, here it's recreated every render. This causes theuseEffectto unnecessarily re-run and re-attach event listeners.♻️ Suggested improvement
- const handleFullscreenClick = (e?: React.MouseEvent) => { + const handleFullscreenClick = React.useCallback((e?: React.MouseEvent) => { const url = activeTab === 'status' ? `/dags/${fileName}` : `/dags/${fileName}/${activeTab}`; if (e?.metaKey || e?.ctrlKey) { window.open(url, '_blank'); } else { navigate(url); } - }; + }, [activeTab, fileName, navigate]);internal/service/frontend/sse/handler.go (1)
107-118: MoveSetSSEHeadersafter client creation to ensure proper error responses.Currently,
SetSSEHeadersis called beforeNewClient, which setsContent-Type: text/event-stream. If client creation or subscription fails, the subsequenthttp.Errorcall will write an error message with the SSE content type, which may confuse clients.♻️ Suggested fix
- SetSSEHeaders(w) - client, err := NewClient(w) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } + SetSSEHeaders(w) + if err := h.hub.Subscribe(client, topic); err != nil {
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #1608 +/- ##
==========================================
+ Coverage 68.88% 68.94% +0.06%
==========================================
Files 291 291
Lines 33052 33120 +68
==========================================
+ Hits 22767 22835 +68
- Misses 8316 8319 +3
+ Partials 1969 1966 -3
... and 13 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
|
@coderabbitai review again |
|
✅ Actions performedFull review triggered. |
There was a problem hiding this comment.
Actionable comments posted: 12
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
ui/src/pages/queues/index.tsx (1)
102-109: Use aremoteNodeconstant foruseQuery.
Guidelines require theconst remoteNode = appBarContext.selectedRemoteNode || 'local';pattern when wiringuseQueryparams.♻️ Proposed adjustment
- const remoteKey = appBarContext.selectedRemoteNode || 'local'; + const remoteNode = appBarContext.selectedRemoteNode || 'local'; + const remoteKey = remoteNode; ... - remoteNode: appBarContext.selectedRemoteNode || 'local', + remoteNode,As per coding guidelines, use the
remoteNodeconstant pattern foruseQueryhooks.ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (2)
85-110: Avoid handling Ctrl/Cmd+F as fullscreen.Right now
Ftriggers fullscreen even when the user presses Ctrl/Cmd+F to search.♻️ Suggested tweak
- case 'f': - case 'F': - handleFullscreenClick(); - break; + case 'f': + case 'F': + if (event.metaKey || event.ctrlKey) return; + handleFullscreenClick(); + break;
127-135: Loading gate may block DAGs with no runs.If
latestDAGRuncan be null for a brand-new DAG, the modal will stay in a loading state even thoughdata.dagis present. Consider gating ondata?.daginstead.🐛 Suggested fix
- const hasData = data && data.latestDAGRun; + const hasData = !!data?.dag;
🤖 Fix all issues with AI agents
In `@internal/service/frontend/api/v2/dags.go`:
- Around line 350-391: getDAGDetailsData currently treats any error from
a.dagRunMgr.GetLatestStatus as fatal which hides valid DAGs with no runs; update
the GetLatestStatus error handling in getDAGDetailsData so that if the error is
exec.ErrNoStatusData (or the equivalent sentinel from the exec package) you
treat there being no latest run (e.g., leave dagStatus nil/zero and set
LatestDAGRun to an empty/nil value via ToDAGRunDetails), but for other errors
return the formatted error as before; adjust the call site around
a.dagRunMgr.GetLatestStatus and ToDAGRunDetails to handle the absent-status case
without failing the whole response.
In `@internal/service/frontend/api/v2/queues.go`:
- Around line 299-353: GetQueueItemsData duplicates DAG run summary fetching and
leaves running/queued as nil slices; refactor to call the existing helper
fetchDAGRunSummary for both the running loop (currently iterating runningRefs
from procStore.ListAllAlive) and the queued loop (iterating queuedItems from
queueStore.List) to remove duplicated logic, and initialize running and queued
with make([]api.DAGRunSummary, 0) so QueueItemsResponse always marshals to empty
arrays instead of null.
In `@internal/service/frontend/sse/handler_test.go`:
- Around line 329-366: The test TestHandleSSEMaxClients uses a fixed time.Sleep
to wait for the first client to connect which can cause flakiness; replace the
sleep with a deterministic wait that polls hub.ClientCount() (or a small loop
with timeout) after starting the goroutine for handler.HandleDAGEvents so the
test proceeds only once NewHub(WithMaxClients(1)) reports the first client is
registered, then run the second request and assert StatusServiceUnavailable and
"max clients reached"; ensure you reference TestHandleSSEMaxClients,
NewHub/WithMaxClients, hub.ClientCount(), and Handler.HandleDAGEvents when
implementing the polling wait.
In `@internal/service/frontend/sse/handler.go`:
- Around line 151-154: The code sends an Event with Data built by fmt.Sprintf
containing the raw topic, which allows JSON injection if topic contains
quotes/backslashes; update the construction of the Data field in the client.Send
call inside the handler so the topic is JSON-encoded (e.g., build a small struct
or value for {"topic": topic} and json.Marshal it) before assigning to
Event.Data, ensuring Event, EventTypeConnected and the client.Send call use the
marshaled bytes or string instead of fmt.Sprintf interpolation.
In `@ui/src/features/dag-runs/components/dag-run-details/DAGRunDetailsPanel.tsx`:
- Around line 115-140: The handler handleKeyDown is hijacking browser search
because pressing Ctrl/Cmd+F still triggers handleFullscreenClick; update
handleKeyDown (or specifically the 'f'/'F' branch) to ignore the key when
event.ctrlKey or event.metaKey (or other modifiers like altKey/shiftKey if
desired) are pressed, i.e., only trigger handleFullscreenClick when no modifier
keys are active and shouldIgnoreKeyboardShortcuts() is false.
In `@ui/src/features/dags/components/dag-details/DAGDetailsPanel.tsx`:
- Around line 74-82: The current effect sets notFound for any error; change it
to only setNotFound(true) when the error represents a 404 AND there is no
existing data (i.e., when error?.status or error?.response?.status === 404
depending on your useQuery error shape), otherwise do not flip notFound and keep
lastValidData as-is (so transient/network errors won’t stop polling). Update the
useEffect that references error, data, setNotFound, and setLastValidData to
perform a 404-specific check before setting notFound and only clear notFound
(setNotFound(false) and setLastValidData(data as DAGDetailsData) ) when valid
data is present.
- Around line 84-91: When fileName changes the component resets notFound and
activeTab but does not clear the cached DAG, so lastValidData can briefly show
the previous DAG; update the existing useEffect that depends on fileName to also
clear the cached data and current run by calling setLastValidData(null) (or
undefined per codebase convention) and setCurrentRun(null/undefined) alongside
setNotFound(false) and setActiveTab('status') so the panel doesn't display stale
DAG details while a new DAG loads.
- Around line 52-72: SSE is not including the selected remote node so sseResult
targets local while polling uses remoteNode; update the DAGDetailsPanel call to
pass remoteNode into useDAGSSE and modify useDAGSSE to append remoteNode as a
query param (or otherwise route the SSE URL) so SSE and polling use the same
remoteNode; ensure the signature change to useDAGSSE accepts (fileName: string,
enabled: boolean, remoteNode?: string) (or similar) and that sseResult.data
remains compatible with existing code path that falls back to pollingData.
In `@ui/src/features/dags/components/dag-execution/StepLog.tsx`:
- Around line 247-274: The guard in handleJumpToLine wrongly returns when
cachedData?.totalLines is undefined; compute a safe total (e.g., const
totalLines = cachedData?.totalLines ?? lineCount ?? 0) and use that in the
validation instead of cachedData?.totalLines so jumpToLine, pageSize,
setCurrentPage and setViewMode still work when totalLines is missing; keep the
rest of the logic (calculating targetPage, scrolling to data-line-number, and
the highlight timeout) unchanged.
- Around line 97-105: The SSE hook call is missing the selected remote node,
causing streams to target the wrong instance; update the call to useStepLogSSE
to pass remoteNode (e.g., useStepLogSSE(dagName, dagRunId, stepName,
shouldUseSSE, remoteNode)) and then modify the useStepLogSSE implementation to
include remoteNode in the SSE endpoint construction and any reconnection/polling
logic so sseResult, sseIsActive and fallback behavior respect the selected
remoteNode.
In `@ui/src/pages/dag-runs/index.tsx`:
- Around line 414-425: SSE parameter object sseParams currently omits the remote
node, causing wrong data when a remote node is selected; update sseParams to
include remoteNode (e.g., remoteNode: apiRemoteNode || undefined) and then
update useDAGRunsListSSE and the backend endpoint builder to accept and append
remoteNode to the SSE request, or if adding the param is not supported, change
the logic around useDAGRunsListSSE/shouldUsePolling to disable SSE whenever a
remote node is selected (use apiRemoteNode to gate SSE). Ensure the same
remoteNode key is used consistently across sseParams, useDAGRunsListSSE, and the
endpoint builder.
In `@ui/src/pages/dags/index.tsx`:
- Around line 195-209: The queryParams passed to useDAGsListSSE omit remoteNode,
causing cross-node data; update the queryParams object used by useDAGsListSSE to
include remoteNode (e.g., remoteNode: remoteNode || undefined) and add
remoteNode to the React.useMemo dependency array so it regenerates correctly,
and also update the SSE endpoint builder to accept and propagate remoteNode;
alternatively, if the SSE endpoint cannot support remoteNode, modify the logic
around sseResult/usePolling so SSE is disabled for remote selections (e.g., set
usePolling = true when remoteNode is set) to ensure DAG API calls always include
the remoteNode context.
🧹 Nitpick comments (10)
internal/service/frontend/sse/types_test.go (1)
3-20: Preferrequirefor test assertions.Using
requirealigns with project testing guidelines and fails fast on header mismatches. As per coding guidelines, preferrequirein Go tests.♻️ Proposed change
-import ( - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" -) +import ( + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) @@ - assert.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) - assert.Equal(t, "no-cache", w.Header().Get("Cache-Control")) - assert.Equal(t, "keep-alive", w.Header().Get("Connection")) - assert.Equal(t, "no", w.Header().Get("X-Accel-Buffering")) + require.Equal(t, "text/event-stream", w.Header().Get("Content-Type")) + require.Equal(t, "no-cache", w.Header().Get("Cache-Control")) + require.Equal(t, "keep-alive", w.Header().Get("Connection")) + require.Equal(t, "no", w.Header().Get("X-Accel-Buffering"))ui/src/pages/index.tsx (1)
22-23: Use a dedicatedremoteNodeconstant for query params.The query currently inlines
appBarContext.selectedRemoteNode || 'local'; define it once (and reuse forremoteKey) to keep the pattern consistent and avoid drift.As per coding guidelines, keep the `remoteNode` access pattern consistent.♻️ Suggested tweak
- const remoteKey = appBarContext.selectedRemoteNode || 'local'; + const remoteNode = appBarContext.selectedRemoteNode || 'local'; + const remoteKey = remoteNode; ... - remoteNode: appBarContext.selectedRemoteNode || 'local', + remoteNode,Also applies to: 175-206
ui/src/features/dag-runs/components/dag-run-details/DAGRunDetailsPanel.tsx (1)
100-111: Encode path segments in the fullscreen URL.
nameordagRunIdmay contain special characters; encoding avoids broken routes.♻️ Suggested tweak
- const url = `/dag-runs/${name}/${dagRunId}`; + const url = `/dag-runs/${encodeURIComponent(name)}/${encodeURIComponent(dagRunId)}`;ui/src/features/dags/components/dag-details/DAGDetailsModal.tsx (1)
66-77: EncodefileName/activeTabin the fullscreen URL.Protects against special characters breaking routes.
♻️ Suggested tweak
- const url = - activeTab === 'status' - ? `/dags/${fileName}` - : `/dags/${fileName}/${activeTab}`; + const encodedFile = encodeURIComponent(fileName); + const encodedTab = encodeURIComponent(activeTab); + const url = + activeTab === 'status' + ? `/dags/${encodedFile}` + : `/dags/${encodedFile}/${encodedTab}`;internal/service/frontend/sse/watcher_test.go (1)
17-70: Consider centralizing SSE test helpers ininternal/test.These helpers are used across multiple SSE test files (
watcher_test.go,integration_test.go,handler_test.go,hub_test.go). MovingmockFetchFunc,countingFetchFunc,changingFetchFunc, andnewTestClientto a shared fixture file (e.g.,internal/test/sse.go) will improve discoverability and align with the pattern of centralizing shared test utilities.internal/service/frontend/sse/watcher.go (1)
221-240: Consider removing clients on send failure to avoid retaining closed connections.If
Sendfails (buffer full/closed), the client is closed but left in the map; over time this can accumulate. Removing here avoids repeated work if a slow client is dropped.♻️ Suggested cleanup
for _, client := range clients { if !client.Send(event) { client.Close() + w.RemoveClient(client) continue } if w.metrics != nil { w.metrics.MessageSent(event.Type)internal/service/frontend/sse/client.go (1)
41-59: Deferc.Close()in WritePump to ensure cleanup on all exit paths.WritePump can exit via context cancellation, errors during write, or client closure, but currently never closes the client if an error occurs. Without explicit Close() in the handler or WritePump itself, the
donechannel never closes and queued events may block senders. Thedefer c.Close()suggestion is valid—Close() is idempotent and safely handles multiple calls.♻️ Suggested change
func (c *Client) WritePump(ctx context.Context) { + defer c.Close() for { select { case <-ctx.Done(): returninternal/service/frontend/sse/handler.go (1)
99-120: Truncation may produce invalid query strings.Truncating the encoded query at
maxQueryLengthcould cut mid-key or mid-value, potentially creating malformed topic identifiers. While this is unlikely with 4096 chars, consider truncating at a safe boundary or rejecting excessively long queries instead.💡 Suggested approach
result := values.Encode() if len(result) > maxQueryLength { - return result[:maxQueryLength] + // Reject excessively long queries rather than truncating unsafely + return "" } return resultAlternatively, truncate at the last complete
&boundary before the limit.internal/service/frontend/api/v2/dagruns.go (1)
2180-2188: Consider consistent error handling for stderr.Stderr read errors are silently ignored while stdout errors are propagated. If this is intentional (stderr is less critical), consider adding a brief comment explaining the rationale. Otherwise, consider logging the error for debugging purposes.
internal/service/frontend/sse/hub.go (1)
203-216: Consider batching failed client cleanup.When
Sendfails,Unsubscribeis called immediately while iterating, acquiring the write lock for each failed client. If many clients fail simultaneously (e.g., network partition), this could cause lock contention. Consider collecting failed clients and unsubscribing them in a batch after the loop.💡 Suggested improvement
func (h *Hub) sendHeartbeats() { clients := h.collectClients() heartbeat := &Event{Type: EventTypeHeartbeat, Data: "{}"} + var failed []*Client for _, client := range clients { if !client.Send(heartbeat) { client.Close() - h.Unsubscribe(client) + failed = append(failed, client) } else if h.metrics != nil { h.metrics.MessageSent(EventTypeHeartbeat) } } + + for _, client := range failed { + h.Unsubscribe(client) + } }
Summary by CodeRabbit
Release Notes
New Features
Improvements
✏️ Tip: You can customize this high-level summary in your review settings.