Skip to content

Stabilize multiplexed SSE connection management#1768

Merged
yohamta0 merged 5 commits into
mainfrom
sse-hardening
Mar 14, 2026
Merged

Stabilize multiplexed SSE connection management#1768
yohamta0 merged 5 commits into
mainfrom
sse-hardening

Conversation

@yohamta0

@yohamta0 yohamta0 commented Mar 13, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • harden multiplexed SSE session mutation so topic updates are atomic and conflicting mutations fail cleanly
  • add agent-session fallback polling plus focused hook coverage and broaden legacy endpoint-to-topic mapping tests
  • refine internal SSE boundaries and remote agent topic proxying to keep the implementation lint-clean and easier to reason about

Testing

  • make fmt
  • go test ./internal/service/frontend/... ./internal/agent ./internal/cmn/config
  • pnpm -s exec vitest --run --passWithNoTests

Notes

  • ui typecheck still has unrelated pre-existing docs-page errors outside this branch's scope

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced multiplexed Server-Sent Events (SSE) streaming with support for per-topic subscriptions and authorization, enabling more efficient real-time updates.
    • Added agent session streaming as a new topic type for better session monitoring.
    • Implemented session authorization validation for improved security.
  • Deprecations

    • Marked legacy SSE streaming endpoint as deprecated; users should migrate to the new multiplexed streaming approach.

@coderabbitai

coderabbitai Bot commented Mar 13, 2026

Copy link
Copy Markdown

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2b6af7ed-ea13-4689-9b6b-acba9eee8c1f

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Use the checkbox below for a quick retry:

  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

This PR introduces an SSE multiplexing system with topic-based subscriptions for managing multiple concurrent SSE streams per session. Backend changes add SSE configuration, a multiplexer component for session/topic lifecycle management, and new agent API methods for authorization and snapshots. Frontend changes refactor SSE subscription logic to use topics and add fallback polling when SSE is offline, plus delegate session polling support.

Changes

Cohort / File(s) Summary
Agent API Extensions
internal/agent/api.go
Added AuthorizeSessionAccess to verify session access and GetSessionSnapshot to return full session state with delegates, supporting both in-memory and persistent store retrieval.
SSE Configuration System
internal/cmn/config/config.go, internal/cmn/config/definition.go, internal/cmn/config/loader.go, internal/cmn/config/loader_test.go
Introduced SSEConfig type with fields for multiplexed streaming (MaxTopicsPerConnection, MaxClients, HeartbeatInterval, WriteBufferSize, SlowClientTimeout), extended Server config, added validation, and integrated environment variable binding for SSE settings.
SSE Metrics
internal/service/frontend/sse/metrics.go
Added new metrics fields (multiplexSessionsActive, topicsPerSession, topicMutations, backpressureDisconnects, unknownSessionMutations) with public accessor methods for tracking multiplexed session and topic behavior.
SSE Multiplexing Core
internal/service/frontend/sse/multiplex.go, internal/service/frontend/sse/topic_parse.go
Implemented comprehensive Multiplexer with per-session and per-topic management, topic authorization/fetchers registration, mutation conflict detection, backpressure handling, heartbeat streaming, and topic parsing with canonicalization (ParsedTopic, ParseTopic function).
SSE Handler & Routing
internal/service/frontend/sse/handler.go, internal/service/frontend/sse/types.go, internal/service/frontend/sse/proxy.go, internal/service/frontend/sse/proxy_test.go
Added SetLegacyStreamDeprecationHeaders to mark legacy endpoints, introduced TopicTypeAgent constant, extended path resolver for agent topics, and updated handler to set deprecation headers. Includes test coverage for agent topic URL building.
SSE Multiplexed Handler
internal/service/frontend/sse/multiplex_handler.go
Introduced MultiplexHandler coordinating in-process multiplexer and remote-node proxy, with HandleStream and HandleTopicMutation methods supporting local session creation, topic mutations, and remote delegation with structured error responses.
Server SSE Wiring
internal/service/frontend/server.go
Refactored registerSSEFetchers to use an interface-based registrar pattern, added Multiplexer instantiation and registration, implemented registerMultiplexSSETopics for agent-related topics, and added lifecycle management for multiplexer shutdown.
Frontend SSE Manager
ui/src/hooks/SSEManager.ts, ui/src/hooks/__tests__/SSEManager.test.ts
Replaced endpoint-centric design with topic-based subscriptions, added per-topic payload caching and state tracking, implemented topic mutation scheduling, integrated authentication-aware URL construction (getAuthToken/getAuthHeaders), and added topic routing helpers (endpointToTopic, buildAgentSessionTopic). Tests validate topic mapping and agent session topic building.
Frontend SSE Hook
ui/src/hooks/useSSE.ts
Enhanced error handling with try-catch around SSE subscription, explicit unsubscribe management, and improved cleanup on effect unmount.
Frontend Agent Chat Hooks
ui/src/features/agent/hooks/useAgentChat.ts, ui/src/features/agent/hooks/useDelegateManager.ts, ui/src/features/agent/hooks/useSSEConnection.ts
Refactored to support SSE snapshot-driven updates, added polling fallback (FALLBACK_POLL_INTERVAL_MS) for offline sessions/delegates, integrated multi-delegate management (reopenDelegate, removeDelegate), replaced event-specific callbacks with onSnapshot/onDelegateSnapshot, and added isSessionLive tracking. Returns AgentSSEStatus with connection liveness.
Frontend Agent Chat Tests
ui/src/features/agent/hooks/__tests__/useAgentChat.test.tsx, ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx
Added test suites covering fallback polling when SSE is offline and reconnects, delegate pane polling, SSE liveness tracking, and UI action deduplication.
SSE Multiplexing Tests
internal/service/frontend/sse/multiplex_test.go
Comprehensive tests for topic parsing, session creation with authorization filtering, topic mutation with partial authorization and conflict detection, topic registry sharing, and remote event URL construction with parameter stripping.
Removed Tests
ui/src/lib/__tests__/parseParams.test.ts
Deleted test suite for parameter parsing and stringification utilities.

