Skip to content

v0.6.2.0 feat(assistant): SSE streaming on POST /messages (#146)#151

Merged
jayzalowitz merged 1 commit into
mainfrom
jayzalowitz/assistant-sse-streaming
May 5, 2026
Merged

v0.6.2.0 feat(assistant): SSE streaming on POST /messages (#146)#151
jayzalowitz merged 1 commit into
mainfrom
jayzalowitz/assistant-sse-streaming

Conversation

@jayzalowitz

Copy link
Copy Markdown
Owner

Summary

Closes #146 (assistant phase 2a). The chat UI now streams replies token-by-token instead of blocking on the full LLM response. Replaces the 3-10s wait + sudden full-message paint with a Claude/ChatGPT-style live typing animation backed by real content.

Anthropic ships native SSE; OpenAI / Google / Ollama fall back to single-chunk emission of the existing sync response so the API contract is uniform regardless of which provider is in front. Backward-compatible: legacy JSON callers (no Accept: text/event-stream) still work unchanged.

What landed

LlmClient.generateStream

```ts
async *generateStream(prompt, options): AsyncIterable
```

Yields chunk events as text arrives, then exactly one done event with the assembled full content + provider/model/latency.

Provider-chain semantics differ slightly from sync generate() and the difference is load-bearing:

  • Pre-first-chunk failures fall through to the next provider (same as sync). No partial output has been emitted yet, so silent retry is safe.
  • Mid-stream failures do NOT fall through. Once a provider has committed by yielding even one chunk, the user has already seen text — silently retrying a different provider would produce duplicate output.

Native Anthropic streaming

packages/llm-client/src/providers/anthropic.ts:streamGenerate consumes /v1/messages with stream: true. Buffers events across reader.read() boundaries (a chunk can split mid-event). Tolerates the full Anthropic event taxonomy by ignoring everything except content_block_delta events with delta.type === 'text_delta'.

The parser is exported for unit tests so we don't have to mock fetch + Response for every behavior.

Universal fallback for non-streaming providers

makeFallbackStream wraps the sync generate as a single-chunk async iterable. Lets the chain expose a uniform streaming interface even for providers we haven't implemented native streaming for yet. Adding native streaming for OpenAI / Google / Ollama is just dropping a new streamGenerate into their provider module — no changes elsewhere.

AssistantService.replyStream

Same EnrichmentRequest semantics as reply(): when supplied AND a ContextBuilder is wired, the rendered twin/memory context block is prepended to the system prompt for this request.

Three terminal events:

  • done (success) — fullContent + metadata
  • error (mid-stream failure with at least one chunk landed) — partialContent + message
  • Pre-first-chunk failures escape via throw — caller turns into HTTP 502 (same as sync reply())

POST /api/assistant/messages SSE branch

Branches on the Accept header. text/event-stream triggers streaming; everything else falls through to the existing sync JSON response.

Wire format:

```
event: thread
data: {"id":"...","isNew":true}

event: user
data: {...userMessage row...}

event: chunk
data: {"content":"Hello"}

event: chunk
data: {"content":" world"}

event: done
data: {...assistantMessage row...}
```

User message persists FIRST (before the LLM call) so it survives a provider outage. Assistant message persists AFTER the stream closes, using the accumulated full content. If the persist fails, done still fires with persistFailed: true so the client can warn — the audit-trail loss is recoverable, the user-facing regression isn't.

X-Accel-Buffering: no keeps nginx from buffering end-to-end. Mid-flight disconnect detection via res.writableEnded so the loop exits cleanly when the user navigates away — saves the provider's tokens.

Web client streams progressively

sendAssistantMessageStream uses fetch + manual SSE parsing (not EventSource — that's GET-only). Per-event callbacks (onThread, onUserMessage, onChunk, onDone, onError).

pages/assistant.js:handleSend updates a single bubble's text in place via data-streaming-id direct DOM update — no full repaint per token. Preserves textarea focus and scroll position during the stream. Typing dots fire only while sending AND before the first chunk; once the streaming bubble appears it shows the live text.

Mid-stream errors render the partial content + an error caveat in separate bubbles so the user sees both what landed and what went wrong.

Safety / invariants

  • Trust boundary — Gmail / sender / user strings still flow through escapeHtml before landing in innerHTML. The streaming bubble's incremental DOM updates use textContent (not innerHTML), which is XSS-safe by construction.
  • Provider commit — once a provider yields a chunk, the chain refuses to silently retry a different provider. Comment-documented at LlmClient.generateStream and tested.
  • Audit trail integrity — assistant message persists after stream closes; persist-failure surfaces persistFailed: true to the client rather than silently losing the row.

Test plan

  • pnpm build — 20 packages green
  • pnpm test — 40 packages green; 24 new tests on top of the existing assistant + llm-client suites
  • pnpm lint — clean
  • Manual: configure Anthropic provider, navigate to #/assistant, send a long prompt, watch tokens land progressively
  • Manual: configure OpenAI-only, verify single-chunk fallback still works (whole reply lands at once)
  • Manual: with both providers configured, kill network mid-stream, verify partial content + error caveat render
  • Manual: with no AI provider configured, verify the streaming endpoint returns 409 cleanly (pre-stream check)

Phase 2+ still in flight

🤖 Generated with Claude Code

Phase 2a of issue #135closes #146. The assistant chat UI now streams
replies token-by-token instead of blocking on the full LLM response.
Anthropic ships native SSE; OpenAI/Google/Ollama fall back to single-chunk
emission so the API contract is uniform regardless of which provider is
in front. Backward-compatible: legacy JSON callers still work unchanged.

LlmClient.generateStream
- AsyncIterable<LlmStreamEvent> with chunk + done events
- Pre-first-chunk failures fall through to next provider (same as sync)
- Mid-stream failures re-throw — once a provider committed by yielding
  text, silent retry would produce duplicate output
- Throws AllProvidersFailedError when no provider yields any chunk

Native Anthropic SSE
- /v1/messages with stream:true + Accept:text/event-stream
- Buffers events across reader.read() boundaries
- Tolerant of full event taxonomy; only forwards content_block_delta
  text_deltas, ignores message_start/ping/etc.
- Parser exported for unit testing

Universal fallback (makeFallbackStream) wraps sync generate as a
single-chunk async iterable so non-streaming providers get the same
caller contract without a real streaming implementation.

AssistantService.replyStream — same enrichment path as reply(); yields
chunk events then terminal done OR error event (mid-stream errors carry
partialContent for graceful UI degradation).

POST /api/assistant/messages branches on Accept: text/event-stream.
Wire format: thread + user events first, then chunk events, then
done (with persisted assistantMessage row) OR error (with partial).
User message persists BEFORE LLM call (provider outage survival);
assistant message persists AFTER stream closes. Mid-flight client
disconnect detected via res.writableEnded.

Web client: sendAssistantMessageStream uses fetch + manual SSE parser
(EventSource is GET-only). Page handler updates a single bubble's text
in place via data-streaming-id — no full repaint per token, preserves
focus and scroll position.

Tests: 24 new (7 SSE parser + 6 generateStream chain + 5 replyStream
+ 6 reused/refactored). Full suite green across 40 packages.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 5, 2026 19:23
@jayzalowitz jayzalowitz merged commit 09f621c into main May 5, 2026
10 checks passed

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements end-to-end SSE streaming for assistant replies across the LLM client, assistant service, API route, and web chat UI, while preserving the existing non-streaming JSON path for legacy callers.

Changes:

  • Added LlmClient.generateStream, Anthropic native SSE parsing, and fallback single-chunk streaming for non-native providers.
  • Added AssistantService.replyStream plus an SSE branch on POST /api/assistant/messages to stream thread / user / chunk / terminal events.
  • Updated the web assistant client to consume POST-based SSE and render one assistant bubble incrementally; added unit tests and release notes.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
VERSION Bumps release to 0.6.2.0.
packages/llm-client/src/types.ts Adds stream event/provider stream type definitions.
packages/llm-client/src/providers/anthropic.ts Adds Anthropic streaming request path and SSE parser.
packages/llm-client/src/llm-client.ts Adds provider-chain generateStream and fallback stream wrappers.
packages/llm-client/src/index.ts Re-exports LlmStreamEvent.
packages/llm-client/src/tests/llm-client.test.ts Adds streaming/failover unit coverage for LlmClient.
packages/llm-client/src/tests/anthropic-sse.test.ts Adds parser-focused Anthropic SSE tests.
packages/assistant/src/index.ts Exports assistant streaming types.
packages/assistant/src/assistant-service.ts Adds replyStream and assistant-level stream event shaping.
packages/assistant/src/tests/assistant-service.test.ts Adds streaming service unit tests.
CHANGELOG.md Documents the streaming assistant release.
apps/web/public/js/pages/assistant.js Updates the assistant page to stream and paint replies progressively.
apps/web/public/js/api-client.js Adds POST SSE client/parsing for assistant messages.
apps/api/src/routes/assistant.ts Adds SSE response path and streaming persistence flow for assistant messages.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +427 to +432
onDone: (assistantMessage) => {
// Replace the streaming bubble with the persisted one.
_state.messages = _state.messages
.filter((m) => m.id !== streamingAssistantId)
.concat([assistantMessage]);
if (container) paint(container);
Comment on lines +457 to +458
if (input && !partialContent) input.value = content;
if (container) paint(container);
Comment on lines +216 to +221
res.status(200);
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders?.();
Comment on lines +71 to +75
// Streaming requests can take longer than sync ones (the model is still
// generating while we read), but a hung connection still needs to time
// out — use 2x the sync default. Caller-provided timeoutMs wins.
const timeout = setTimeout(() => controller.abort(), options.timeoutMs ?? 60_000);

Comment on lines +104 to +107
yield* parseAnthropicSseStream(res.body);
} finally {
clearTimeout(timeout);
}
Comment on lines +175 to +182
for (const line of rawEvent.split('\n')) {
if (!line.startsWith('data:')) continue;
const payload = line.slice(5).trim();
if (!payload || payload === '[DONE]') continue;
try {
const parsed = JSON.parse(payload) as AnthropicDeltaEvent;
if (parsed.type === 'content_block_delta' && parsed.delta?.type === 'text_delta') {
return parsed.delta.text ?? '';
Comment on lines +432 to +443
if (wantsStream) {
await streamAssistantReply({
service,
history,
enrichment: { userId, query: content },
threadId,
isNewThread,
userMessage,
res,
log,
});
return;
Comment on lines 472 to +478
} catch (err) {
// Restore the input so the user can retry, drop the optimistic bubble,
// and surface the error in an assistant-shaped bubble. Phase 2 should
// make this a proper toast.
_state.messages = _state.messages.filter((m) => m.id !== 'optimistic');
// Transport-level failure (network down, 4xx/5xx pre-stream). Drop
// the optimistic bubble, restore input, surface error in an
// assistant-shaped bubble.
_state.messages = _state.messages.filter(
(m) => m.id !== 'optimistic' && m.id !== streamingAssistantId,
);
Comment on lines +418 to +423
// Bubble missing (page navigated away mid-stream and came
// back?) — fall back to a full re-paint so state stays
// consistent.
const idx = _state.messages.findIndex((m) => m.id === streamingAssistantId);
if (idx >= 0) _state.messages[idx].content = streamingContent;
if (container) paint(container);
Comment on lines +397 to +408
if (!receivedFirstChunk) {
// First chunk: insert the streaming bubble + drop the typing dots.
receivedFirstChunk = true;
_state.messages = _state.messages.concat([
{
id: streamingAssistantId,
role: 'assistant',
content: streamingContent,
createdAt: new Date().toISOString(),
},
]);
if (container) paint(container);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Assistant phase 2a: SSE streaming on POST /api/assistant/messages

2 participants