v0.6.2.0 feat(assistant): SSE streaming on POST /messages (#146)#151
Merged
Conversation
Phase 2a of issue #135 — closes #146. The assistant chat UI now streams replies token-by-token instead of blocking on the full LLM response. Anthropic ships native SSE; OpenAI/Google/Ollama fall back to single-chunk emission so the API contract is uniform regardless of which provider is in front. Backward-compatible: legacy JSON callers still work unchanged. LlmClient.generateStream - AsyncIterable<LlmStreamEvent> with chunk + done events - Pre-first-chunk failures fall through to next provider (same as sync) - Mid-stream failures re-throw — once a provider committed by yielding text, silent retry would produce duplicate output - Throws AllProvidersFailedError when no provider yields any chunk Native Anthropic SSE - /v1/messages with stream:true + Accept:text/event-stream - Buffers events across reader.read() boundaries - Tolerant of full event taxonomy; only forwards content_block_delta text_deltas, ignores message_start/ping/etc. - Parser exported for unit testing Universal fallback (makeFallbackStream) wraps sync generate as a single-chunk async iterable so non-streaming providers get the same caller contract without a real streaming implementation. AssistantService.replyStream — same enrichment path as reply(); yields chunk events then terminal done OR error event (mid-stream errors carry partialContent for graceful UI degradation). POST /api/assistant/messages branches on Accept: text/event-stream. Wire format: thread + user events first, then chunk events, then done (with persisted assistantMessage row) OR error (with partial). User message persists BEFORE LLM call (provider outage survival); assistant message persists AFTER stream closes. Mid-flight client disconnect detected via res.writableEnded. Web client: sendAssistantMessageStream uses fetch + manual SSE parser (EventSource is GET-only). Page handler updates a single bubble's text in place via data-streaming-id — no full repaint per token, preserves focus and scroll position. Tests: 24 new (7 SSE parser + 6 generateStream chain + 5 replyStream + 6 reused/refactored). Full suite green across 40 packages. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Implements end-to-end SSE streaming for assistant replies across the LLM client, assistant service, API route, and web chat UI, while preserving the existing non-streaming JSON path for legacy callers.
Changes:
- Added
LlmClient.generateStream, Anthropic native SSE parsing, and fallback single-chunk streaming for non-native providers. - Added
AssistantService.replyStreamplus an SSE branch onPOST /api/assistant/messagesto streamthread/user/chunk/ terminal events. - Updated the web assistant client to consume POST-based SSE and render one assistant bubble incrementally; added unit tests and release notes.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| VERSION | Bumps release to 0.6.2.0. |
| packages/llm-client/src/types.ts | Adds stream event/provider stream type definitions. |
| packages/llm-client/src/providers/anthropic.ts | Adds Anthropic streaming request path and SSE parser. |
| packages/llm-client/src/llm-client.ts | Adds provider-chain generateStream and fallback stream wrappers. |
| packages/llm-client/src/index.ts | Re-exports LlmStreamEvent. |
| packages/llm-client/src/tests/llm-client.test.ts | Adds streaming/failover unit coverage for LlmClient. |
| packages/llm-client/src/tests/anthropic-sse.test.ts | Adds parser-focused Anthropic SSE tests. |
| packages/assistant/src/index.ts | Exports assistant streaming types. |
| packages/assistant/src/assistant-service.ts | Adds replyStream and assistant-level stream event shaping. |
| packages/assistant/src/tests/assistant-service.test.ts | Adds streaming service unit tests. |
| CHANGELOG.md | Documents the streaming assistant release. |
| apps/web/public/js/pages/assistant.js | Updates the assistant page to stream and paint replies progressively. |
| apps/web/public/js/api-client.js | Adds POST SSE client/parsing for assistant messages. |
| apps/api/src/routes/assistant.ts | Adds SSE response path and streaming persistence flow for assistant messages. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+427
to
+432
| onDone: (assistantMessage) => { | ||
| // Replace the streaming bubble with the persisted one. | ||
| _state.messages = _state.messages | ||
| .filter((m) => m.id !== streamingAssistantId) | ||
| .concat([assistantMessage]); | ||
| if (container) paint(container); |
Comment on lines
+457
to
+458
| if (input && !partialContent) input.value = content; | ||
| if (container) paint(container); |
Comment on lines
+216
to
+221
| res.status(200); | ||
| res.setHeader('Content-Type', 'text/event-stream'); | ||
| res.setHeader('Cache-Control', 'no-cache'); | ||
| res.setHeader('Connection', 'keep-alive'); | ||
| res.setHeader('X-Accel-Buffering', 'no'); | ||
| res.flushHeaders?.(); |
Comment on lines
+71
to
+75
| // Streaming requests can take longer than sync ones (the model is still | ||
| // generating while we read), but a hung connection still needs to time | ||
| // out — use 2x the sync default. Caller-provided timeoutMs wins. | ||
| const timeout = setTimeout(() => controller.abort(), options.timeoutMs ?? 60_000); | ||
|
|
Comment on lines
+104
to
+107
| yield* parseAnthropicSseStream(res.body); | ||
| } finally { | ||
| clearTimeout(timeout); | ||
| } |
Comment on lines
+175
to
+182
| for (const line of rawEvent.split('\n')) { | ||
| if (!line.startsWith('data:')) continue; | ||
| const payload = line.slice(5).trim(); | ||
| if (!payload || payload === '[DONE]') continue; | ||
| try { | ||
| const parsed = JSON.parse(payload) as AnthropicDeltaEvent; | ||
| if (parsed.type === 'content_block_delta' && parsed.delta?.type === 'text_delta') { | ||
| return parsed.delta.text ?? ''; |
Comment on lines
+432
to
+443
| if (wantsStream) { | ||
| await streamAssistantReply({ | ||
| service, | ||
| history, | ||
| enrichment: { userId, query: content }, | ||
| threadId, | ||
| isNewThread, | ||
| userMessage, | ||
| res, | ||
| log, | ||
| }); | ||
| return; |
Comment on lines
472
to
+478
| } catch (err) { | ||
| // Restore the input so the user can retry, drop the optimistic bubble, | ||
| // and surface the error in an assistant-shaped bubble. Phase 2 should | ||
| // make this a proper toast. | ||
| _state.messages = _state.messages.filter((m) => m.id !== 'optimistic'); | ||
| // Transport-level failure (network down, 4xx/5xx pre-stream). Drop | ||
| // the optimistic bubble, restore input, surface error in an | ||
| // assistant-shaped bubble. | ||
| _state.messages = _state.messages.filter( | ||
| (m) => m.id !== 'optimistic' && m.id !== streamingAssistantId, | ||
| ); |
Comment on lines
+418
to
+423
| // Bubble missing (page navigated away mid-stream and came | ||
| // back?) — fall back to a full re-paint so state stays | ||
| // consistent. | ||
| const idx = _state.messages.findIndex((m) => m.id === streamingAssistantId); | ||
| if (idx >= 0) _state.messages[idx].content = streamingContent; | ||
| if (container) paint(container); |
Comment on lines
+397
to
+408
| if (!receivedFirstChunk) { | ||
| // First chunk: insert the streaming bubble + drop the typing dots. | ||
| receivedFirstChunk = true; | ||
| _state.messages = _state.messages.concat([ | ||
| { | ||
| id: streamingAssistantId, | ||
| role: 'assistant', | ||
| content: streamingContent, | ||
| createdAt: new Date().toISOString(), | ||
| }, | ||
| ]); | ||
| if (container) paint(container); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Closes #146 (assistant phase 2a). The chat UI now streams replies token-by-token instead of blocking on the full LLM response. Replaces the 3-10s wait + sudden full-message paint with a Claude/ChatGPT-style live typing animation backed by real content.
Anthropic ships native SSE; OpenAI / Google / Ollama fall back to single-chunk emission of the existing sync response so the API contract is uniform regardless of which provider is in front. Backward-compatible: legacy JSON callers (no
Accept: text/event-stream) still work unchanged.What landed
LlmClient.generateStream```ts
async *generateStream(prompt, options): AsyncIterable
```
Yields
chunkevents as text arrives, then exactly onedoneevent with the assembled full content + provider/model/latency.Provider-chain semantics differ slightly from sync
generate()and the difference is load-bearing:Native Anthropic streaming
packages/llm-client/src/providers/anthropic.ts:streamGenerateconsumes/v1/messageswithstream: true. Buffers events acrossreader.read()boundaries (a chunk can split mid-event). Tolerates the full Anthropic event taxonomy by ignoring everything exceptcontent_block_deltaevents withdelta.type === 'text_delta'.The parser is exported for unit tests so we don't have to mock fetch + Response for every behavior.
Universal fallback for non-streaming providers
makeFallbackStreamwraps the syncgenerateas a single-chunk async iterable. Lets the chain expose a uniform streaming interface even for providers we haven't implemented native streaming for yet. Adding native streaming for OpenAI / Google / Ollama is just dropping a newstreamGenerateinto their provider module — no changes elsewhere.AssistantService.replyStreamSame
EnrichmentRequestsemantics asreply(): when supplied AND aContextBuilderis wired, the rendered twin/memory context block is prepended to the system prompt for this request.Three terminal events:
done(success) —fullContent+metadataerror(mid-stream failure with at least one chunk landed) —partialContent+messagereply())POST /api/assistant/messagesSSE branchBranches on the
Acceptheader.text/event-streamtriggers streaming; everything else falls through to the existing sync JSON response.Wire format:
```
event: thread
data: {"id":"...","isNew":true}
event: user
data: {...userMessage row...}
event: chunk
data: {"content":"Hello"}
event: chunk
data: {"content":" world"}
event: done
data: {...assistantMessage row...}
```
User message persists FIRST (before the LLM call) so it survives a provider outage. Assistant message persists AFTER the stream closes, using the accumulated full content. If the persist fails,
donestill fires withpersistFailed: trueso the client can warn — the audit-trail loss is recoverable, the user-facing regression isn't.X-Accel-Buffering: nokeeps nginx from buffering end-to-end. Mid-flight disconnect detection viares.writableEndedso the loop exits cleanly when the user navigates away — saves the provider's tokens.Web client streams progressively
sendAssistantMessageStreamusesfetch+ manual SSE parsing (notEventSource— that's GET-only). Per-event callbacks (onThread,onUserMessage,onChunk,onDone,onError).pages/assistant.js:handleSendupdates a single bubble's text in place viadata-streaming-iddirect DOM update — no full repaint per token. Preserves textarea focus and scroll position during the stream. Typing dots fire only while sending AND before the first chunk; once the streaming bubble appears it shows the live text.Mid-stream errors render the partial content + an error caveat in separate bubbles so the user sees both what landed and what went wrong.
Safety / invariants
escapeHtmlbefore landing ininnerHTML. The streaming bubble's incremental DOM updates usetextContent(notinnerHTML), which is XSS-safe by construction.LlmClient.generateStreamand tested.persistFailed: trueto the client rather than silently losing the row.Test plan
pnpm build— 20 packages greenpnpm test— 40 packages green; 24 new tests on top of the existing assistant + llm-client suitespnpm lint— clean#/assistant, send a long prompt, watch tokens land progressivelyPhase 2+ still in flight
LlmClientAPI (refactor prerequisite for Assistant phase 2c: action-intent routing through @skytwin/decision-engine #148 — drops theUser:/Assistant:prompt-flattening workaround)🤖 Generated with Claude Code