Sequence Diagrams

sequenceDiagram
    participant Client as Client App
    participant SSEMgr as SSEManager
    participant Conn as SSE Connection
    participant Server as Multiplexer
    participant Topic as Topic Handler

    Client->>SSEMgr: subscribeTopic(sessionId)
    SSEMgr->>SSEMgr: getOrCreateConnection(apiURL)
    SSEMgr->>Conn: create/reuse
    SSEMgr->>Server: POST /events/stream<br/>(topics=[agent:sessionId])
    Server->>Server: createSession()<br/>register subscriber
    Server->>Conn: send control event<br/>(sessionId, initial topics)
    Conn->>SSEMgr: onData(controlEvent)
    SSEMgr->>SSEMgr: updateState(connected)
    SSEMgr->>Client: onSnapshot(sessionState)
    
    rect rgba(100, 150, 200, 0.5)
        Note over Client,Topic: Streaming Phase
        Server->>Topic: poll payload
        Topic->>Server: return data
        Server->>Conn: send data frame
        Conn->>SSEMgr: onData(dataEvent)
        SSEMgr->>Client: onSnapshot(updated)
    end
Loading
sequenceDiagram
    participant Client as Client App
    participant SSEMgr as SSEManager
    participant Fallback as Polling Fallback
    participant API as Agent API
    
    Client->>SSEMgr: subscribeTopic()
    SSEMgr->>SSEMgr: connection failed
    SSEMgr->>Client: onStateChange(fallback:true)
    
    rect rgba(200, 100, 100, 0.5)
        Note over Client,API: Fallback Polling
        Fallback->>Fallback: FALLBACK_POLL_INTERVAL_MS
        Fallback->>API: GetSessionSnapshot()
        API->>Fallback: StreamResponse
        Fallback->>Client: onSnapshot(snapshot)
    end
    
    rect rgba(100, 200, 100, 0.5)
        Note over Client,SSEMgr: SSE Reconnect
        SSEMgr->>SSEMgr: connection restored
        SSEMgr->>Client: onStateChange(live:true)
        Fallback->>Fallback: halt polling
    end
Loading
sequenceDiagram
    participant Client as Client
    participant Mux as Multiplexer
    participant TopicReg as Topic Registry
    participant Auth as Authorizer
    
    Client->>Mux: mutateSession(sessionId,<br/>add=[topic1, topic2])
    Mux->>Mux: parseTopicList()
    Mux->>Mux: resolveTopicsForMutation()
    
    loop for each topic
        Mux->>Auth: authorize(topic)
        alt authorized
            Mux->>TopicReg: getOrCreateTopic()
        else unauthorized
            Mux->>Mux: record error
        end
    end
    
    alt conflict detected
        Mux->>Client: error(ErrConflictingMutation)
    else success
        Mux->>Mux: applyMutation(session)
        Mux->>Client: mutationResult(added,removed,errors)
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • #1689: Directly extends agent session APIs (Delegates, StreamResponse) and snapshot handling that this PR builds upon for multiplexing.
  • #1714: Overlaps on SSE subsystem changes including server routing, handler refactoring, and proxy logic modifications.
  • #1719: Covers client-side SSE synchronization and useSSEConnection flow changes that interact with the new topic-based subscription model.
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 12.28% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Stabilize multiplexed SSE connection management' directly and clearly describes the primary objective of the changeset, which involves hardening and stabilizing multiplexed SSE session mutation, fallback polling, and internal boundaries across the codebase.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch sse-hardening
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 13

🧹 Nitpick comments (1)
internal/cmn/config/config.go (1)

462-476: Validation allows zero values for all SSE fields.

The validation only checks for negative values (< 0). Zero values are accepted, which means:

  • MaxTopicsPerConnection = 0 → no topics per connection
  • MaxClients = 0 → no clients allowed
  • HeartbeatInterval = 0 → no heartbeats

