fix(backend): stream DeerFlowClient AI text as token deltas (#1969)#1974
Conversation
DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values",
"custom"] which only delivers full-state snapshots at graph-node
boundaries, so AI replies were dumped as a single messages-tuple event
per node instead of streaming token-by-token. `client.stream("hello")`
looked identical to `client.chat("hello")` — the bug reported in #1969.
Subscribe to "messages" mode as well, forward AIMessageChunk deltas as
messages-tuple events with delta semantics (consumers accumulate by id),
and dedup the values-snapshot path so it does not re-synthesize AI
text that was already streamed. Introduce a per-id usage_metadata
counter so the final AIMessage in the values snapshot and the final
"messages" chunk — which carry the same cumulative usage — are not
double-counted.
chat() now accumulates per-id deltas and returns the last message's
full accumulated text. Non-streaming mock sources (single event per id)
are a degenerate case of the same logic, keeping existing callers and
tests backward compatible.
Verified end-to-end against a real LLM: a 15-number count emits 35
messages-tuple events with BPE subword boundaries clearly visible
("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across
the window, end-event usage matches the values-snapshot usage exactly
(not doubled). tests/test_client_live.py::TestLiveStreaming passes.
New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3
delta events with correct content/id/usage, values-snapshot does not
duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text
from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages
mode is not duplicated by the values-snapshot synthesis path.
The stream() docstring now documents why this client does not reuse
Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw
LangChain objects vs serialized dicts, single caller vs HTTP fan-out).
Fixes #1969
There was a problem hiding this comment.
Pull request overview
This PR updates the embedded DeerFlowClient conversation APIs to support true token-level streaming by subscribing to LangGraph "messages" mode and emitting per-chunk AI text deltas (while avoiding duplication from the "values" snapshot path).
Changes:
- Subscribe to LangGraph
stream_mode=["values", "messages", "custom"]and forward"messages"chunks asmessages-tupledelta events. - Deduplicate
values-snapshot synthesis for message IDs already streamed via"messages", and prevent double-countingusage_metadataacross modes. - Update
chat()to accumulate per-message-id deltas and return the final accumulated AI text; add/extend unit tests.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
backend/packages/harness/deerflow/client.py |
Adds "messages" stream subscription, delta forwarding, usage dedup, and chat() delta accumulation; expands stream() docstring. |
backend/tests/test_client.py |
Adds regression tests validating token-delta emission, values-path dedup, usage dedup, and chat() accumulation. |
backend/CLAUDE.md |
Updates embedded client documentation to reflect delta semantics and usage dedup behavior. |
Post-review cleanup for the token-level streaming fix. No behavior change for correct inputs; one efficiency regression fixed. Fix: chat() O(n²) accumulator ----------------------------- `chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`, which is O(n) per concat → O(n²) total over a streamed response. At ~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks it costs roughly 100-300 ms of pure copying. Switched to `dict[str, list[str]]` + `"".join()` once at return. Cleanup ------- - Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`, and `_tool_message_event` static helpers. The messages-mode and values-mode branches previously repeated four inline dict literals each; they now call the same builders. - `StreamEvent.type` is now typed as `Literal["values", "messages-tuple", "custom", "end"]` via a `StreamEventType` alias. Makes the closed set explicit and catches typos at type-check time. - Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`, `.tool_calls`, `.id` all have default values on the base class, so the `getattr(..., None)` fallbacks were dead code. Removed from the hot path. - `_account_usage` parameter type loosened to `Any` so that LangChain's `UsageMetadata` TypedDict is accepted under strict type checking. - Trimmed narrating comments on `seen_ids` / `streamed_ids` / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant. Net diff: -15 lines. All 132 unit tests + harness boundary test still pass; ruff check and ruff format pass.
Dedicated design document for the token-level streaming architecture, prompted by the bug investigation in #1969. Contents: - Why two parallel streaming paths exist (Gateway HTTP/async vs DeerFlowClient sync/in-process) and why they cannot be merged. - LangGraph's three-layer mode naming (Graph "messages" vs Platform SDK "messages-tuple" vs HTTP SSE) and why a shared string constant would be harmful. - Gateway path: run_agent + StreamBridge + sse_consumer with a sequence diagram. - DeerFlowClient path: sync generator + direct yield, delta semantics, chat() accumulator. - Why the three id sets (seen_ids / streamed_ids / counted_usage_ids) each carry an independent invariant and cannot be collapsed. - End-to-end sequence for a real conversation turn. - Lessons from #1969: why mock-based tests missed the bug, why BPE subword boundaries in live output are the strongest correctness signal, and the regression test that locks it in. - Source code location index. Also: - Link from backend/CLAUDE.md Embedded Client section. - Link from backend/docs/README.md under Feature Documentation.
Three new tests in TestStream that lock the contract introduced by PR #1974 so any future refactor (sync->async migration, sharing a core with Gateway's run_agent, dedup strategy change) cannot silently change behavior. - test_dedup_requires_messages_before_values_invariant: canary that documents the order-dependence of cross-mode dedup. streamed_ids is populated only by the messages branch, so values-before-messages for the same id produces duplicate AI text events. Real LangGraph never inverts this order, but a refactor that does (or that makes dedup idempotent) must update this test deliberately. - test_messages_mode_golden_event_sequence: locks the *exact* event sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1 end) for a canonical streaming turn. List equality gives a clear diff on any drift in order, type, or payload shape. - test_chat_accumulates_in_linear_time: perf canary for the O(n^2) fix in commit 1f11ba1. 10,000 single-char chunks must accumulate in under 1s; the threshold is wide enough to pass on slow CI but tight enough to fail if buffer = buffer + delta is restored. All three tests pass alongside the existing 12 TestStream tests (15/15). ruff check + ruff format clean.
Replace the misleading "raw LangChain objects (AIMessage, usage_metadata as dataclasses), not dicts" claim in the "Why not reuse Gateway's run_agent?" section. The implementation already yields plain Python dicts (StreamEvent.data is dict, and usage_metadata is a TypedDict), so the original wording suggested a richer return type than the API actually delivers. The corrected wording focuses on what is actually true and relevant: this client skips the JSON/SSE serialization layer that Gateway adds for HTTP wire transmission, and yields stream event payloads directly as Python data structures. Addresses Copilot review feedback on PR #1974.
Add test_none_id_chunks_produce_duplicates_known_limitation to
TestStream that explicitly documents and asserts the current
behavior when an LLM provider emits AIMessageChunk with id=None
(vLLM, certain custom backends).
The cross-mode dedup machinery cannot record a None id in
streamed_ids (guarded by ``if msg_id:``), so the values snapshot's
reassembled AIMessage with a real id falls through and synthesizes
a duplicate AI text event. The test asserts len == 2 and locks
this as a known limitation rather than silently letting future
contributors hit it without context.
Why this is documented rather than fixed:
* Falling back to ``metadata.get("id")`` does not help — LangGraph's
messages-mode metadata never carries the message id.
* Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the
values snapshot uses the same fallback, which it does not.
* A real fix requires provider cooperation (always emit chunk ids)
or content-based dedup (false-positive risk), neither of which
belongs in this PR.
If a real fix lands, replace this test with a positive assertion
that dedup works for None-id chunks.
Addresses Copilot review feedback on PR #1974 (client.py:515).
|
_tool_message_event still uses getattr while the commit message says they were removed The commit message says "Replaced dead getattr fallbacks with direct attribute access on the hot path." But ToolMessage fields like name, tool_call_id, and id are indeed always present on ToolMessage (they're |
|
Good catch — you're right that the same rationale applies here. |
…ded colors (#1942) - Fix `font-norma` typo to `font-normal` in message-list subtask count - Fix dark mode `--border` using reddish hue (22.216) instead of neutral - Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground` - Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground` - Add missing `font-sans` to welcome description `<pre>` for consistency - Make case-study-section padding responsive (`px-4 md:px-20`) Closes #1940
…ory requests (#1960) After history.replaceState updates the URL from /chats/new to /chats/{UUID}, Next.js useParams does not update because replaceState bypasses the router. The useEffect in useThreadChat would then set threadIdFromPath ('new') as the threadId, causing the LangGraph SDK to call POST /threads/new/history which returns HTTP 422 (Invalid thread ID: must be a UUID). This fix adds a guard to skip the threadId update when threadIdFromPath is the literal string 'new', preserving the already-correct UUID that was set when the thread was created.
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
* Fix event loop conflict in SubagentExecutor.execute() When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop. This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts. Fixes: sub-task cards not appearing in Ultra mode when using async parent agents Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(subagent): harden isolated event loop execution --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
…e#1969) (bytedance#1974) * fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969) DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values", "custom"] which only delivers full-state snapshots at graph-node boundaries, so AI replies were dumped as a single messages-tuple event per node instead of streaming token-by-token. `client.stream("hello")` looked identical to `client.chat("hello")` — the bug reported in bytedance#1969. Subscribe to "messages" mode as well, forward AIMessageChunk deltas as messages-tuple events with delta semantics (consumers accumulate by id), and dedup the values-snapshot path so it does not re-synthesize AI text that was already streamed. Introduce a per-id usage_metadata counter so the final AIMessage in the values snapshot and the final "messages" chunk — which carry the same cumulative usage — are not double-counted. chat() now accumulates per-id deltas and returns the last message's full accumulated text. Non-streaming mock sources (single event per id) are a degenerate case of the same logic, keeping existing callers and tests backward compatible. Verified end-to-end against a real LLM: a 15-number count emits 35 messages-tuple events with BPE subword boundaries clearly visible ("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across the window, end-event usage matches the values-snapshot usage exactly (not doubled). tests/test_client_live.py::TestLiveStreaming passes. New unit tests: - test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted once. - test_chat_accumulates_streamed_deltas: chat() rebuilds full text from deltas. - test_messages_mode_tool_message: ToolMessage delivered via messages mode is not duplicated by the values-snapshot synthesis path. The stream() docstring now documents why this client does not reuse Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw LangChain objects vs serialized dicts, single caller vs HTTP fan-out). Fixes bytedance#1969 * refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969) Post-review cleanup for the token-level streaming fix. No behavior change for correct inputs; one efficiency regression fixed. Fix: chat() O(n²) accumulator ----------------------------- `chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`, which is O(n) per concat → O(n²) total over a streamed response. At ~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks it costs roughly 100-300 ms of pure copying. Switched to `dict[str, list[str]]` + `"".join()` once at return. Cleanup ------- - Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`, and `_tool_message_event` static helpers. The messages-mode and values-mode branches previously repeated four inline dict literals each; they now call the same builders. - `StreamEvent.type` is now typed as `Literal["values", "messages-tuple", "custom", "end"]` via a `StreamEventType` alias. Makes the closed set explicit and catches typos at type-check time. - Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`, `.tool_calls`, `.id` all have default values on the base class, so the `getattr(..., None)` fallbacks were dead code. Removed from the hot path. - `_account_usage` parameter type loosened to `Any` so that LangChain's `UsageMetadata` TypedDict is accepted under strict type checking. - Trimmed narrating comments on `seen_ids` / `streamed_ids` / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant. Net diff: -15 lines. All 132 unit tests + harness boundary test still pass; ruff check and ruff format pass. * docs(backend): add STREAMING.md design note (bytedance#1969) Dedicated design document for the token-level streaming architecture, prompted by the bug investigation in bytedance#1969. Contents: - Why two parallel streaming paths exist (Gateway HTTP/async vs DeerFlowClient sync/in-process) and why they cannot be merged. - LangGraph's three-layer mode naming (Graph "messages" vs Platform SDK "messages-tuple" vs HTTP SSE) and why a shared string constant would be harmful. - Gateway path: run_agent + StreamBridge + sse_consumer with a sequence diagram. - DeerFlowClient path: sync generator + direct yield, delta semantics, chat() accumulator. - Why the three id sets (seen_ids / streamed_ids / counted_usage_ids) each carry an independent invariant and cannot be collapsed. - End-to-end sequence for a real conversation turn. - Lessons from bytedance#1969: why mock-based tests missed the bug, why BPE subword boundaries in live output are the strongest correctness signal, and the regression test that locks it in. - Source code location index. Also: - Link from backend/CLAUDE.md Embedded Client section. - Link from backend/docs/README.md under Feature Documentation. * test(backend): add refactor regression guards for stream() (bytedance#1969) Three new tests in TestStream that lock the contract introduced by PR bytedance#1974 so any future refactor (sync->async migration, sharing a core with Gateway's run_agent, dedup strategy change) cannot silently change behavior. - test_dedup_requires_messages_before_values_invariant: canary that documents the order-dependence of cross-mode dedup. streamed_ids is populated only by the messages branch, so values-before-messages for the same id produces duplicate AI text events. Real LangGraph never inverts this order, but a refactor that does (or that makes dedup idempotent) must update this test deliberately. - test_messages_mode_golden_event_sequence: locks the *exact* event sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1 end) for a canonical streaming turn. List equality gives a clear diff on any drift in order, type, or payload shape. - test_chat_accumulates_in_linear_time: perf canary for the O(n^2) fix in commit 1f11ba1. 10,000 single-char chunks must accumulate in under 1s; the threshold is wide enough to pass on slow CI but tight enough to fail if buffer = buffer + delta is restored. All three tests pass alongside the existing 12 TestStream tests (15/15). ruff check + ruff format clean. * docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969) Replace the misleading "raw LangChain objects (AIMessage, usage_metadata as dataclasses), not dicts" claim in the "Why not reuse Gateway's run_agent?" section. The implementation already yields plain Python dicts (StreamEvent.data is dict, and usage_metadata is a TypedDict), so the original wording suggested a richer return type than the API actually delivers. The corrected wording focuses on what is actually true and relevant: this client skips the JSON/SSE serialization layer that Gateway adds for HTTP wire transmission, and yields stream event payloads directly as Python data structures. Addresses Copilot review feedback on PR bytedance#1974. * test(backend): document none-id messages dedup limitation (bytedance#1969) Add test_none_id_chunks_produce_duplicates_known_limitation to TestStream that explicitly documents and asserts the current behavior when an LLM provider emits AIMessageChunk with id=None (vLLM, certain custom backends). The cross-mode dedup machinery cannot record a None id in streamed_ids (guarded by ``if msg_id:``), so the values snapshot's reassembled AIMessage with a real id falls through and synthesizes a duplicate AI text event. The test asserts len == 2 and locks this as a known limitation rather than silently letting future contributors hit it without context. Why this is documented rather than fixed: * Falling back to ``metadata.get("id")`` does not help — LangGraph's messages-mode metadata never carries the message id. * Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the values snapshot uses the same fallback, which it does not. * A real fix requires provider cooperation (always emit chunk ids) or content-based dedup (false-positive risk), neither of which belongs in this PR. If a real fix lands, replace this test with a positive assertion that dedup works for None-id chunks. Addresses Copilot review feedback on PR bytedance#1974 (client.py:515). * fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942) - Fix `font-norma` typo to `font-normal` in message-list subtask count - Fix dark mode `--border` using reddish hue (22.216) instead of neutral - Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground` - Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground` - Add missing `font-sans` to welcome description `<pre>` for consistency - Make case-study-section padding responsive (`px-4 md:px-20`) Closes bytedance#1940 * docs: clarify deployment sizing guidance (bytedance#1963) * fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960) After history.replaceState updates the URL from /chats/new to /chats/{UUID}, Next.js useParams does not update because replaceState bypasses the router. The useEffect in useThreadChat would then set threadIdFromPath ('new') as the threadId, causing the LangGraph SDK to call POST /threads/new/history which returns HTTP 422 (Invalid thread ID: must be a UUID). This fix adds a guard to skip the threadId update when threadIdFromPath is the literal string 'new', preserving the already-correct UUID that was set when the thread was created. * fix(frontend): avoid using route new as thread id (bytedance#1967) Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> * Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965) * Fix event loop conflict in SubagentExecutor.execute() When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop. This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts. Fixes: sub-task cards not appearing in Ultra mode when using async parent agents Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(subagent): harden isolated event loop execution --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(backend): remove dead getattr in _tool_message_event --------- Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com> Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com> Co-authored-by: 13ernkastel <LennonCMJ@live.com> Co-authored-by: siwuai <458372151@qq.com> Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com> Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
…e#1969) (bytedance#1974) * fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969) DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values", "custom"] which only delivers full-state snapshots at graph-node boundaries, so AI replies were dumped as a single messages-tuple event per node instead of streaming token-by-token. `client.stream("hello")` looked identical to `client.chat("hello")` — the bug reported in bytedance#1969. Subscribe to "messages" mode as well, forward AIMessageChunk deltas as messages-tuple events with delta semantics (consumers accumulate by id), and dedup the values-snapshot path so it does not re-synthesize AI text that was already streamed. Introduce a per-id usage_metadata counter so the final AIMessage in the values snapshot and the final "messages" chunk — which carry the same cumulative usage — are not double-counted. chat() now accumulates per-id deltas and returns the last message's full accumulated text. Non-streaming mock sources (single event per id) are a degenerate case of the same logic, keeping existing callers and tests backward compatible. Verified end-to-end against a real LLM: a 15-number count emits 35 messages-tuple events with BPE subword boundaries clearly visible ("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across the window, end-event usage matches the values-snapshot usage exactly (not doubled). tests/test_client_live.py::TestLiveStreaming passes. New unit tests: - test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted once. - test_chat_accumulates_streamed_deltas: chat() rebuilds full text from deltas. - test_messages_mode_tool_message: ToolMessage delivered via messages mode is not duplicated by the values-snapshot synthesis path. The stream() docstring now documents why this client does not reuse Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw LangChain objects vs serialized dicts, single caller vs HTTP fan-out). Fixes bytedance#1969 * refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969) Post-review cleanup for the token-level streaming fix. No behavior change for correct inputs; one efficiency regression fixed. Fix: chat() O(n²) accumulator ----------------------------- `chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`, which is O(n) per concat → O(n²) total over a streamed response. At ~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks it costs roughly 100-300 ms of pure copying. Switched to `dict[str, list[str]]` + `"".join()` once at return. Cleanup ------- - Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`, and `_tool_message_event` static helpers. The messages-mode and values-mode branches previously repeated four inline dict literals each; they now call the same builders. - `StreamEvent.type` is now typed as `Literal["values", "messages-tuple", "custom", "end"]` via a `StreamEventType` alias. Makes the closed set explicit and catches typos at type-check time. - Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`, `.tool_calls`, `.id` all have default values on the base class, so the `getattr(..., None)` fallbacks were dead code. Removed from the hot path. - `_account_usage` parameter type loosened to `Any` so that LangChain's `UsageMetadata` TypedDict is accepted under strict type checking. - Trimmed narrating comments on `seen_ids` / `streamed_ids` / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant. Net diff: -15 lines. All 132 unit tests + harness boundary test still pass; ruff check and ruff format pass. * docs(backend): add STREAMING.md design note (bytedance#1969) Dedicated design document for the token-level streaming architecture, prompted by the bug investigation in bytedance#1969. Contents: - Why two parallel streaming paths exist (Gateway HTTP/async vs DeerFlowClient sync/in-process) and why they cannot be merged. - LangGraph's three-layer mode naming (Graph "messages" vs Platform SDK "messages-tuple" vs HTTP SSE) and why a shared string constant would be harmful. - Gateway path: run_agent + StreamBridge + sse_consumer with a sequence diagram. - DeerFlowClient path: sync generator + direct yield, delta semantics, chat() accumulator. - Why the three id sets (seen_ids / streamed_ids / counted_usage_ids) each carry an independent invariant and cannot be collapsed. - End-to-end sequence for a real conversation turn. - Lessons from bytedance#1969: why mock-based tests missed the bug, why BPE subword boundaries in live output are the strongest correctness signal, and the regression test that locks it in. - Source code location index. Also: - Link from backend/CLAUDE.md Embedded Client section. - Link from backend/docs/README.md under Feature Documentation. * test(backend): add refactor regression guards for stream() (bytedance#1969) Three new tests in TestStream that lock the contract introduced by PR bytedance#1974 so any future refactor (sync->async migration, sharing a core with Gateway's run_agent, dedup strategy change) cannot silently change behavior. - test_dedup_requires_messages_before_values_invariant: canary that documents the order-dependence of cross-mode dedup. streamed_ids is populated only by the messages branch, so values-before-messages for the same id produces duplicate AI text events. Real LangGraph never inverts this order, but a refactor that does (or that makes dedup idempotent) must update this test deliberately. - test_messages_mode_golden_event_sequence: locks the *exact* event sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1 end) for a canonical streaming turn. List equality gives a clear diff on any drift in order, type, or payload shape. - test_chat_accumulates_in_linear_time: perf canary for the O(n^2) fix in commit 1f11ba1. 10,000 single-char chunks must accumulate in under 1s; the threshold is wide enough to pass on slow CI but tight enough to fail if buffer = buffer + delta is restored. All three tests pass alongside the existing 12 TestStream tests (15/15). ruff check + ruff format clean. * docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969) Replace the misleading "raw LangChain objects (AIMessage, usage_metadata as dataclasses), not dicts" claim in the "Why not reuse Gateway's run_agent?" section. The implementation already yields plain Python dicts (StreamEvent.data is dict, and usage_metadata is a TypedDict), so the original wording suggested a richer return type than the API actually delivers. The corrected wording focuses on what is actually true and relevant: this client skips the JSON/SSE serialization layer that Gateway adds for HTTP wire transmission, and yields stream event payloads directly as Python data structures. Addresses Copilot review feedback on PR bytedance#1974. * test(backend): document none-id messages dedup limitation (bytedance#1969) Add test_none_id_chunks_produce_duplicates_known_limitation to TestStream that explicitly documents and asserts the current behavior when an LLM provider emits AIMessageChunk with id=None (vLLM, certain custom backends). The cross-mode dedup machinery cannot record a None id in streamed_ids (guarded by ``if msg_id:``), so the values snapshot's reassembled AIMessage with a real id falls through and synthesizes a duplicate AI text event. The test asserts len == 2 and locks this as a known limitation rather than silently letting future contributors hit it without context. Why this is documented rather than fixed: * Falling back to ``metadata.get("id")`` does not help — LangGraph's messages-mode metadata never carries the message id. * Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the values snapshot uses the same fallback, which it does not. * A real fix requires provider cooperation (always emit chunk ids) or content-based dedup (false-positive risk), neither of which belongs in this PR. If a real fix lands, replace this test with a positive assertion that dedup works for None-id chunks. Addresses Copilot review feedback on PR bytedance#1974 (client.py:515). * fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942) - Fix `font-norma` typo to `font-normal` in message-list subtask count - Fix dark mode `--border` using reddish hue (22.216) instead of neutral - Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground` - Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground` - Add missing `font-sans` to welcome description `<pre>` for consistency - Make case-study-section padding responsive (`px-4 md:px-20`) Closes bytedance#1940 * docs: clarify deployment sizing guidance (bytedance#1963) * fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960) After history.replaceState updates the URL from /chats/new to /chats/{UUID}, Next.js useParams does not update because replaceState bypasses the router. The useEffect in useThreadChat would then set threadIdFromPath ('new') as the threadId, causing the LangGraph SDK to call POST /threads/new/history which returns HTTP 422 (Invalid thread ID: must be a UUID). This fix adds a guard to skip the threadId update when threadIdFromPath is the literal string 'new', preserving the already-correct UUID that was set when the thread was created. * fix(frontend): avoid using route new as thread id (bytedance#1967) Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> * Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965) * Fix event loop conflict in SubagentExecutor.execute() When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop. This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts. Fixes: sub-task cards not appearing in Ultra mode when using async parent agents Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(subagent): harden isolated event loop execution --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(backend): remove dead getattr in _tool_message_event --------- Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com> Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com> Co-authored-by: 13ernkastel <LennonCMJ@live.com> Co-authored-by: siwuai <458372151@qq.com> Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com> Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
…e#1969) (bytedance#1974) * fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969) DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values", "custom"] which only delivers full-state snapshots at graph-node boundaries, so AI replies were dumped as a single messages-tuple event per node instead of streaming token-by-token. `client.stream("hello")` looked identical to `client.chat("hello")` — the bug reported in bytedance#1969. Subscribe to "messages" mode as well, forward AIMessageChunk deltas as messages-tuple events with delta semantics (consumers accumulate by id), and dedup the values-snapshot path so it does not re-synthesize AI text that was already streamed. Introduce a per-id usage_metadata counter so the final AIMessage in the values snapshot and the final "messages" chunk — which carry the same cumulative usage — are not double-counted. chat() now accumulates per-id deltas and returns the last message's full accumulated text. Non-streaming mock sources (single event per id) are a degenerate case of the same logic, keeping existing callers and tests backward compatible. Verified end-to-end against a real LLM: a 15-number count emits 35 messages-tuple events with BPE subword boundaries clearly visible ("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across the window, end-event usage matches the values-snapshot usage exactly (not doubled). tests/test_client_live.py::TestLiveStreaming passes. New unit tests: - test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted once. - test_chat_accumulates_streamed_deltas: chat() rebuilds full text from deltas. - test_messages_mode_tool_message: ToolMessage delivered via messages mode is not duplicated by the values-snapshot synthesis path. The stream() docstring now documents why this client does not reuse Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw LangChain objects vs serialized dicts, single caller vs HTTP fan-out). Fixes bytedance#1969 * refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969) Post-review cleanup for the token-level streaming fix. No behavior change for correct inputs; one efficiency regression fixed. Fix: chat() O(n²) accumulator ----------------------------- `chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`, which is O(n) per concat → O(n²) total over a streamed response. At ~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks it costs roughly 100-300 ms of pure copying. Switched to `dict[str, list[str]]` + `"".join()` once at return. Cleanup ------- - Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`, and `_tool_message_event` static helpers. The messages-mode and values-mode branches previously repeated four inline dict literals each; they now call the same builders. - `StreamEvent.type` is now typed as `Literal["values", "messages-tuple", "custom", "end"]` via a `StreamEventType` alias. Makes the closed set explicit and catches typos at type-check time. - Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`, `.tool_calls`, `.id` all have default values on the base class, so the `getattr(..., None)` fallbacks were dead code. Removed from the hot path. - `_account_usage` parameter type loosened to `Any` so that LangChain's `UsageMetadata` TypedDict is accepted under strict type checking. - Trimmed narrating comments on `seen_ids` / `streamed_ids` / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant. Net diff: -15 lines. All 132 unit tests + harness boundary test still pass; ruff check and ruff format pass. * docs(backend): add STREAMING.md design note (bytedance#1969) Dedicated design document for the token-level streaming architecture, prompted by the bug investigation in bytedance#1969. Contents: - Why two parallel streaming paths exist (Gateway HTTP/async vs DeerFlowClient sync/in-process) and why they cannot be merged. - LangGraph's three-layer mode naming (Graph "messages" vs Platform SDK "messages-tuple" vs HTTP SSE) and why a shared string constant would be harmful. - Gateway path: run_agent + StreamBridge + sse_consumer with a sequence diagram. - DeerFlowClient path: sync generator + direct yield, delta semantics, chat() accumulator. - Why the three id sets (seen_ids / streamed_ids / counted_usage_ids) each carry an independent invariant and cannot be collapsed. - End-to-end sequence for a real conversation turn. - Lessons from bytedance#1969: why mock-based tests missed the bug, why BPE subword boundaries in live output are the strongest correctness signal, and the regression test that locks it in. - Source code location index. Also: - Link from backend/CLAUDE.md Embedded Client section. - Link from backend/docs/README.md under Feature Documentation. * test(backend): add refactor regression guards for stream() (bytedance#1969) Three new tests in TestStream that lock the contract introduced by PR bytedance#1974 so any future refactor (sync->async migration, sharing a core with Gateway's run_agent, dedup strategy change) cannot silently change behavior. - test_dedup_requires_messages_before_values_invariant: canary that documents the order-dependence of cross-mode dedup. streamed_ids is populated only by the messages branch, so values-before-messages for the same id produces duplicate AI text events. Real LangGraph never inverts this order, but a refactor that does (or that makes dedup idempotent) must update this test deliberately. - test_messages_mode_golden_event_sequence: locks the *exact* event sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1 end) for a canonical streaming turn. List equality gives a clear diff on any drift in order, type, or payload shape. - test_chat_accumulates_in_linear_time: perf canary for the O(n^2) fix in commit 1f11ba1. 10,000 single-char chunks must accumulate in under 1s; the threshold is wide enough to pass on slow CI but tight enough to fail if buffer = buffer + delta is restored. All three tests pass alongside the existing 12 TestStream tests (15/15). ruff check + ruff format clean. * docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969) Replace the misleading "raw LangChain objects (AIMessage, usage_metadata as dataclasses), not dicts" claim in the "Why not reuse Gateway's run_agent?" section. The implementation already yields plain Python dicts (StreamEvent.data is dict, and usage_metadata is a TypedDict), so the original wording suggested a richer return type than the API actually delivers. The corrected wording focuses on what is actually true and relevant: this client skips the JSON/SSE serialization layer that Gateway adds for HTTP wire transmission, and yields stream event payloads directly as Python data structures. Addresses Copilot review feedback on PR bytedance#1974. * test(backend): document none-id messages dedup limitation (bytedance#1969) Add test_none_id_chunks_produce_duplicates_known_limitation to TestStream that explicitly documents and asserts the current behavior when an LLM provider emits AIMessageChunk with id=None (vLLM, certain custom backends). The cross-mode dedup machinery cannot record a None id in streamed_ids (guarded by ``if msg_id:``), so the values snapshot's reassembled AIMessage with a real id falls through and synthesizes a duplicate AI text event. The test asserts len == 2 and locks this as a known limitation rather than silently letting future contributors hit it without context. Why this is documented rather than fixed: * Falling back to ``metadata.get("id")`` does not help — LangGraph's messages-mode metadata never carries the message id. * Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the values snapshot uses the same fallback, which it does not. * A real fix requires provider cooperation (always emit chunk ids) or content-based dedup (false-positive risk), neither of which belongs in this PR. If a real fix lands, replace this test with a positive assertion that dedup works for None-id chunks. Addresses Copilot review feedback on PR bytedance#1974 (client.py:515). * fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942) - Fix `font-norma` typo to `font-normal` in message-list subtask count - Fix dark mode `--border` using reddish hue (22.216) instead of neutral - Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground` - Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground` - Add missing `font-sans` to welcome description `<pre>` for consistency - Make case-study-section padding responsive (`px-4 md:px-20`) Closes bytedance#1940 * docs: clarify deployment sizing guidance (bytedance#1963) * fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960) After history.replaceState updates the URL from /chats/new to /chats/{UUID}, Next.js useParams does not update because replaceState bypasses the router. The useEffect in useThreadChat would then set threadIdFromPath ('new') as the threadId, causing the LangGraph SDK to call POST /threads/new/history which returns HTTP 422 (Invalid thread ID: must be a UUID). This fix adds a guard to skip the threadId update when threadIdFromPath is the literal string 'new', preserving the already-correct UUID that was set when the thread was created. * fix(frontend): avoid using route new as thread id (bytedance#1967) Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> * Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965) * Fix event loop conflict in SubagentExecutor.execute() When SubagentExecutor.execute() is called from within an already-running event loop (e.g., when the parent agent uses async/await), calling asyncio.run() creates a new event loop that conflicts with asyncio primitives (like httpx.AsyncClient) that were created in and bound to the parent loop. This fix detects if we're already in a running event loop, and if so, runs the subagent in a separate thread with its own isolated event loop to avoid conflicts. Fixes: sub-task cards not appearing in Ultra mode when using async parent agents Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(subagent): harden isolated event loop execution --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(backend): remove dead getattr in _tool_message_event --------- Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com> Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com> Co-authored-by: 13ernkastel <LennonCMJ@live.com> Co-authored-by: siwuai <458372151@qq.com> Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com> Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com> Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Summary
Commit 1 (
f3486bb3) — bug fix:DeerFlowClient.stream()only subscribed to LangGraphstream_mode=["values", "custom"], so AI replies arrived as a single dump per graph node instead of token deltas.client.stream("hello")looked identical toclient.chat("hello")."messages"mode, forwardAIMessageChunkdeltas asmessages-tupleevents (delta semantics, per-id accumulation), and dedup thevaluessnapshot path so it does not re-synthesize already-streamed text. Usage metadata is counted exactly once per message id to avoid double-counting the same cumulativeusagethat arrives via both the final messages-mode chunk and the values-snapshot's finalAIMessage.chat()accumulates per-id deltas and returns the last message's full accumulated text.Why not reuse Gateway's run_agent?section in thestream()docstring explaining that Gateway'srun_agent/StreamBridgepipeline is async HTTP SSE infrastructure and cannot be shared with a sync in-process generator without paying a larger cost than the duplication saves.Commit 2 (
1f11ba10) — post-review cleanup: a follow-up review pass (reuse / quality / efficiency) surfaced one real perf regression and several consolidation opportunities. Net-15lines.chat()O(n²) regression introduced by commit 1: replacedbuffers[id] = buffers.get(id,"") + deltawithdict[str, list[str]]+"".join(). Measured impact: a 50 KB / 5000-chunk response dropped from ~100-300 ms of pure string copying to millisecond-range._serialize_tool_calls+_ai_text_event/_ai_tool_calls_event/_tool_message_eventbuilder helpers. The messages-mode and values-mode branches previously repeated the same four inline dict literals; they now share builders. Messages branch: ~30 → ~18 lines. Values branch: ~25 → ~13 lines.StreamEvent.typeis nowLiteral["values", "messages-tuple", "custom", "end"]via aStreamEventTypealias — the closed set is explicit and typos are caught statically.getattr(msg_chunk, "tool_calls", None)/"usage_metadata"/"id"fallbacks with direct attribute access on the hot path (AIMessage base class fields always exist with defaults)._account_usageparameter loosened toAnyso LangChain'sUsageMetadataTypedDict passes strict type checking.seen_ids/streamed_ids/ the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant.Fixes #1969.
End-to-end verification (commit 1)
Verified against a real LLM. A 15-number count (
Count from 1 to 15, writing each number as an English word on its own line) emits 35messages-tupleevents across a 476 ms window with BPE subword boundaries clearly visible in the deltas:```
[5.209s] 'one' [5.460s] 'ele' / 'ven'
[5.252s] 'two' [5.508s] 'tw' / 'elve'
[5.252s] 'three' [5.568s] 'th' / 'irteen'
[5.309s] 'four' [5.623s] 'four' / 'teen'
... [5.677s] 'f' / 'if' / 'teen'
```
These subword splits are produced by the tokenizer and cannot be faked — they are only observable when data actually flows through the pipeline chunk by chunk. Before the fix, all 88 characters arrived as a single event at one timestamp.
endeventusagematches thevaluessnapshotusageexactly (output_tokens=175, not350), confirming the per-id dedup set prevents double-counting.Test plan
tests/test_client.py::TestStream::test_messages_mode_emits_token_deltas— 3AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted oncetests/test_client.py::TestStream::test_chat_accumulates_streamed_deltas—chat()rebuilds full text from deltas (now via"".join()— O(n))tests/test_client.py::TestStream::test_messages_mode_tool_message—ToolMessagevia messages mode not duplicated by values-snapshot synthesistests/test_client.pytests still pass (backward compatible for non-streaming mock sources)tests/test_harness_boundary.pypasses (no newapp.*imports from harness)tests/test_client_live.py::TestLiveBasicChatpasses (realchat()withHELLOand56responses)tests/test_client_live.py::TestLiveStreamingpasses (real streaming with usage dedup verified)ruff checkandruff format --checkpass on modified files after both commits