Stabilize multiplexed SSE connection management#1768
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR introduces an SSE multiplexing system with topic-based subscriptions for managing multiple concurrent SSE streams per session. Backend changes add SSE configuration, a multiplexer component for session/topic lifecycle management, and new agent API methods for authorization and snapshots. Frontend changes refactor SSE subscription logic to use topics and add fallback polling when SSE is offline, plus delegate session polling support. Changes
Sequence DiagramssequenceDiagram
participant Client as Client App
participant SSEMgr as SSEManager
participant Conn as SSE Connection
participant Server as Multiplexer
participant Topic as Topic Handler
Client->>SSEMgr: subscribeTopic(sessionId)
SSEMgr->>SSEMgr: getOrCreateConnection(apiURL)
SSEMgr->>Conn: create/reuse
SSEMgr->>Server: POST /events/stream<br/>(topics=[agent:sessionId])
Server->>Server: createSession()<br/>register subscriber
Server->>Conn: send control event<br/>(sessionId, initial topics)
Conn->>SSEMgr: onData(controlEvent)
SSEMgr->>SSEMgr: updateState(connected)
SSEMgr->>Client: onSnapshot(sessionState)
rect rgba(100, 150, 200, 0.5)
Note over Client,Topic: Streaming Phase
Server->>Topic: poll payload
Topic->>Server: return data
Server->>Conn: send data frame
Conn->>SSEMgr: onData(dataEvent)
SSEMgr->>Client: onSnapshot(updated)
end
sequenceDiagram
participant Client as Client App
participant SSEMgr as SSEManager
participant Fallback as Polling Fallback
participant API as Agent API
Client->>SSEMgr: subscribeTopic()
SSEMgr->>SSEMgr: connection failed
SSEMgr->>Client: onStateChange(fallback:true)
rect rgba(200, 100, 100, 0.5)
Note over Client,API: Fallback Polling
Fallback->>Fallback: FALLBACK_POLL_INTERVAL_MS
Fallback->>API: GetSessionSnapshot()
API->>Fallback: StreamResponse
Fallback->>Client: onSnapshot(snapshot)
end
rect rgba(100, 200, 100, 0.5)
Note over Client,SSEMgr: SSE Reconnect
SSEMgr->>SSEMgr: connection restored
SSEMgr->>Client: onStateChange(live:true)
Fallback->>Fallback: halt polling
end
sequenceDiagram
participant Client as Client
participant Mux as Multiplexer
participant TopicReg as Topic Registry
participant Auth as Authorizer
Client->>Mux: mutateSession(sessionId,<br/>add=[topic1, topic2])
Mux->>Mux: parseTopicList()
Mux->>Mux: resolveTopicsForMutation()
loop for each topic
Mux->>Auth: authorize(topic)
alt authorized
Mux->>TopicReg: getOrCreateTopic()
else unauthorized
Mux->>Mux: record error
end
end
alt conflict detected
Mux->>Client: error(ErrConflictingMutation)
else success
Mux->>Mux: applyMutation(session)
Mux->>Client: mutationResult(added,removed,errors)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 13
🧹 Nitpick comments (1)
internal/cmn/config/config.go (1)
462-476: Validation allows zero values for all SSE fields.The validation only checks for negative values (
< 0). Zero values are accepted, which means:
MaxTopicsPerConnection = 0→ no topics per connectionMaxClients = 0→ no clients allowedHeartbeatInterval = 0→ no heartbeatsThis may be intentional (defaults are set in loader.go), but consider whether zero should be invalid for fields like
HeartbeatIntervalwhere disabling heartbeats could cause connection issues.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/config/config.go` around lines 462 - 476, Validation currently allows zero for all SSE fields; update the checks so HeartbeatInterval cannot be zero by changing the guard on c.Server.SSE.HeartbeatInterval to reject <= 0 and return a clear error (e.g., "sse.heartbeat_interval must be > 0"); decide whether to treat c.Server.SSE.MaxTopicsPerConnection and c.Server.SSE.MaxClients as > 0 as well (if so, change their checks from < 0 to <= 0 and adjust error messages). Ensure you reference the defaulting behavior in loader.go when making this change so you don't break intended defaults.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/agent/api.go`:
- Around line 855-869: The current StreamResponse assembly reads multiple
SessionManager accessors (GetMessages, GetSession, IsWorking, HasPendingPrompt,
GetModel, GetTotalCost, GetDelegates) separately which can observe torn state;
add a single locked snapshot helper on SessionManager (e.g., Snapshot() or
GetLockedState()) that acquires the manager's mutex and returns a struct
containing Messages, Session, Working, HasPendingPrompt, Model, TotalCost,
Delegates atomically, then replace the field-by-field reads in the handler
(where sessions.Load and the StreamResponse/SessionState are built) with a
single call to that snapshot and map its fields into
StreamResponse/SessionState.
- Around line 847-849: AuthorizeSessionAccess currently calls
getUserIDFromContext (which may fallback to "admin") and then GetSessionDetail
(which loads full messages/delegates); change it to explicitly resolve the
principal from ctx without the admin fallback (or only allow the fallback when
auth is globally disabled) and perform a lightweight metadata-only session
existence/ownership check instead of calling GetSessionDetail—use the session
metadata accessor (or add one) to verify session existence and owner matches the
resolved user ID before returning.
- Around line 877-886: The code masks storage errors by turning any
a.store.GetSession error into ErrSessionNotFound and by swallowing
a.store.GetMessages errors (setting messages = []Message{}); change both to
propagate the original store error to the caller instead: return the error from
GetSession directly (do not map all errors to ErrSessionNotFound unless sess ==
nil with nil error) and on GetMessages return the err instead of substituting an
empty slice (remove the a.logger-only swallow path). Update callers to handle
these storage errors/retries as needed; use the existing symbols
a.store.GetSession, a.store.GetMessages, ErrSessionNotFound, and messages to
locate and fix the logic.
In `@internal/service/frontend/sse/multiplex_handler.go`:
- Around line 66-70: The bootstrapTopics call is currently executed after
writeControl/Serve which makes the new subscription state visible before the
snapshot enqueue and allows a race where a concurrent unsubscribe removes topics
but the snapshot still enqueues stale events; to fix, serialize snapshot
bootstrap with the session-scoped mutation by moving the call to
session.bootstrapTopics into the same critical section where
result.session.writeControl (and any session subscription state mutations) occur
so the enqueue happens atomically with the subscription change, or alternatively
add a check in the snapshot-enqueue path to drop topics that are no longer
attached (i.e., make the session's snapshot enqueue/append reject topics absent
from the current subscription set in Session). Ensure you update both code paths
that call bootstrapTopics (the one using
result.session.writeControl/result.session.Serve and the other mirror path) and
reference the Session methods writeControl, bootstrapTopics, Serve and the
session snapshot enqueue/append logic when making the change.
- Around line 117-130: The proxy only forwards query params so clients sending
Last-Event-ID via header are treated as new streams; update the multiplex
handler code that constructs the proxied HTTP requests to copy the incoming
request's "Last-Event-ID" header into the outbound request's headers (e.g., when
you create the outgoing *http.Request* for the remote stream). Ensure this
header copy is done in both places noted (the place that builds the proxied
request around parseLastEventID and the second occurrence referenced in the
review), and keep using parseLastEventID as-is to read either query or header on
the local side.
In `@internal/service/frontend/sse/multiplex.go`:
- Around line 411-427: getOrCreateTopicForMutation can return a topic that's
being retired by unsubscribeTopic, causing stop() to close stopCh on a
still-in-use object; fix by making topic lifecycle changes atomic: add a
lifecycle flag on multiplexTopic (e.g., stopped/shuttingDown) and update
getOrCreateTopicForMutation to check that flag under m.mu and treat a stopped
topic as non-existent (create a new one instead); change unsubscribeTopic to
mark the topic as shuttingDown and remove it from m.topics while holding m.mu,
then release the lock and call topic.stop() (closing stopCh) once — this ensures
lookups never get a topic that is mid-retire and prevents closing stopCh on a
live topic (refer to getOrCreateTopicForMutation, unsubscribeTopic,
multiplexTopic.sessionCount, multiplexTopic.stop, stopCh, and m.topics).
- Around line 256-367: The mutation logic in applyMutation must be serialized
with session teardown to avoid races; add a session-scoped mutex (e.g.,
mutationMu on streamSession) and acquire it at the start of applyMutation and in
removeSession (and the other mutation path noted at 457-476) to serialize checks
and membership updates (currentSet/finalCount, calls to
addTopic/removeTopic/removeSession). Hold the lock while re-checking
session.isClosed and while performing unsubscribeTopic/addTopic/topic.addSession
and any adjustments to topic membership, using defer to unlock; keep
resolveTopicsForMutation outside the lock but ensure createdTopics cleanup and
the final membership changes happen under the session lock so no concurrent
mutation or teardown can violate maxTopicsPerConn or leave topics attached to
closed sessions.
In `@internal/service/frontend/sse/proxy.go`:
- Around line 117-118: The TopicTypeAgent branch currently concatenates the raw
identifier into the path (case TopicTypeAgent -> return "/agent/sessions/" +
identifier + "/stream"), which allows reserved characters to alter path/query;
change this to treat the identifier as a single path segment by applying
url.PathEscape(identifier) when building the string (and add the net/url
import), so identifiers like "a/b" or "sess-1?x=y" are percent-escaped rather
than injected into the path or query.
In `@internal/service/frontend/sse/topic_parse.go`:
- Around line 52-68: The current branches for
TopicTypeDAGRuns/TopicTypeQueues/TopicTypeDAGsList/TopicTypeDocTree and
TopicTypeDAGRunLogs treat sanitizeQueryForTopic(...) == "" as success but
sanitizeQueryForTopic also returns "" on parse failure, which widens
subscriptions; change sanitizeQueryForTopic to return (string, error) or add a
new parseAndSanitizeQuery(identifier string) (string, error) helper that
distinguishes an empty-but-valid query from a parse error, then update the
branches in topic_parse.go (the cases handling TopicTypeDAGRuns,
TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree and TopicTypeDAGRunLogs) to
call the new helper and return/propagate an error when parsing fails instead of
accepting "" as success; add a regression test that asserts malformed queries
like "%ZZ" cause the topic parsing to return an error rather than canonicalizing
to a broader subscription.
In `@internal/service/frontend/sse/types.go`:
- Around line 69-72: The Deprecation header in SetLegacyStreamDeprecationHeaders
is not RFC 9745 compliant (currently "true"); replace it with a Structured Field
Date value of the form "@<unix-seconds>" instead. In
SetLegacyStreamDeprecationHeaders, compute the deprecation instant as a Unix
timestamp (e.g., the deprecation moment or time.Now().Unix() / or derive from
legacyStreamSunset if that represents the instant), then set
w.Header().Set("Deprecation", fmt.Sprintf("@%d", ts)) while keeping the existing
Sunset header write for legacyStreamSunset.
In `@ui/src/features/agent/hooks/useAgentChat.ts`:
- Around line 260-275: The onPreConnect currently calls resetDelegates (passed
into useSSEConnection) which wipes delegates that
selectSession()/restoreDelegates just restored; remove the unconditional
resetDelegates from the onPreConnect arg and instead call resetDelegates only
from explicit paths that clear or start a brand-new session (e.g., the handler
that clears session or the new-session startup flow), or wrap onPreConnect with
a conditional that checks a "delegatesPreloaded" / "hasRestoredDelegates" flag
(set by restoreDelegates/selectSession) and only runs resetDelegates when that
flag is false; update useSSEConnection invocation to use the new conditional
wrapper or remove the call and place resetDelegates calls into the explicit
clear/start handlers.
- Around line 291-320: pollFallbackSnapshots can overlap because setInterval
fires regardless of in-flight work; add a single-run guard so only one poll runs
at a time and ensure timers are cancelled on cleanup. Implement either: (a) an
inFlight boolean (e.g., isPolling) checked at the top of pollFallbackSnapshots,
set true before doing fetchSessionDetail/fetchSessionDetail for delegateIds and
set false in a finally block so applySessionSnapshot/applyDelegateSnapshot are
only called from the active runner, or (b) replace setInterval(timer) with a
recursive setTimeout that schedules the next run only after the current run
completes; in both cases ensure you clear the timer and set active = false on
unmount/cleanup so no stray polls run.
In `@ui/src/hooks/SSEManager.ts`:
- Around line 216-225: The code replays the connection state to a new topic
immediately (topicEntry.subscribers.set(...);
callbacks.onStateChange(conn.state)) before the topic is moved from pendingAdd
into serverTopics, causing delegate panes to treat it as live; change this so
you emit topic-level state based on whether serverTopics.has(topic) or the
pendingAdd flag (i.e., report a pending/not-connected state instead of
conn.state for fresh topics) and defer flipping the topic to connected until
handleTopicAdded confirms the server subscription (or until the control/mutation
ack arrives); update the logic around topicEntry, callbacks.onStateChange,
handleTopicAdded and ensureConnected so handleTopicAdded performs the final
callbacks.onStateChange(connected) when the topic is actually moved into
serverTopics.
---
Nitpick comments:
In `@internal/cmn/config/config.go`:
- Around line 462-476: Validation currently allows zero for all SSE fields;
update the checks so HeartbeatInterval cannot be zero by changing the guard on
c.Server.SSE.HeartbeatInterval to reject <= 0 and return a clear error (e.g.,
"sse.heartbeat_interval must be > 0"); decide whether to treat
c.Server.SSE.MaxTopicsPerConnection and c.Server.SSE.MaxClients as > 0 as well
(if so, change their checks from < 0 to <= 0 and adjust error messages). Ensure
you reference the defaulting behavior in loader.go when making this change so
you don't break intended defaults.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 26ba3f8e-8057-40c7-bb7d-032073d9afac
📒 Files selected for processing (24)
internal/agent/api.gointernal/cmn/config/config.gointernal/cmn/config/definition.gointernal/cmn/config/loader.gointernal/cmn/config/loader_test.gointernal/service/frontend/server.gointernal/service/frontend/sse/handler.gointernal/service/frontend/sse/metrics.gointernal/service/frontend/sse/multiplex.gointernal/service/frontend/sse/multiplex_handler.gointernal/service/frontend/sse/multiplex_test.gointernal/service/frontend/sse/proxy.gointernal/service/frontend/sse/proxy_test.gointernal/service/frontend/sse/topic_parse.gointernal/service/frontend/sse/types.goui/src/features/agent/hooks/__tests__/useAgentChat.test.tsxui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsxui/src/features/agent/hooks/useAgentChat.tsui/src/features/agent/hooks/useDelegateManager.tsui/src/features/agent/hooks/useSSEConnection.tsui/src/hooks/SSEManager.tsui/src/hooks/__tests__/SSEManager.test.tsui/src/hooks/useSSE.tsui/src/lib/__tests__/parseParams.test.ts
💤 Files with no reviewable changes (1)
- ui/src/lib/tests/parseParams.test.ts
| func (a *API) AuthorizeSessionAccess(ctx context.Context, sessionID string) error { | ||
| _, err := a.GetSessionDetail(ctx, sessionID, getUserIDFromContext(ctx)) | ||
| return err |
There was a problem hiding this comment.
Fail closed in AuthorizeSessionAccess.
Because this method is now an exported authorization boundary, inheriting the unconditional "admin" fallback from getUserIDFromContext is risky: a caller that passes a bare context.Context is authorized as admin instead of being rejected. It also goes through GetSessionDetail, which loads messages/delegates even though the check only needs ownership/existence. Please take the resolved principal explicitly here (or gate the fallback on auth-disabled mode) and query only session metadata.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/agent/api.go` around lines 847 - 849, AuthorizeSessionAccess
currently calls getUserIDFromContext (which may fallback to "admin") and then
GetSessionDetail (which loads full messages/delegates); change it to explicitly
resolve the principal from ctx without the admin fallback (or only allow the
fallback when auth is globally disabled) and perform a lightweight metadata-only
session existence/ownership check instead of calling GetSessionDetail—use the
session metadata accessor (or add one) to verify session existence and owner
matches the resolved user ID before returning.
| if mgrValue, ok := a.sessions.Load(sessionID); ok { | ||
| if mgr, ok := mgrValue.(*SessionManager); ok { | ||
| sess := mgr.GetSession() | ||
| return &StreamResponse{ | ||
| Messages: mgr.GetMessages(), | ||
| Session: &sess, | ||
| SessionState: &SessionState{ | ||
| SessionID: sessionID, | ||
| Working: mgr.IsWorking(), | ||
| HasPendingPrompt: mgr.HasPendingPrompt(), | ||
| Model: mgr.GetModel(), | ||
| TotalCost: mgr.GetTotalCost(), | ||
| }, | ||
| Delegates: mgr.GetDelegates(), | ||
| }, nil |
There was a problem hiding this comment.
Build the active-session snapshot atomically.
This response is assembled from several independent SessionManager reads (GetMessages, GetSession, IsWorking, HasPendingPrompt, GetModel, GetTotalCost, GetDelegates). Concurrent updates can interleave between those calls, so polling/proxy consumers may observe a torn snapshot. Please funnel this through a single locked snapshot helper instead of composing it field-by-field.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/agent/api.go` around lines 855 - 869, The current StreamResponse
assembly reads multiple SessionManager accessors (GetMessages, GetSession,
IsWorking, HasPendingPrompt, GetModel, GetTotalCost, GetDelegates) separately
which can observe torn state; add a single locked snapshot helper on
SessionManager (e.g., Snapshot() or GetLockedState()) that acquires the
manager's mutex and returns a struct containing Messages, Session, Working,
HasPendingPrompt, Model, TotalCost, Delegates atomically, then replace the
field-by-field reads in the handler (where sessions.Load and the
StreamResponse/SessionState are built) with a single call to that snapshot and
map its fields into StreamResponse/SessionState.
| sess, err := a.store.GetSession(ctx, sessionID) | ||
| if err != nil || sess == nil { | ||
| return nil, ErrSessionNotFound | ||
| } | ||
|
|
||
| messages, err := a.store.GetMessages(ctx, sessionID) | ||
| if err != nil { | ||
| a.logger.Error("Failed to get messages from store", "error", err) | ||
| messages = []Message{} | ||
| } |
There was a problem hiding this comment.
Propagate store failures instead of faking an empty or missing session.
On the store path, GetSession errors are collapsed into ErrSessionNotFound, and GetMessages errors still return a successful snapshot with []Message{}. For snapshot/polling consumers, that makes transient backend failures look like “session disappeared” or “history is empty”, which can clobber good client state. Return the storage error so the caller can retry.
Possible fix
sess, err := a.store.GetSession(ctx, sessionID)
- if err != nil || sess == nil {
- return nil, ErrSessionNotFound
- }
+ if err != nil {
+ return nil, err
+ }
+ if sess == nil {
+ return nil, ErrSessionNotFound
+ }
messages, err := a.store.GetMessages(ctx, sessionID)
if err != nil {
- a.logger.Error("Failed to get messages from store", "error", err)
- messages = []Message{}
+ return nil, err
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/agent/api.go` around lines 877 - 886, The code masks storage errors
by turning any a.store.GetSession error into ErrSessionNotFound and by
swallowing a.store.GetMessages errors (setting messages = []Message{}); change
both to propagate the original store error to the caller instead: return the
error from GetSession directly (do not map all errors to ErrSessionNotFound
unless sess == nil with nil error) and on GetMessages return the err instead of
substituting an empty slice (remove the a.logger-only swallow path). Update
callers to handle these storage errors/retries as needed; use the existing
symbols a.store.GetSession, a.store.GetMessages, ErrSessionNotFound, and
messages to locate and fix the logic.
| if err := result.session.writeControl(result.control); err != nil { | ||
| return | ||
| } | ||
| result.session.bootstrapTopics(r.Context(), lastEventID, result.topics) | ||
| _ = result.session.Serve(r.Context()) |
There was a problem hiding this comment.
Serialize snapshot bootstrap with the subscription change.
Both paths make the new subscription state observable before bootstrapTopics runs. A concurrent remove can land in that gap, and the snapshot path still enqueues to the session, so one stale event can leak after unsubscribe. Move bootstrap into the same session-scoped mutation critical section, or make snapshot enqueue refuse topics that are no longer attached.
Also applies to: 108-112
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/sse/multiplex_handler.go` around lines 66 - 70, The
bootstrapTopics call is currently executed after writeControl/Serve which makes
the new subscription state visible before the snapshot enqueue and allows a race
where a concurrent unsubscribe removes topics but the snapshot still enqueues
stale events; to fix, serialize snapshot bootstrap with the session-scoped
mutation by moving the call to session.bootstrapTopics into the same critical
section where result.session.writeControl (and any session subscription state
mutations) occur so the enqueue happens atomically with the subscription change,
or alternatively add a check in the snapshot-enqueue path to drop topics that
are no longer attached (i.e., make the session's snapshot enqueue/append reject
topics absent from the current subscription set in Session). Ensure you update
both code paths that call bootstrapTopics (the one using
result.session.writeControl/result.session.Serve and the other mirror path) and
reference the Session methods writeControl, bootstrapTopics, Serve and the
session snapshot enqueue/append logic when making the change.
| func parseLastEventID(r *http.Request) (uint64, error) { | ||
| raw := strings.TrimSpace(r.URL.Query().Get("lastEventId")) | ||
| if raw == "" { | ||
| raw = strings.TrimSpace(r.Header.Get("Last-Event-ID")) | ||
| } | ||
| if raw == "" { | ||
| return 0, nil | ||
| } | ||
|
|
||
| var lastEventID uint64 | ||
| if _, err := fmt.Sscanf(raw, "%d", &lastEventID); err != nil { | ||
| return 0, err | ||
| } | ||
| return lastEventID, nil |
There was a problem hiding this comment.
Forward Last-Event-ID to remote streams.
The local path accepts Last-Event-ID, but the proxied request only forwards query params. Any reconnect that relies on the header will be treated as a fresh remote stream.
Suggested fix
req.Header.Set("Accept", "text/event-stream")
+if v := strings.TrimSpace(r.Header.Get("Last-Event-ID")); v != "" {
+ req.Header.Set("Last-Event-ID", v)
+}
node.ApplyAuth(req)Also applies to: 139-145
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/sse/multiplex_handler.go` around lines 117 - 130,
The proxy only forwards query params so clients sending Last-Event-ID via header
are treated as new streams; update the multiplex handler code that constructs
the proxied HTTP requests to copy the incoming request's "Last-Event-ID" header
into the outbound request's headers (e.g., when you create the outgoing
*http.Request* for the remote stream). Ensure this header copy is done in both
places noted (the place that builds the proxied request around parseLastEventID
and the second occurrence referenced in the review), and keep using
parseLastEventID as-is to read either query or header on the local side.
| case TopicTypeDAGRuns, TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree: | ||
| identifier = strings.TrimPrefix(identifier, "?") | ||
| return sanitizeQueryForTopic(identifier), nil | ||
| case TopicTypeDAGRunLogs: | ||
| pathPart, queryPart, hasQuery := strings.Cut(identifier, "?") | ||
| pathPart = strings.TrimSpace(pathPart) | ||
| if pathPart == "" { | ||
| return "", fmt.Errorf("topic %q requires an identifier", topicType) | ||
| } | ||
| if !hasQuery { | ||
| return pathPart, nil | ||
| } | ||
| queryPart = sanitizeQueryForTopic(queryPart) | ||
| if queryPart == "" { | ||
| return pathPart, nil | ||
| } | ||
| return pathPart + "?" + queryPart, nil |
There was a problem hiding this comment.
Don't turn malformed queries into broad subscriptions.
These branches treat sanitizeQueryForTopic(...) == "" as success, but that helper also returns "" when query parsing fails. A topic like dagruns:%ZZ therefore canonicalizes to dagruns: and widens the subscription instead of failing. That breaks the new atomic-mutation contract and can expose a broader dataset than the caller asked for. Please surface parse errors here and add a regression test for malformed query strings.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/sse/topic_parse.go` around lines 52 - 68, The
current branches for
TopicTypeDAGRuns/TopicTypeQueues/TopicTypeDAGsList/TopicTypeDocTree and
TopicTypeDAGRunLogs treat sanitizeQueryForTopic(...) == "" as success but
sanitizeQueryForTopic also returns "" on parse failure, which widens
subscriptions; change sanitizeQueryForTopic to return (string, error) or add a
new parseAndSanitizeQuery(identifier string) (string, error) helper that
distinguishes an empty-but-valid query from a parse error, then update the
branches in topic_parse.go (the cases handling TopicTypeDAGRuns,
TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree and TopicTypeDAGRunLogs) to
call the new helper and return/propagate an error when parsing fails instead of
accepting "" as success; add a regression test that asserts malformed queries
like "%ZZ" cause the topic parsing to return an error rather than canonicalizing
to a broader subscription.
| // SetLegacyStreamDeprecationHeaders marks a legacy SSE endpoint as deprecated. | ||
| func SetLegacyStreamDeprecationHeaders(w http.ResponseWriter) { | ||
| w.Header().Set("Deprecation", "true") | ||
| w.Header().Set("Sunset", legacyStreamSunset) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -name "types.go" -path "*/service/frontend/sse/*" -type fRepository: dagu-org/dagu
Length of output: 1877
🌐 Web query:
RFC 9745 Deprecation header format specifications
💡 Result:
RFC 9745 defines the Deprecation HTTP response header field as:
- Structured Field type: Item (Structured Headers) whose value MUST be a Structured Field Date (RFC 9651 §3.3.7). In HTTP text form, that date is serialized as an
@followed by an integer number of seconds delta from 1970-01-01T00:00:00Z (Unix-epoch style), e.g.Deprecation:1688169599``. The date may be in the past (already deprecated) or future (will be deprecated then). [1][2] - Example (from RFC 9745):
[1]
Deprecation: `@1688169599`
Related note (often confused with Deprecation):
Sunsetheader (RFC 8594) uses a different date format: its value is an HTTP-date (IMF-fixdate / RFC 7231 style), e.g.Sunset: Sun, 30 Jun 2024 23:59:59 UTCin RFC 9745’s example, and RFC 9745 explicitly calls out this historical mismatch. [1][3]
Sources: RFC 9745 (Deprecation) [1], RFC 9651 (Structured Field Date) [2], RFC 8594 (Sunset / HTTP-date) [3].
🏁 Script executed:
# Get the actual file content to verify current implementation
if [ -f "internal/service/frontend/sse/types.go" ]; then
wc -l internal/service/frontend/sse/types.go
fiRepository: dagu-org/dagu
Length of output: 1877
Use RFC 9745–compliant format for the Deprecation header.
RFC 9745 specifies that Deprecation must be a Structured Field Date (format: @ followed by Unix timestamp). The current value "true" is not standards-compliant and clients may ignore or mispars it. Use a timestamp representing the deprecation instant instead.
Suggested fix
const (
+ legacyStreamDeprecatedAt = "@1773360000" // 2026-03-13T00:00:00Z
legacyStreamSunset = "Sat, 13 Mar 2027 00:00:00 GMT"
)
// SetLegacyStreamDeprecationHeaders marks a legacy SSE endpoint as deprecated.
func SetLegacyStreamDeprecationHeaders(w http.ResponseWriter) {
- w.Header().Set("Deprecation", "true")
+ w.Header().Set("Deprecation", legacyStreamDeprecatedAt)
w.Header().Set("Sunset", legacyStreamSunset)
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/service/frontend/sse/types.go` around lines 69 - 72, The Deprecation
header in SetLegacyStreamDeprecationHeaders is not RFC 9745 compliant (currently
"true"); replace it with a Structured Field Date value of the form
"@<unix-seconds>" instead. In SetLegacyStreamDeprecationHeaders, compute the
deprecation instant as a Unix timestamp (e.g., the deprecation moment or
time.Now().Unix() / or derive from legacyStreamSunset if that represents the
instant), then set w.Header().Set("Deprecation", fmt.Sprintf("@%d", ts)) while
keeping the existing Sunset header write for legacyStreamSunset.
| const sseStatus = useSSEConnection( | ||
| sessionId, | ||
| openDelegateSessionIds, | ||
| apiURL, | ||
| remoteNode, | ||
| { | ||
| onSnapshot: (snapshot) => { | ||
| applySessionSnapshot(snapshot); | ||
| }, | ||
| onDelegateSnapshot: (delegateId, snapshot) => { | ||
| applyDelegateSnapshot(delegateId, snapshot); | ||
| }, | ||
| onNavigate: (path) => navigate(path), | ||
| onPreConnect: resetDelegates, | ||
| } | ||
| ); |
There was a problem hiding this comment.
onPreConnect wipes delegates that selectSession() just restored.
This hook always passes resetDelegates into useSSEConnection(). On a session switch, selectSession() already applies the fetched snapshot with restoreDelegates(), so the effect clears that state again on the next commit. Unless the stream immediately replays the full snapshot, the delegate list disappears until the next poll/update. Please move that reset to the paths that do not preload a snapshot (for example, explicit clear / brand-new session startup).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ui/src/features/agent/hooks/useAgentChat.ts` around lines 260 - 275, The
onPreConnect currently calls resetDelegates (passed into useSSEConnection) which
wipes delegates that selectSession()/restoreDelegates just restored; remove the
unconditional resetDelegates from the onPreConnect arg and instead call
resetDelegates only from explicit paths that clear or start a brand-new session
(e.g., the handler that clears session or the new-session startup flow), or wrap
onPreConnect with a conditional that checks a "delegatesPreloaded" /
"hasRestoredDelegates" flag (set by restoreDelegates/selectSession) and only
runs resetDelegates when that flag is false; update useSSEConnection invocation
to use the new conditional wrapper or remove the call and place resetDelegates
calls into the explicit clear/start handlers.
| let active = true; | ||
| const pollFallbackSnapshots = async () => { | ||
| if (shouldPollSession && sessionId) { | ||
| try { | ||
| const snapshot = await fetchSessionDetail(sessionId); | ||
| if (!active) { | ||
| return; | ||
| } | ||
| applySessionSnapshot(snapshot); | ||
| } catch { | ||
| // Best effort — the next poll or SSE recovery can still heal state. | ||
| } | ||
| } | ||
|
|
||
| for (const delegateId of offlineDelegateSessionIds) { | ||
| try { | ||
| const snapshot = await fetchSessionDetail(delegateId); | ||
| if (!active) { | ||
| return; | ||
| } | ||
| applyDelegateSnapshot(delegateId, snapshot); | ||
| } catch { | ||
| // Best effort — live SSE recovery or the next poll can still heal state. | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| const timer = setInterval(() => { | ||
| void pollFallbackSnapshots(); | ||
| }, FALLBACK_POLL_INTERVAL_MS); |
There was a problem hiding this comment.
Fallback polls can overlap and apply snapshots out of order.
setInterval() keeps firing even if the previous pollFallbackSnapshots() round is still awaiting network I/O. If one round takes longer than 2s, a slower response can overwrite newer session/delegate state while also doubling backend load. Add an in-flight guard or switch to recursive setTimeout() so only one poll round runs at a time.
🔁 Suggested guard
let active = true;
+ let pollInFlight = false;
+
const pollFallbackSnapshots = async () => {
+ if (pollInFlight) {
+ return;
+ }
+ pollInFlight = true;
+ try {
if (shouldPollSession && sessionId) {
try {
const snapshot = await fetchSessionDetail(sessionId);
@@
} catch {
// Best effort — live SSE recovery or the next poll can still heal state.
}
}
+ } finally {
+ pollInFlight = false;
+ }
};🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ui/src/features/agent/hooks/useAgentChat.ts` around lines 291 - 320,
pollFallbackSnapshots can overlap because setInterval fires regardless of
in-flight work; add a single-run guard so only one poll runs at a time and
ensure timers are cancelled on cleanup. Implement either: (a) an inFlight
boolean (e.g., isPolling) checked at the top of pollFallbackSnapshots, set true
before doing fetchSessionDetail/fetchSessionDetail for delegateIds and set false
in a finally block so applySessionSnapshot/applyDelegateSnapshot are only called
from the active runner, or (b) replace setInterval(timer) with a recursive
setTimeout that schedules the next run only after the current run completes; in
both cases ensure you clear the timer and set active = false on unmount/cleanup
so no stray polls run.
| topicEntry.subscribers.set(subscriberId, callbacks); | ||
| callbacks.onStateChange(conn.state); | ||
| if (topicEntry.lastPayload !== null) { | ||
| callbacks.onData(topicEntry.lastPayload); | ||
| } | ||
|
|
||
| if (isNewTopic) { | ||
| this.handleTopicAdded(conn, topic); | ||
| } else { | ||
| this.ensureConnected(conn); |
There was a problem hiding this comment.
Fresh topics are marked live before the server subscribes them.
Line 217 replays the shared connection state to a brand-new topic before handleTopicAdded() has moved it from pendingAdd into serverTopics. For delegate panes, useSSEConnection treats that as “live” and disables fallback polling even though the topic is still pending or may later be rejected. Please emit topic-level state here (serverTopics.has(topic) / pending add) and only flip the topic to connected after the control or mutation ack arrives.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ui/src/hooks/SSEManager.ts` around lines 216 - 225, The code replays the
connection state to a new topic immediately (topicEntry.subscribers.set(...);
callbacks.onStateChange(conn.state)) before the topic is moved from pendingAdd
into serverTopics, causing delegate panes to treat it as live; change this so
you emit topic-level state based on whether serverTopics.has(topic) or the
pendingAdd flag (i.e., report a pending/not-connected state instead of
conn.state for fresh topics) and defer flipping the topic to connected until
handleTopicAdded confirms the server subscription (or until the control/mutation
ack arrives); update the logic around topicEntry, callbacks.onStateChange,
handleTopicAdded and ensureConnected so handleTopicAdded performs the final
callbacks.onStateChange(connected) when the topic is actually moved into
serverTopics.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1768 +/- ##
==========================================
- Coverage 67.99% 66.81% -1.19%
==========================================
Files 402 402
Lines 45547 45808 +261
==========================================
- Hits 30969 30605 -364
- Misses 11379 11556 +177
- Partials 3199 3647 +448
... and 12 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary
Testing
Notes
Summary by CodeRabbit
Release Notes
New Features
Deprecations