This may be intentional (defaults are set in loader.go), but consider whether zero should be invalid for fields like HeartbeatInterval where disabling heartbeats could cause connection issues.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/cmn/config/config.go` around lines 462 - 476, Validation currently
allows zero for all SSE fields; update the checks so HeartbeatInterval cannot be
zero by changing the guard on c.Server.SSE.HeartbeatInterval to reject <= 0 and
return a clear error (e.g., "sse.heartbeat_interval must be > 0"); decide
whether to treat c.Server.SSE.MaxTopicsPerConnection and c.Server.SSE.MaxClients
as > 0 as well (if so, change their checks from < 0 to <= 0 and adjust error
messages). Ensure you reference the defaulting behavior in loader.go when making
this change so you don't break intended defaults.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/agent/api.go`:
- Around line 855-869: The current StreamResponse assembly reads multiple
SessionManager accessors (GetMessages, GetSession, IsWorking, HasPendingPrompt,
GetModel, GetTotalCost, GetDelegates) separately which can observe torn state;
add a single locked snapshot helper on SessionManager (e.g., Snapshot() or
GetLockedState()) that acquires the manager's mutex and returns a struct
containing Messages, Session, Working, HasPendingPrompt, Model, TotalCost,
Delegates atomically, then replace the field-by-field reads in the handler
(where sessions.Load and the StreamResponse/SessionState are built) with a
single call to that snapshot and map its fields into
StreamResponse/SessionState.
- Around line 847-849: AuthorizeSessionAccess currently calls
getUserIDFromContext (which may fallback to "admin") and then GetSessionDetail
(which loads full messages/delegates); change it to explicitly resolve the
principal from ctx without the admin fallback (or only allow the fallback when
auth is globally disabled) and perform a lightweight metadata-only session
existence/ownership check instead of calling GetSessionDetail—use the session
metadata accessor (or add one) to verify session existence and owner matches the
resolved user ID before returning.
- Around line 877-886: The code masks storage errors by turning any
a.store.GetSession error into ErrSessionNotFound and by swallowing
a.store.GetMessages errors (setting messages = []Message{}); change both to
propagate the original store error to the caller instead: return the error from
GetSession directly (do not map all errors to ErrSessionNotFound unless sess ==
nil with nil error) and on GetMessages return the err instead of substituting an
empty slice (remove the a.logger-only swallow path). Update callers to handle
these storage errors/retries as needed; use the existing symbols
a.store.GetSession, a.store.GetMessages, ErrSessionNotFound, and messages to
locate and fix the logic.

In `@internal/service/frontend/sse/multiplex_handler.go`:
- Around line 66-70: The bootstrapTopics call is currently executed after
writeControl/Serve which makes the new subscription state visible before the
snapshot enqueue and allows a race where a concurrent unsubscribe removes topics
but the snapshot still enqueues stale events; to fix, serialize snapshot
bootstrap with the session-scoped mutation by moving the call to
session.bootstrapTopics into the same critical section where
result.session.writeControl (and any session subscription state mutations) occur
so the enqueue happens atomically with the subscription change, or alternatively
add a check in the snapshot-enqueue path to drop topics that are no longer
attached (i.e., make the session's snapshot enqueue/append reject topics absent
from the current subscription set in Session). Ensure you update both code paths
that call bootstrapTopics (the one using
result.session.writeControl/result.session.Serve and the other mirror path) and
reference the Session methods writeControl, bootstrapTopics, Serve and the
session snapshot enqueue/append logic when making the change.
- Around line 117-130: The proxy only forwards query params so clients sending
Last-Event-ID via header are treated as new streams; update the multiplex
handler code that constructs the proxied HTTP requests to copy the incoming
request's "Last-Event-ID" header into the outbound request's headers (e.g., when
you create the outgoing *http.Request* for the remote stream). Ensure this
header copy is done in both places noted (the place that builds the proxied
request around parseLastEventID and the second occurrence referenced in the
review), and keep using parseLastEventID as-is to read either query or header on
the local side.

In `@internal/service/frontend/sse/multiplex.go`:
- Around line 411-427: getOrCreateTopicForMutation can return a topic that's
being retired by unsubscribeTopic, causing stop() to close stopCh on a
still-in-use object; fix by making topic lifecycle changes atomic: add a
lifecycle flag on multiplexTopic (e.g., stopped/shuttingDown) and update
getOrCreateTopicForMutation to check that flag under m.mu and treat a stopped
topic as non-existent (create a new one instead); change unsubscribeTopic to
mark the topic as shuttingDown and remove it from m.topics while holding m.mu,
then release the lock and call topic.stop() (closing stopCh) once — this ensures
lookups never get a topic that is mid-retire and prevents closing stopCh on a
live topic (refer to getOrCreateTopicForMutation, unsubscribeTopic,
multiplexTopic.sessionCount, multiplexTopic.stop, stopCh, and m.topics).
- Around line 256-367: The mutation logic in applyMutation must be serialized
with session teardown to avoid races; add a session-scoped mutex (e.g.,
mutationMu on streamSession) and acquire it at the start of applyMutation and in
removeSession (and the other mutation path noted at 457-476) to serialize checks
and membership updates (currentSet/finalCount, calls to
addTopic/removeTopic/removeSession). Hold the lock while re-checking
session.isClosed and while performing unsubscribeTopic/addTopic/topic.addSession
and any adjustments to topic membership, using defer to unlock; keep
resolveTopicsForMutation outside the lock but ensure createdTopics cleanup and
the final membership changes happen under the session lock so no concurrent
mutation or teardown can violate maxTopicsPerConn or leave topics attached to
closed sessions.

In `@internal/service/frontend/sse/proxy.go`:
- Around line 117-118: The TopicTypeAgent branch currently concatenates the raw
identifier into the path (case TopicTypeAgent -> return "/agent/sessions/" +
identifier + "/stream"), which allows reserved characters to alter path/query;
change this to treat the identifier as a single path segment by applying
url.PathEscape(identifier) when building the string (and add the net/url
import), so identifiers like "a/b" or "sess-1?x=y" are percent-escaped rather
than injected into the path or query.

In `@internal/service/frontend/sse/topic_parse.go`:
- Around line 52-68: The current branches for
TopicTypeDAGRuns/TopicTypeQueues/TopicTypeDAGsList/TopicTypeDocTree and
TopicTypeDAGRunLogs treat sanitizeQueryForTopic(...) == "" as success but
sanitizeQueryForTopic also returns "" on parse failure, which widens
subscriptions; change sanitizeQueryForTopic to return (string, error) or add a
new parseAndSanitizeQuery(identifier string) (string, error) helper that
distinguishes an empty-but-valid query from a parse error, then update the
branches in topic_parse.go (the cases handling TopicTypeDAGRuns,
TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree and TopicTypeDAGRunLogs) to
call the new helper and return/propagate an error when parsing fails instead of
accepting "" as success; add a regression test that asserts malformed queries
like "%ZZ" cause the topic parsing to return an error rather than canonicalizing
to a broader subscription.

In `@internal/service/frontend/sse/types.go`:
- Around line 69-72: The Deprecation header in SetLegacyStreamDeprecationHeaders
is not RFC 9745 compliant (currently "true"); replace it with a Structured Field
Date value of the form "@<unix-seconds>" instead. In
SetLegacyStreamDeprecationHeaders, compute the deprecation instant as a Unix
timestamp (e.g., the deprecation moment or time.Now().Unix() / or derive from
legacyStreamSunset if that represents the instant), then set
w.Header().Set("Deprecation", fmt.Sprintf("@%d", ts)) while keeping the existing
Sunset header write for legacyStreamSunset.

In `@ui/src/features/agent/hooks/useAgentChat.ts`:
- Around line 260-275: The onPreConnect currently calls resetDelegates (passed
into useSSEConnection) which wipes delegates that
selectSession()/restoreDelegates just restored; remove the unconditional
resetDelegates from the onPreConnect arg and instead call resetDelegates only
from explicit paths that clear or start a brand-new session (e.g., the handler
that clears session or the new-session startup flow), or wrap onPreConnect with
a conditional that checks a "delegatesPreloaded" / "hasRestoredDelegates" flag
(set by restoreDelegates/selectSession) and only runs resetDelegates when that
flag is false; update useSSEConnection invocation to use the new conditional
wrapper or remove the call and place resetDelegates calls into the explicit
clear/start handlers.
- Around line 291-320: pollFallbackSnapshots can overlap because setInterval
fires regardless of in-flight work; add a single-run guard so only one poll runs
at a time and ensure timers are cancelled on cleanup. Implement either: (a) an
inFlight boolean (e.g., isPolling) checked at the top of pollFallbackSnapshots,
set true before doing fetchSessionDetail/fetchSessionDetail for delegateIds and
set false in a finally block so applySessionSnapshot/applyDelegateSnapshot are
only called from the active runner, or (b) replace setInterval(timer) with a
recursive setTimeout that schedules the next run only after the current run
completes; in both cases ensure you clear the timer and set active = false on
unmount/cleanup so no stray polls run.

In `@ui/src/hooks/SSEManager.ts`:
- Around line 216-225: The code replays the connection state to a new topic
immediately (topicEntry.subscribers.set(...);
callbacks.onStateChange(conn.state)) before the topic is moved from pendingAdd
into serverTopics, causing delegate panes to treat it as live; change this so
you emit topic-level state based on whether serverTopics.has(topic) or the
pendingAdd flag (i.e., report a pending/not-connected state instead of
conn.state for fresh topics) and defer flipping the topic to connected until
handleTopicAdded confirms the server subscription (or until the control/mutation
ack arrives); update the logic around topicEntry, callbacks.onStateChange,
handleTopicAdded and ensureConnected so handleTopicAdded performs the final
callbacks.onStateChange(connected) when the topic is actually moved into
serverTopics.

---

Nitpick comments:
In `@internal/cmn/config/config.go`:
- Around line 462-476: Validation currently allows zero for all SSE fields;
update the checks so HeartbeatInterval cannot be zero by changing the guard on
c.Server.SSE.HeartbeatInterval to reject <= 0 and return a clear error (e.g.,
"sse.heartbeat_interval must be > 0"); decide whether to treat
c.Server.SSE.MaxTopicsPerConnection and c.Server.SSE.MaxClients as > 0 as well
(if so, change their checks from < 0 to <= 0 and adjust error messages). Ensure
you reference the defaulting behavior in loader.go when making this change so
you don't break intended defaults.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26ba3f8e-8057-40c7-bb7d-032073d9afac

📥 Commits

Reviewing files that changed from the base of the PR and between f4b7282 and 16796d6.

📒 Files selected for processing (24)
  • internal/agent/api.go
  • internal/cmn/config/config.go
  • internal/cmn/config/definition.go
  • internal/cmn/config/loader.go
  • internal/cmn/config/loader_test.go
  • internal/service/frontend/server.go
  • internal/service/frontend/sse/handler.go
  • internal/service/frontend/sse/metrics.go
  • internal/service/frontend/sse/multiplex.go
  • internal/service/frontend/sse/multiplex_handler.go
  • internal/service/frontend/sse/multiplex_test.go
  • internal/service/frontend/sse/proxy.go
  • internal/service/frontend/sse/proxy_test.go
  • internal/service/frontend/sse/topic_parse.go
  • internal/service/frontend/sse/types.go
  • ui/src/features/agent/hooks/__tests__/useAgentChat.test.tsx
  • ui/src/features/agent/hooks/__tests__/useSSEConnection.test.tsx
  • ui/src/features/agent/hooks/useAgentChat.ts
  • ui/src/features/agent/hooks/useDelegateManager.ts
  • ui/src/features/agent/hooks/useSSEConnection.ts
  • ui/src/hooks/SSEManager.ts
  • ui/src/hooks/__tests__/SSEManager.test.ts
  • ui/src/hooks/useSSE.ts
  • ui/src/lib/__tests__/parseParams.test.ts
💤 Files with no reviewable changes (1)
  • ui/src/lib/tests/parseParams.test.ts

Comment thread internal/agent/api.go Outdated
Comment on lines +847 to +849
func (a *API) AuthorizeSessionAccess(ctx context.Context, sessionID string) error {
_, err := a.GetSessionDetail(ctx, sessionID, getUserIDFromContext(ctx))
return err

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fail closed in AuthorizeSessionAccess.

Because this method is now an exported authorization boundary, inheriting the unconditional "admin" fallback from getUserIDFromContext is risky: a caller that passes a bare context.Context is authorized as admin instead of being rejected. It also goes through GetSessionDetail, which loads messages/delegates even though the check only needs ownership/existence. Please take the resolved principal explicitly here (or gate the fallback on auth-disabled mode) and query only session metadata.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/agent/api.go` around lines 847 - 849, AuthorizeSessionAccess
currently calls getUserIDFromContext (which may fallback to "admin") and then
GetSessionDetail (which loads full messages/delegates); change it to explicitly
resolve the principal from ctx without the admin fallback (or only allow the
fallback when auth is globally disabled) and perform a lightweight metadata-only
session existence/ownership check instead of calling GetSessionDetail—use the
session metadata accessor (or add one) to verify session existence and owner
matches the resolved user ID before returning.

Comment thread internal/agent/api.go Outdated
Comment on lines +855 to +869
if mgrValue, ok := a.sessions.Load(sessionID); ok {
if mgr, ok := mgrValue.(*SessionManager); ok {
sess := mgr.GetSession()
return &StreamResponse{
Messages: mgr.GetMessages(),
Session: &sess,
SessionState: &SessionState{
SessionID: sessionID,
Working: mgr.IsWorking(),
HasPendingPrompt: mgr.HasPendingPrompt(),
Model: mgr.GetModel(),
TotalCost: mgr.GetTotalCost(),
},
Delegates: mgr.GetDelegates(),
}, nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Build the active-session snapshot atomically.

This response is assembled from several independent SessionManager reads (GetMessages, GetSession, IsWorking, HasPendingPrompt, GetModel, GetTotalCost, GetDelegates). Concurrent updates can interleave between those calls, so polling/proxy consumers may observe a torn snapshot. Please funnel this through a single locked snapshot helper instead of composing it field-by-field.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/agent/api.go` around lines 855 - 869, The current StreamResponse
assembly reads multiple SessionManager accessors (GetMessages, GetSession,
IsWorking, HasPendingPrompt, GetModel, GetTotalCost, GetDelegates) separately
which can observe torn state; add a single locked snapshot helper on
SessionManager (e.g., Snapshot() or GetLockedState()) that acquires the
manager's mutex and returns a struct containing Messages, Session, Working,
HasPendingPrompt, Model, TotalCost, Delegates atomically, then replace the
field-by-field reads in the handler (where sessions.Load and the
StreamResponse/SessionState are built) with a single call to that snapshot and
map its fields into StreamResponse/SessionState.

Comment thread internal/agent/api.go Outdated
Comment on lines +877 to +886
sess, err := a.store.GetSession(ctx, sessionID)
if err != nil || sess == nil {
return nil, ErrSessionNotFound
}

messages, err := a.store.GetMessages(ctx, sessionID)
if err != nil {
a.logger.Error("Failed to get messages from store", "error", err)
messages = []Message{}
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate store failures instead of faking an empty or missing session.

On the store path, GetSession errors are collapsed into ErrSessionNotFound, and GetMessages errors still return a successful snapshot with []Message{}. For snapshot/polling consumers, that makes transient backend failures look like “session disappeared” or “history is empty”, which can clobber good client state. Return the storage error so the caller can retry.

Possible fix
 	sess, err := a.store.GetSession(ctx, sessionID)
-	if err != nil || sess == nil {
-		return nil, ErrSessionNotFound
-	}
+	if err != nil {
+		return nil, err
+	}
+	if sess == nil {
+		return nil, ErrSessionNotFound
+	}
 
 	messages, err := a.store.GetMessages(ctx, sessionID)
 	if err != nil {
-		a.logger.Error("Failed to get messages from store", "error", err)
-		messages = []Message{}
+		return nil, err
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/agent/api.go` around lines 877 - 886, The code masks storage errors
by turning any a.store.GetSession error into ErrSessionNotFound and by
swallowing a.store.GetMessages errors (setting messages = []Message{}); change
both to propagate the original store error to the caller instead: return the
error from GetSession directly (do not map all errors to ErrSessionNotFound
unless sess == nil with nil error) and on GetMessages return the err instead of
substituting an empty slice (remove the a.logger-only swallow path). Update
callers to handle these storage errors/retries as needed; use the existing
symbols a.store.GetSession, a.store.GetMessages, ErrSessionNotFound, and
messages to locate and fix the logic.

Comment on lines +66 to +70
if err := result.session.writeControl(result.control); err != nil {
return
}
result.session.bootstrapTopics(r.Context(), lastEventID, result.topics)
_ = result.session.Serve(r.Context())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Serialize snapshot bootstrap with the subscription change.

Both paths make the new subscription state observable before bootstrapTopics runs. A concurrent remove can land in that gap, and the snapshot path still enqueues to the session, so one stale event can leak after unsubscribe. Move bootstrap into the same session-scoped mutation critical section, or make snapshot enqueue refuse topics that are no longer attached.

Also applies to: 108-112

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/multiplex_handler.go` around lines 66 - 70, The
bootstrapTopics call is currently executed after writeControl/Serve which makes
the new subscription state visible before the snapshot enqueue and allows a race
where a concurrent unsubscribe removes topics but the snapshot still enqueues
stale events; to fix, serialize snapshot bootstrap with the session-scoped
mutation by moving the call to session.bootstrapTopics into the same critical
section where result.session.writeControl (and any session subscription state
mutations) occur so the enqueue happens atomically with the subscription change,
or alternatively add a check in the snapshot-enqueue path to drop topics that
are no longer attached (i.e., make the session's snapshot enqueue/append reject
topics absent from the current subscription set in Session). Ensure you update
both code paths that call bootstrapTopics (the one using
result.session.writeControl/result.session.Serve and the other mirror path) and
reference the Session methods writeControl, bootstrapTopics, Serve and the
session snapshot enqueue/append logic when making the change.

Comment on lines +117 to +130
func parseLastEventID(r *http.Request) (uint64, error) {
raw := strings.TrimSpace(r.URL.Query().Get("lastEventId"))
if raw == "" {
raw = strings.TrimSpace(r.Header.Get("Last-Event-ID"))
}
if raw == "" {
return 0, nil
}

var lastEventID uint64
if _, err := fmt.Sscanf(raw, "%d", &lastEventID); err != nil {
return 0, err
}
return lastEventID, nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Forward Last-Event-ID to remote streams.

The local path accepts Last-Event-ID, but the proxied request only forwards query params. Any reconnect that relies on the header will be treated as a fresh remote stream.

Suggested fix
 req.Header.Set("Accept", "text/event-stream")
+if v := strings.TrimSpace(r.Header.Get("Last-Event-ID")); v != "" {
+	req.Header.Set("Last-Event-ID", v)
+}
 node.ApplyAuth(req)

Also applies to: 139-145

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/multiplex_handler.go` around lines 117 - 130,
The proxy only forwards query params so clients sending Last-Event-ID via header
are treated as new streams; update the multiplex handler code that constructs
the proxied HTTP requests to copy the incoming request's "Last-Event-ID" header
into the outbound request's headers (e.g., when you create the outgoing
*http.Request* for the remote stream). Ensure this header copy is done in both
places noted (the place that builds the proxied request around parseLastEventID
and the second occurrence referenced in the review), and keep using
parseLastEventID as-is to read either query or header on the local side.

Comment on lines +52 to +68
case TopicTypeDAGRuns, TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree:
identifier = strings.TrimPrefix(identifier, "?")
return sanitizeQueryForTopic(identifier), nil
case TopicTypeDAGRunLogs:
pathPart, queryPart, hasQuery := strings.Cut(identifier, "?")
pathPart = strings.TrimSpace(pathPart)
if pathPart == "" {
return "", fmt.Errorf("topic %q requires an identifier", topicType)
}
if !hasQuery {
return pathPart, nil
}
queryPart = sanitizeQueryForTopic(queryPart)
if queryPart == "" {
return pathPart, nil
}
return pathPart + "?" + queryPart, nil

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't turn malformed queries into broad subscriptions.

These branches treat sanitizeQueryForTopic(...) == "" as success, but that helper also returns "" when query parsing fails. A topic like dagruns:%ZZ therefore canonicalizes to dagruns: and widens the subscription instead of failing. That breaks the new atomic-mutation contract and can expose a broader dataset than the caller asked for. Please surface parse errors here and add a regression test for malformed query strings.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/topic_parse.go` around lines 52 - 68, The
current branches for
TopicTypeDAGRuns/TopicTypeQueues/TopicTypeDAGsList/TopicTypeDocTree and
TopicTypeDAGRunLogs treat sanitizeQueryForTopic(...) == "" as success but
sanitizeQueryForTopic also returns "" on parse failure, which widens
subscriptions; change sanitizeQueryForTopic to return (string, error) or add a
new parseAndSanitizeQuery(identifier string) (string, error) helper that
distinguishes an empty-but-valid query from a parse error, then update the
branches in topic_parse.go (the cases handling TopicTypeDAGRuns,
TopicTypeQueues, TopicTypeDAGsList, TopicTypeDocTree and TopicTypeDAGRunLogs) to
call the new helper and return/propagate an error when parsing fails instead of
accepting "" as success; add a regression test that asserts malformed queries
like "%ZZ" cause the topic parsing to return an error rather than canonicalizing
to a broader subscription.

Comment on lines +69 to +72
// SetLegacyStreamDeprecationHeaders marks a legacy SSE endpoint as deprecated.
func SetLegacyStreamDeprecationHeaders(w http.ResponseWriter) {
w.Header().Set("Deprecation", "true")
w.Header().Set("Sunset", legacyStreamSunset)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "types.go" -path "*/service/frontend/sse/*" -type f

Repository: dagu-org/dagu

Length of output: 1877


🌐 Web query:

RFC 9745 Deprecation header format specifications

💡 Result:

RFC 9745 defines the Deprecation HTTP response header field as:

  • Structured Field type: Item (Structured Headers) whose value MUST be a Structured Field Date (RFC 9651 §3.3.7). In HTTP text form, that date is serialized as an @ followed by an integer number of seconds delta from 1970-01-01T00:00:00Z (Unix-epoch style), e.g. Deprecation: 1688169599``. The date may be in the past (already deprecated) or future (will be deprecated then). [1][2]
  • Example (from RFC 9745):
    Deprecation: `@1688169599`
    [1]

Related note (often confused with Deprecation):

  • Sunset header (RFC 8594) uses a different date format: its value is an HTTP-date (IMF-fixdate / RFC 7231 style), e.g. Sunset: Sun, 30 Jun 2024 23:59:59 UTC in RFC 9745’s example, and RFC 9745 explicitly calls out this historical mismatch. [1][3]

Sources: RFC 9745 (Deprecation) [1], RFC 9651 (Structured Field Date) [2], RFC 8594 (Sunset / HTTP-date) [3].


🏁 Script executed:

# Get the actual file content to verify current implementation
if [ -f "internal/service/frontend/sse/types.go" ]; then
  wc -l internal/service/frontend/sse/types.go
fi

Repository: dagu-org/dagu

Length of output: 1877


Use RFC 9745–compliant format for the Deprecation header.

RFC 9745 specifies that Deprecation must be a Structured Field Date (format: @ followed by Unix timestamp). The current value "true" is not standards-compliant and clients may ignore or mispars it. Use a timestamp representing the deprecation instant instead.

Suggested fix
 const (
+	legacyStreamDeprecatedAt = "@1773360000" // 2026-03-13T00:00:00Z
 	legacyStreamSunset = "Sat, 13 Mar 2027 00:00:00 GMT"
 )
 
 // SetLegacyStreamDeprecationHeaders marks a legacy SSE endpoint as deprecated.
 func SetLegacyStreamDeprecationHeaders(w http.ResponseWriter) {
-	w.Header().Set("Deprecation", "true")
+	w.Header().Set("Deprecation", legacyStreamDeprecatedAt)
 	w.Header().Set("Sunset", legacyStreamSunset)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/service/frontend/sse/types.go` around lines 69 - 72, The Deprecation
header in SetLegacyStreamDeprecationHeaders is not RFC 9745 compliant (currently
"true"); replace it with a Structured Field Date value of the form
"@<unix-seconds>" instead. In SetLegacyStreamDeprecationHeaders, compute the
deprecation instant as a Unix timestamp (e.g., the deprecation moment or
time.Now().Unix() / or derive from legacyStreamSunset if that represents the
instant), then set w.Header().Set("Deprecation", fmt.Sprintf("@%d", ts)) while
keeping the existing Sunset header write for legacyStreamSunset.

Comment on lines +260 to +275
const sseStatus = useSSEConnection(
sessionId,
openDelegateSessionIds,
apiURL,
remoteNode,
{
onSnapshot: (snapshot) => {
applySessionSnapshot(snapshot);
},
onDelegateSnapshot: (delegateId, snapshot) => {
applyDelegateSnapshot(delegateId, snapshot);
},
onNavigate: (path) => navigate(path),
onPreConnect: resetDelegates,
}
);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

onPreConnect wipes delegates that selectSession() just restored.

This hook always passes resetDelegates into useSSEConnection(). On a session switch, selectSession() already applies the fetched snapshot with restoreDelegates(), so the effect clears that state again on the next commit. Unless the stream immediately replays the full snapshot, the delegate list disappears until the next poll/update. Please move that reset to the paths that do not preload a snapshot (for example, explicit clear / brand-new session startup).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/hooks/useAgentChat.ts` around lines 260 - 275, The
onPreConnect currently calls resetDelegates (passed into useSSEConnection) which
wipes delegates that selectSession()/restoreDelegates just restored; remove the
unconditional resetDelegates from the onPreConnect arg and instead call
resetDelegates only from explicit paths that clear or start a brand-new session
(e.g., the handler that clears session or the new-session startup flow), or wrap
onPreConnect with a conditional that checks a "delegatesPreloaded" /
"hasRestoredDelegates" flag (set by restoreDelegates/selectSession) and only
runs resetDelegates when that flag is false; update useSSEConnection invocation
to use the new conditional wrapper or remove the call and place resetDelegates
calls into the explicit clear/start handlers.

Comment on lines +291 to +320
let active = true;
const pollFallbackSnapshots = async () => {
if (shouldPollSession && sessionId) {
try {
const snapshot = await fetchSessionDetail(sessionId);
if (!active) {
return;
}
applySessionSnapshot(snapshot);
} catch {
// Best effort — the next poll or SSE recovery can still heal state.
}
}

for (const delegateId of offlineDelegateSessionIds) {
try {
const snapshot = await fetchSessionDetail(delegateId);
if (!active) {
return;
}
applyDelegateSnapshot(delegateId, snapshot);
} catch {
// Best effort — live SSE recovery or the next poll can still heal state.
}
}
};

const timer = setInterval(() => {
void pollFallbackSnapshots();
}, FALLBACK_POLL_INTERVAL_MS);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fallback polls can overlap and apply snapshots out of order.

setInterval() keeps firing even if the previous pollFallbackSnapshots() round is still awaiting network I/O. If one round takes longer than 2s, a slower response can overwrite newer session/delegate state while also doubling backend load. Add an in-flight guard or switch to recursive setTimeout() so only one poll round runs at a time.

🔁 Suggested guard
     let active = true;
+    let pollInFlight = false;
+
     const pollFallbackSnapshots = async () => {
+      if (pollInFlight) {
+        return;
+      }
+      pollInFlight = true;
+      try {
       if (shouldPollSession && sessionId) {
         try {
           const snapshot = await fetchSessionDetail(sessionId);
@@
         } catch {
           // Best effort — live SSE recovery or the next poll can still heal state.
         }
       }
+      } finally {
+        pollInFlight = false;
+      }
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/features/agent/hooks/useAgentChat.ts` around lines 291 - 320,
pollFallbackSnapshots can overlap because setInterval fires regardless of
in-flight work; add a single-run guard so only one poll runs at a time and
ensure timers are cancelled on cleanup. Implement either: (a) an inFlight
boolean (e.g., isPolling) checked at the top of pollFallbackSnapshots, set true
before doing fetchSessionDetail/fetchSessionDetail for delegateIds and set false
in a finally block so applySessionSnapshot/applyDelegateSnapshot are only called
from the active runner, or (b) replace setInterval(timer) with a recursive
setTimeout that schedules the next run only after the current run completes; in
both cases ensure you clear the timer and set active = false on unmount/cleanup
so no stray polls run.

Comment on lines +216 to +225
topicEntry.subscribers.set(subscriberId, callbacks);
callbacks.onStateChange(conn.state);
if (topicEntry.lastPayload !== null) {
callbacks.onData(topicEntry.lastPayload);
}

if (isNewTopic) {
this.handleTopicAdded(conn, topic);
} else {
this.ensureConnected(conn);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fresh topics are marked live before the server subscribes them.

Line 217 replays the shared connection state to a brand-new topic before handleTopicAdded() has moved it from pendingAdd into serverTopics. For delegate panes, useSSEConnection treats that as “live” and disables fallback polling even though the topic is still pending or may later be rejected. Please emit topic-level state here (serverTopics.has(topic) / pending add) and only flip the topic to connected after the control or mutation ack arrives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ui/src/hooks/SSEManager.ts` around lines 216 - 225, The code replays the
connection state to a new topic immediately (topicEntry.subscribers.set(...);
callbacks.onStateChange(conn.state)) before the topic is moved from pendingAdd
into serverTopics, causing delegate panes to treat it as live; change this so
you emit topic-level state based on whether serverTopics.has(topic) or the
pendingAdd flag (i.e., report a pending/not-connected state instead of
conn.state for fresh topics) and defer flipping the topic to connected until
handleTopicAdded confirms the server subscription (or until the control/mutation
ack arrives); update the logic around topicEntry, callbacks.onStateChange,
handleTopicAdded and ensureConnected so handleTopicAdded performs the final
callbacks.onStateChange(connected) when the topic is actually moved into
serverTopics.

@yohamta0 yohamta0 merged commit 438bb29 into main Mar 14, 2026
6 checks passed
@yohamta0 yohamta0 deleted the sse-hardening branch March 14, 2026 03:07
@codecov

codecov Bot commented Mar 14, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 64.07186% with 60 lines in your changes missing coverage. Please review.
✅ Project coverage is 66.81%. Comparing base (88a9f29) to head (3806a98).
⚠️ Report is 14 commits behind head on main.

Files with missing lines Patch % Lines
internal/agent/api.go 48.80% 23 Missing and 20 partials ⚠️
internal/cmn/config/loader.go 71.42% 3 Missing and 5 partials ⚠️
internal/cmn/config/config.go 45.45% 2 Missing and 4 partials ⚠️
internal/agent/session.go 93.18% 0 Missing and 3 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #1768      +/-   ##
==========================================
- Coverage   67.99%   66.81%   -1.19%     
==========================================
  Files         402      402              
  Lines       45547    45808     +261     
==========================================
- Hits        30969    30605     -364     
- Misses      11379    11556     +177     
- Partials     3199     3647     +448     
Files with missing lines Coverage Δ
internal/agent/session.go 71.01% <93.18%> (-13.04%) ⬇️
internal/cmn/config/config.go 48.87% <45.45%> (+1.60%) ⬆️
internal/cmn/config/loader.go 67.45% <71.42%> (-1.68%) ⬇️
internal/agent/api.go 53.65% <48.80%> (-19.18%) ⬇️

... and 12 files with indirect coverage changes


Continue to review full report in Codecov by Sentry.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 88a9f29...3806a98. Read the comment docs.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant