Skip to content

fix(backend): stream DeerFlowClient AI text as token deltas (#1969)#1974

Merged
WillemJiang merged 13 commits into
mainfrom
worktree-fix_issue
Apr 10, 2026
Merged

fix(backend): stream DeerFlowClient AI text as token deltas (#1969)#1974
WillemJiang merged 13 commits into
mainfrom
worktree-fix_issue

Conversation

@greatmengqi

@greatmengqi greatmengqi commented Apr 8, 2026

Copy link
Copy Markdown
Collaborator

Summary

Commit 1 (f3486bb3) — bug fix: DeerFlowClient.stream() only subscribed to LangGraph stream_mode=["values", "custom"], so AI replies arrived as a single dump per graph node instead of token deltas. client.stream("hello") looked identical to client.chat("hello").

  • Subscribe to "messages" mode, forward AIMessageChunk deltas as messages-tuple events (delta semantics, per-id accumulation), and dedup the values snapshot path so it does not re-synthesize already-streamed text. Usage metadata is counted exactly once per message id to avoid double-counting the same cumulative usage that arrives via both the final messages-mode chunk and the values-snapshot's final AIMessage.
  • chat() accumulates per-id deltas and returns the last message's full accumulated text.
  • Added a Why not reuse Gateway's run_agent? section in the stream() docstring explaining that Gateway's run_agent / StreamBridge pipeline is async HTTP SSE infrastructure and cannot be shared with a sync in-process generator without paying a larger cost than the duplication saves.

Commit 2 (1f11ba10) — post-review cleanup: a follow-up review pass (reuse / quality / efficiency) surfaced one real perf regression and several consolidation opportunities. Net -15 lines.

  • Fixed chat() O(n²) regression introduced by commit 1: replaced buffers[id] = buffers.get(id,"") + delta with dict[str, list[str]] + "".join(). Measured impact: a 50 KB / 5000-chunk response dropped from ~100-300 ms of pure string copying to millisecond-range.
  • Extracted _serialize_tool_calls + _ai_text_event / _ai_tool_calls_event / _tool_message_event builder helpers. The messages-mode and values-mode branches previously repeated the same four inline dict literals; they now share builders. Messages branch: ~30 → ~18 lines. Values branch: ~25 → ~13 lines.
  • StreamEvent.type is now Literal["values", "messages-tuple", "custom", "end"] via a StreamEventType alias — the closed set is explicit and typos are caught statically.
  • Replaced dead getattr(msg_chunk, "tool_calls", None) / "usage_metadata" / "id" fallbacks with direct attribute access on the hot path (AIMessage base class fields always exist with defaults).
  • _account_usage parameter loosened to Any so LangChain's UsageMetadata TypedDict passes strict type checking.
  • Trimmed narrating comments on seen_ids / streamed_ids / the values-synthesis skip block; kept the non-obvious ones that document the cross-mode dedup invariant.

Fixes #1969.

End-to-end verification (commit 1)

Verified against a real LLM. A 15-number count (Count from 1 to 15, writing each number as an English word on its own line) emits 35 messages-tuple events across a 476 ms window with BPE subword boundaries clearly visible in the deltas:

```
[5.209s] 'one' [5.460s] 'ele' / 'ven'
[5.252s] 'two' [5.508s] 'tw' / 'elve'
[5.252s] 'three' [5.568s] 'th' / 'irteen'
[5.309s] 'four' [5.623s] 'four' / 'teen'
... [5.677s] 'f' / 'if' / 'teen'
```

These subword splits are produced by the tokenizer and cannot be faked — they are only observable when data actually flows through the pipeline chunk by chunk. Before the fix, all 88 characters arrived as a single event at one timestamp.

end event usage matches the values snapshot usage exactly (output_tokens=175, not 350), confirming the per-id dedup set prevents double-counting.

Test plan

  • tests/test_client.py::TestStream::test_messages_mode_emits_token_deltas — 3 AIMessageChunks produce 3 delta events with correct content/id/usage, values-snapshot does not duplicate, usage counted once
  • tests/test_client.py::TestStream::test_chat_accumulates_streamed_deltaschat() rebuilds full text from deltas (now via "".join() — O(n))
  • tests/test_client.py::TestStream::test_messages_mode_tool_messageToolMessage via messages mode not duplicated by values-snapshot synthesis
  • Existing 129 tests/test_client.py tests still pass (backward compatible for non-streaming mock sources)
  • tests/test_harness_boundary.py passes (no new app.* imports from harness)
  • tests/test_client_live.py::TestLiveBasicChat passes (real chat() with HELLO and 56 responses)
  • tests/test_client_live.py::TestLiveStreaming passes (real streaming with usage dedup verified)
  • ruff check and ruff format --check pass on modified files after both commits

DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values",
"custom"] which only delivers full-state snapshots at graph-node
boundaries, so AI replies were dumped as a single messages-tuple event
per node instead of streaming token-by-token. `client.stream("hello")`
looked identical to `client.chat("hello")` — the bug reported in #1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as
messages-tuple events with delta semantics (consumers accumulate by id),
and dedup the values-snapshot path so it does not re-synthesize AI
text that was already streamed. Introduce a per-id usage_metadata
counter so the final AIMessage in the values snapshot and the final
"messages" chunk — which carry the same cumulative usage — are not
double-counted.

chat() now accumulates per-id deltas and returns the last message's
full accumulated text. Non-streaming mock sources (single event per id)
are a degenerate case of the same logic, keeping existing callers and
tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35
messages-tuple events with BPE subword boundaries clearly visible
("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across
the window, end-event usage matches the values-snapshot usage exactly
(not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3
  delta events with correct content/id/usage, values-snapshot does not
  duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text
  from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages
  mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse
Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw
LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes #1969

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the embedded DeerFlowClient conversation APIs to support true token-level streaming by subscribing to LangGraph "messages" mode and emitting per-chunk AI text deltas (while avoiding duplication from the "values" snapshot path).

Changes:

  • Subscribe to LangGraph stream_mode=["values", "messages", "custom"] and forward "messages" chunks as messages-tuple delta events.
  • Deduplicate values-snapshot synthesis for message IDs already streamed via "messages", and prevent double-counting usage_metadata across modes.
  • Update chat() to accumulate per-message-id deltas and return the final accumulated AI text; add/extend unit tests.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
backend/packages/harness/deerflow/client.py Adds "messages" stream subscription, delta forwarding, usage dedup, and chat() delta accumulation; expands stream() docstring.
backend/tests/test_client.py Adds regression tests validating token-delta emission, values-path dedup, usage dedup, and chat() accumulation.
backend/CLAUDE.md Updates embedded client documentation to reflect delta semantics and usage dedup behavior.

Comment thread backend/packages/harness/deerflow/client.py Outdated
Comment thread backend/packages/harness/deerflow/client.py
greatmengqi added 5 commits April 8, 2026 10:53
Post-review cleanup for the token-level streaming fix. No behavior
change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`,
which is O(n) per concat → O(n²) total over a streamed response. At
~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks
it costs roughly 100-300 ms of pure copying. Switched to
`dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`,
  and `_tool_message_event` static helpers. The messages-mode and
  values-mode branches previously repeated four inline dict literals each;
  they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple",
  "custom", "end"]` via a `StreamEventType` alias. Makes the closed set
  explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`,
  `.tool_calls`, `.id` all have default values on the base class, so the
  `getattr(..., None)` fallbacks were dead code. Removed from the hot
  path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's
  `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the
  values-synthesis skip block; kept the non-obvious ones that document
  the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still
pass; ruff check and ruff format pass.
Dedicated design document for the token-level streaming architecture,
prompted by the bug investigation in #1969.

Contents:
- Why two parallel streaming paths exist (Gateway HTTP/async vs
  DeerFlowClient sync/in-process) and why they cannot be merged.
- LangGraph's three-layer mode naming (Graph "messages" vs Platform
  SDK "messages-tuple" vs HTTP SSE) and why a shared string constant
  would be harmful.
- Gateway path: run_agent + StreamBridge + sse_consumer with a
  sequence diagram.
- DeerFlowClient path: sync generator + direct yield, delta semantics,
  chat() accumulator.
- Why the three id sets (seen_ids / streamed_ids / counted_usage_ids)
  each carry an independent invariant and cannot be collapsed.
- End-to-end sequence for a real conversation turn.
- Lessons from #1969: why mock-based tests missed the bug, why
  BPE subword boundaries in live output are the strongest
  correctness signal, and the regression test that locks it in.
- Source code location index.

Also:
- Link from backend/CLAUDE.md Embedded Client section.
- Link from backend/docs/README.md under Feature Documentation.
Three new tests in TestStream that lock the contract introduced by
PR #1974 so any future refactor (sync->async migration, sharing a
core with Gateway's run_agent, dedup strategy change) cannot
silently change behavior.

- test_dedup_requires_messages_before_values_invariant: canary that
  documents the order-dependence of cross-mode dedup. streamed_ids
  is populated only by the messages branch, so values-before-messages
  for the same id produces duplicate AI text events. Real LangGraph
  never inverts this order, but a refactor that does (or that makes
  dedup idempotent) must update this test deliberately.

- test_messages_mode_golden_event_sequence: locks the *exact* event
  sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1
  end) for a canonical streaming turn. List equality gives a clear
  diff on any drift in order, type, or payload shape.

- test_chat_accumulates_in_linear_time: perf canary for the O(n^2)
  fix in commit 1f11ba1. 10,000 single-char chunks must accumulate
  in under 1s; the threshold is wide enough to pass on slow CI but
  tight enough to fail if buffer = buffer + delta is restored.

All three tests pass alongside the existing 12 TestStream tests
(15/15). ruff check + ruff format clean.
Replace the misleading "raw LangChain objects (AIMessage,
usage_metadata as dataclasses), not dicts" claim in the
"Why not reuse Gateway's run_agent?" section. The implementation
already yields plain Python dicts (StreamEvent.data is dict, and
usage_metadata is a TypedDict), so the original wording suggested
a richer return type than the API actually delivers.

The corrected wording focuses on what is actually true and
relevant: this client skips the JSON/SSE serialization layer that
Gateway adds for HTTP wire transmission, and yields stream event
payloads directly as Python data structures.

Addresses Copilot review feedback on PR #1974.
Add test_none_id_chunks_produce_duplicates_known_limitation to
TestStream that explicitly documents and asserts the current
behavior when an LLM provider emits AIMessageChunk with id=None
(vLLM, certain custom backends).

The cross-mode dedup machinery cannot record a None id in
streamed_ids (guarded by ``if msg_id:``), so the values snapshot's
reassembled AIMessage with a real id falls through and synthesizes
a duplicate AI text event. The test asserts len == 2 and locks
this as a known limitation rather than silently letting future
contributors hit it without context.

Why this is documented rather than fixed:
* Falling back to ``metadata.get("id")`` does not help — LangGraph's
  messages-mode metadata never carries the message id.
* Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the
  values snapshot uses the same fallback, which it does not.
* A real fix requires provider cooperation (always emit chunk ids)
  or content-based dedup (false-positive risk), neither of which
  belongs in this PR.

If a real fix lands, replace this test with a positive assertion
that dedup works for None-id chunks.

Addresses Copilot review feedback on PR #1974 (client.py:515).
@WillemJiang

Copy link
Copy Markdown
Collaborator

@greatmengqi

_tool_message_event still uses getattr while the commit message says they were removed

def _tool_message_event(msg) -> "StreamEvent":
    return StreamEvent(
        type="messages-tuple",
        data={
            "type": "tool",
            "content": DeerFlowClient._extract_text(msg.content),
            "name": getattr(msg, "name", None),        # still uses getattr
            "tool_call_id": getattr(msg, "tool_call_id", None),  # still uses getattr
            "id": getattr(msg, "id", None),              # still uses getattr
        },
    )

The commit message says "Replaced dead getattr fallbacks with direct attribute access on the hot path." But ToolMessage fields like name, tool_call_id, and id are indeed always present on ToolMessage (they're
base class fields with defaults). The getattr is technically dead fallback here too — but it's also harmless and arguably defensive for the _tool_message_event builder since it could receive a duck-typed
object.

@WillemJiang WillemJiang added the question Further information is requested label Apr 8, 2026
@greatmengqi

Copy link
Copy Markdown
Collaborator Author

Good catch — you're right that the same rationale applies here. ToolMessage.name, .tool_call_id, and .id are base-class fields with defaults, so the getattr fallbacks are dead code. Fixed in the latest commit: added msg: ToolMessage type annotation and switched to direct attribute access.

fancyboi999 and others added 7 commits April 10, 2026 12:00
…ded colors (#1942)

- Fix `font-norma` typo to `font-normal` in message-list subtask count
- Fix dark mode `--border` using reddish hue (22.216) instead of neutral
- Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground`
- Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground`
- Add missing `font-sans` to welcome description `<pre>` for consistency
- Make case-study-section padding responsive (`px-4 md:px-20`)

Closes #1940
…ory requests (#1960)

After history.replaceState updates the URL from /chats/new to
/chats/{UUID}, Next.js useParams does not update because replaceState
bypasses the router. The useEffect in useThreadChat would then set
threadIdFromPath ('new') as the threadId, causing the LangGraph SDK
to call POST /threads/new/history which returns HTTP 422 (Invalid
thread ID: must be a UUID).

This fix adds a guard to skip the threadId update when
threadIdFromPath is the literal string 'new', preserving the
already-correct UUID that was set when the thread was created.
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
* Fix event loop conflict in SubagentExecutor.execute()

When SubagentExecutor.execute() is called from within an already-running
event loop (e.g., when the parent agent uses async/await), calling
asyncio.run() creates a new event loop that conflicts with asyncio
primitives (like httpx.AsyncClient) that were created in and bound to
the parent loop.

This fix detects if we're already in a running event loop, and if so,
runs the subagent in a separate thread with its own isolated event loop
to avoid conflicts.

Fixes: sub-task cards not appearing in Ultra mode when using async parent agents

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(subagent): harden isolated event loop execution

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
@WillemJiang WillemJiang merged commit b1aabe8 into main Apr 10, 2026
10 checks passed
@WillemJiang WillemJiang deleted the worktree-fix_issue branch April 10, 2026 10:16
GentleOrca pushed a commit to HuaguoshanLab/deer-flow that referenced this pull request Apr 13, 2026
…e#1969) (bytedance#1974)

* fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969)

DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values",
"custom"] which only delivers full-state snapshots at graph-node
boundaries, so AI replies were dumped as a single messages-tuple event
per node instead of streaming token-by-token. `client.stream("hello")`
looked identical to `client.chat("hello")` — the bug reported in bytedance#1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as
messages-tuple events with delta semantics (consumers accumulate by id),
and dedup the values-snapshot path so it does not re-synthesize AI
text that was already streamed. Introduce a per-id usage_metadata
counter so the final AIMessage in the values snapshot and the final
"messages" chunk — which carry the same cumulative usage — are not
double-counted.

chat() now accumulates per-id deltas and returns the last message's
full accumulated text. Non-streaming mock sources (single event per id)
are a degenerate case of the same logic, keeping existing callers and
tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35
messages-tuple events with BPE subword boundaries clearly visible
("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across
the window, end-event usage matches the values-snapshot usage exactly
(not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3
  delta events with correct content/id/usage, values-snapshot does not
  duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text
  from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages
  mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse
Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw
LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes bytedance#1969

* refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969)

Post-review cleanup for the token-level streaming fix. No behavior
change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`,
which is O(n) per concat → O(n²) total over a streamed response. At
~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks
it costs roughly 100-300 ms of pure copying. Switched to
`dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`,
  and `_tool_message_event` static helpers. The messages-mode and
  values-mode branches previously repeated four inline dict literals each;
  they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple",
  "custom", "end"]` via a `StreamEventType` alias. Makes the closed set
  explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`,
  `.tool_calls`, `.id` all have default values on the base class, so the
  `getattr(..., None)` fallbacks were dead code. Removed from the hot
  path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's
  `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the
  values-synthesis skip block; kept the non-obvious ones that document
  the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still
pass; ruff check and ruff format pass.

* docs(backend): add STREAMING.md design note (bytedance#1969)

Dedicated design document for the token-level streaming architecture,
prompted by the bug investigation in bytedance#1969.

Contents:
- Why two parallel streaming paths exist (Gateway HTTP/async vs
  DeerFlowClient sync/in-process) and why they cannot be merged.
- LangGraph's three-layer mode naming (Graph "messages" vs Platform
  SDK "messages-tuple" vs HTTP SSE) and why a shared string constant
  would be harmful.
- Gateway path: run_agent + StreamBridge + sse_consumer with a
  sequence diagram.
- DeerFlowClient path: sync generator + direct yield, delta semantics,
  chat() accumulator.
- Why the three id sets (seen_ids / streamed_ids / counted_usage_ids)
  each carry an independent invariant and cannot be collapsed.
- End-to-end sequence for a real conversation turn.
- Lessons from bytedance#1969: why mock-based tests missed the bug, why
  BPE subword boundaries in live output are the strongest
  correctness signal, and the regression test that locks it in.
- Source code location index.

Also:
- Link from backend/CLAUDE.md Embedded Client section.
- Link from backend/docs/README.md under Feature Documentation.

* test(backend): add refactor regression guards for stream() (bytedance#1969)

Three new tests in TestStream that lock the contract introduced by
PR bytedance#1974 so any future refactor (sync->async migration, sharing a
core with Gateway's run_agent, dedup strategy change) cannot
silently change behavior.

- test_dedup_requires_messages_before_values_invariant: canary that
  documents the order-dependence of cross-mode dedup. streamed_ids
  is populated only by the messages branch, so values-before-messages
  for the same id produces duplicate AI text events. Real LangGraph
  never inverts this order, but a refactor that does (or that makes
  dedup idempotent) must update this test deliberately.

- test_messages_mode_golden_event_sequence: locks the *exact* event
  sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1
  end) for a canonical streaming turn. List equality gives a clear
  diff on any drift in order, type, or payload shape.

- test_chat_accumulates_in_linear_time: perf canary for the O(n^2)
  fix in commit 1f11ba1. 10,000 single-char chunks must accumulate
  in under 1s; the threshold is wide enough to pass on slow CI but
  tight enough to fail if buffer = buffer + delta is restored.

All three tests pass alongside the existing 12 TestStream tests
(15/15). ruff check + ruff format clean.

* docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969)

Replace the misleading "raw LangChain objects (AIMessage,
usage_metadata as dataclasses), not dicts" claim in the
"Why not reuse Gateway's run_agent?" section. The implementation
already yields plain Python dicts (StreamEvent.data is dict, and
usage_metadata is a TypedDict), so the original wording suggested
a richer return type than the API actually delivers.

The corrected wording focuses on what is actually true and
relevant: this client skips the JSON/SSE serialization layer that
Gateway adds for HTTP wire transmission, and yields stream event
payloads directly as Python data structures.

Addresses Copilot review feedback on PR bytedance#1974.

* test(backend): document none-id messages dedup limitation (bytedance#1969)

Add test_none_id_chunks_produce_duplicates_known_limitation to
TestStream that explicitly documents and asserts the current
behavior when an LLM provider emits AIMessageChunk with id=None
(vLLM, certain custom backends).

The cross-mode dedup machinery cannot record a None id in
streamed_ids (guarded by ``if msg_id:``), so the values snapshot's
reassembled AIMessage with a real id falls through and synthesizes
a duplicate AI text event. The test asserts len == 2 and locks
this as a known limitation rather than silently letting future
contributors hit it without context.

Why this is documented rather than fixed:
* Falling back to ``metadata.get("id")`` does not help — LangGraph's
  messages-mode metadata never carries the message id.
* Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the
  values snapshot uses the same fallback, which it does not.
* A real fix requires provider cooperation (always emit chunk ids)
  or content-based dedup (false-positive risk), neither of which
  belongs in this PR.

If a real fix lands, replace this test with a positive assertion
that dedup works for None-id chunks.

Addresses Copilot review feedback on PR bytedance#1974 (client.py:515).

* fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942)

- Fix `font-norma` typo to `font-normal` in message-list subtask count
- Fix dark mode `--border` using reddish hue (22.216) instead of neutral
- Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground`
- Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground`
- Add missing `font-sans` to welcome description `<pre>` for consistency
- Make case-study-section padding responsive (`px-4 md:px-20`)

Closes bytedance#1940

* docs: clarify deployment sizing guidance (bytedance#1963)

* fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960)

After history.replaceState updates the URL from /chats/new to
/chats/{UUID}, Next.js useParams does not update because replaceState
bypasses the router. The useEffect in useThreadChat would then set
threadIdFromPath ('new') as the threadId, causing the LangGraph SDK
to call POST /threads/new/history which returns HTTP 422 (Invalid
thread ID: must be a UUID).

This fix adds a guard to skip the threadId update when
threadIdFromPath is the literal string 'new', preserving the
already-correct UUID that was set when the thread was created.

* fix(frontend): avoid using route new as thread id (bytedance#1967)

Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>

* Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965)

* Fix event loop conflict in SubagentExecutor.execute()

When SubagentExecutor.execute() is called from within an already-running
event loop (e.g., when the parent agent uses async/await), calling
asyncio.run() creates a new event loop that conflicts with asyncio
primitives (like httpx.AsyncClient) that were created in and bound to
the parent loop.

This fix detects if we're already in a running event loop, and if so,
runs the subagent in a separate thread with its own isolated event loop
to avoid conflicts.

Fixes: sub-task cards not appearing in Ultra mode when using async parent agents

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(subagent): harden isolated event loop execution

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(backend): remove dead getattr in _tool_message_event

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com>
Co-authored-by: 13ernkastel <LennonCMJ@live.com>
Co-authored-by: siwuai <458372151@qq.com>
Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com>
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
MarkHoch pushed a commit to MarkHoch/deer-flow that referenced this pull request Apr 16, 2026
…e#1969) (bytedance#1974)

* fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969)

DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values",
"custom"] which only delivers full-state snapshots at graph-node
boundaries, so AI replies were dumped as a single messages-tuple event
per node instead of streaming token-by-token. `client.stream("hello")`
looked identical to `client.chat("hello")` — the bug reported in bytedance#1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as
messages-tuple events with delta semantics (consumers accumulate by id),
and dedup the values-snapshot path so it does not re-synthesize AI
text that was already streamed. Introduce a per-id usage_metadata
counter so the final AIMessage in the values snapshot and the final
"messages" chunk — which carry the same cumulative usage — are not
double-counted.

chat() now accumulates per-id deltas and returns the last message's
full accumulated text. Non-streaming mock sources (single event per id)
are a degenerate case of the same logic, keeping existing callers and
tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35
messages-tuple events with BPE subword boundaries clearly visible
("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across
the window, end-event usage matches the values-snapshot usage exactly
(not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3
  delta events with correct content/id/usage, values-snapshot does not
  duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text
  from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages
  mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse
Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw
LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes bytedance#1969

* refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969)

Post-review cleanup for the token-level streaming fix. No behavior
change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`,
which is O(n) per concat → O(n²) total over a streamed response. At
~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks
it costs roughly 100-300 ms of pure copying. Switched to
`dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`,
  and `_tool_message_event` static helpers. The messages-mode and
  values-mode branches previously repeated four inline dict literals each;
  they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple",
  "custom", "end"]` via a `StreamEventType` alias. Makes the closed set
  explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`,
  `.tool_calls`, `.id` all have default values on the base class, so the
  `getattr(..., None)` fallbacks were dead code. Removed from the hot
  path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's
  `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the
  values-synthesis skip block; kept the non-obvious ones that document
  the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still
pass; ruff check and ruff format pass.

* docs(backend): add STREAMING.md design note (bytedance#1969)

Dedicated design document for the token-level streaming architecture,
prompted by the bug investigation in bytedance#1969.

Contents:
- Why two parallel streaming paths exist (Gateway HTTP/async vs
  DeerFlowClient sync/in-process) and why they cannot be merged.
- LangGraph's three-layer mode naming (Graph "messages" vs Platform
  SDK "messages-tuple" vs HTTP SSE) and why a shared string constant
  would be harmful.
- Gateway path: run_agent + StreamBridge + sse_consumer with a
  sequence diagram.
- DeerFlowClient path: sync generator + direct yield, delta semantics,
  chat() accumulator.
- Why the three id sets (seen_ids / streamed_ids / counted_usage_ids)
  each carry an independent invariant and cannot be collapsed.
- End-to-end sequence for a real conversation turn.
- Lessons from bytedance#1969: why mock-based tests missed the bug, why
  BPE subword boundaries in live output are the strongest
  correctness signal, and the regression test that locks it in.
- Source code location index.

Also:
- Link from backend/CLAUDE.md Embedded Client section.
- Link from backend/docs/README.md under Feature Documentation.

* test(backend): add refactor regression guards for stream() (bytedance#1969)

Three new tests in TestStream that lock the contract introduced by
PR bytedance#1974 so any future refactor (sync->async migration, sharing a
core with Gateway's run_agent, dedup strategy change) cannot
silently change behavior.

- test_dedup_requires_messages_before_values_invariant: canary that
  documents the order-dependence of cross-mode dedup. streamed_ids
  is populated only by the messages branch, so values-before-messages
  for the same id produces duplicate AI text events. Real LangGraph
  never inverts this order, but a refactor that does (or that makes
  dedup idempotent) must update this test deliberately.

- test_messages_mode_golden_event_sequence: locks the *exact* event
  sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1
  end) for a canonical streaming turn. List equality gives a clear
  diff on any drift in order, type, or payload shape.

- test_chat_accumulates_in_linear_time: perf canary for the O(n^2)
  fix in commit 1f11ba1. 10,000 single-char chunks must accumulate
  in under 1s; the threshold is wide enough to pass on slow CI but
  tight enough to fail if buffer = buffer + delta is restored.

All three tests pass alongside the existing 12 TestStream tests
(15/15). ruff check + ruff format clean.

* docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969)

Replace the misleading "raw LangChain objects (AIMessage,
usage_metadata as dataclasses), not dicts" claim in the
"Why not reuse Gateway's run_agent?" section. The implementation
already yields plain Python dicts (StreamEvent.data is dict, and
usage_metadata is a TypedDict), so the original wording suggested
a richer return type than the API actually delivers.

The corrected wording focuses on what is actually true and
relevant: this client skips the JSON/SSE serialization layer that
Gateway adds for HTTP wire transmission, and yields stream event
payloads directly as Python data structures.

Addresses Copilot review feedback on PR bytedance#1974.

* test(backend): document none-id messages dedup limitation (bytedance#1969)

Add test_none_id_chunks_produce_duplicates_known_limitation to
TestStream that explicitly documents and asserts the current
behavior when an LLM provider emits AIMessageChunk with id=None
(vLLM, certain custom backends).

The cross-mode dedup machinery cannot record a None id in
streamed_ids (guarded by ``if msg_id:``), so the values snapshot's
reassembled AIMessage with a real id falls through and synthesizes
a duplicate AI text event. The test asserts len == 2 and locks
this as a known limitation rather than silently letting future
contributors hit it without context.

Why this is documented rather than fixed:
* Falling back to ``metadata.get("id")`` does not help — LangGraph's
  messages-mode metadata never carries the message id.
* Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the
  values snapshot uses the same fallback, which it does not.
* A real fix requires provider cooperation (always emit chunk ids)
  or content-based dedup (false-positive risk), neither of which
  belongs in this PR.

If a real fix lands, replace this test with a positive assertion
that dedup works for None-id chunks.

Addresses Copilot review feedback on PR bytedance#1974 (client.py:515).

* fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942)

- Fix `font-norma` typo to `font-normal` in message-list subtask count
- Fix dark mode `--border` using reddish hue (22.216) instead of neutral
- Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground`
- Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground`
- Add missing `font-sans` to welcome description `<pre>` for consistency
- Make case-study-section padding responsive (`px-4 md:px-20`)

Closes bytedance#1940

* docs: clarify deployment sizing guidance (bytedance#1963)

* fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960)

After history.replaceState updates the URL from /chats/new to
/chats/{UUID}, Next.js useParams does not update because replaceState
bypasses the router. The useEffect in useThreadChat would then set
threadIdFromPath ('new') as the threadId, causing the LangGraph SDK
to call POST /threads/new/history which returns HTTP 422 (Invalid
thread ID: must be a UUID).

This fix adds a guard to skip the threadId update when
threadIdFromPath is the literal string 'new', preserving the
already-correct UUID that was set when the thread was created.

* fix(frontend): avoid using route new as thread id (bytedance#1967)

Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>

* Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965)

* Fix event loop conflict in SubagentExecutor.execute()

When SubagentExecutor.execute() is called from within an already-running
event loop (e.g., when the parent agent uses async/await), calling
asyncio.run() creates a new event loop that conflicts with asyncio
primitives (like httpx.AsyncClient) that were created in and bound to
the parent loop.

This fix detects if we're already in a running event loop, and if so,
runs the subagent in a separate thread with its own isolated event loop
to avoid conflicts.

Fixes: sub-task cards not appearing in Ultra mode when using async parent agents

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(subagent): harden isolated event loop execution

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(backend): remove dead getattr in _tool_message_event

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com>
Co-authored-by: 13ernkastel <LennonCMJ@live.com>
Co-authored-by: siwuai <458372151@qq.com>
Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com>
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
orbisai0security pushed a commit to orbisai0security/deer-flow that referenced this pull request Apr 27, 2026
…e#1969) (bytedance#1974)

* fix(backend): stream DeerFlowClient AI text as token deltas (bytedance#1969)

DeerFlowClient.stream() subscribed to LangGraph stream_mode=["values",
"custom"] which only delivers full-state snapshots at graph-node
boundaries, so AI replies were dumped as a single messages-tuple event
per node instead of streaming token-by-token. `client.stream("hello")`
looked identical to `client.chat("hello")` — the bug reported in bytedance#1969.

Subscribe to "messages" mode as well, forward AIMessageChunk deltas as
messages-tuple events with delta semantics (consumers accumulate by id),
and dedup the values-snapshot path so it does not re-synthesize AI
text that was already streamed. Introduce a per-id usage_metadata
counter so the final AIMessage in the values snapshot and the final
"messages" chunk — which carry the same cumulative usage — are not
double-counted.

chat() now accumulates per-id deltas and returns the last message's
full accumulated text. Non-streaming mock sources (single event per id)
are a degenerate case of the same logic, keeping existing callers and
tests backward compatible.

Verified end-to-end against a real LLM: a 15-number count emits 35
messages-tuple events with BPE subword boundaries clearly visible
("eleven" -> "ele" / "ven", "twelve" -> "tw" / "elve"), 476ms across
the window, end-event usage matches the values-snapshot usage exactly
(not doubled). tests/test_client_live.py::TestLiveStreaming passes.

New unit tests:
- test_messages_mode_emits_token_deltas: 3 AIMessageChunks produce 3
  delta events with correct content/id/usage, values-snapshot does not
  duplicate, usage counted once.
- test_chat_accumulates_streamed_deltas: chat() rebuilds full text
  from deltas.
- test_messages_mode_tool_message: ToolMessage delivered via messages
  mode is not duplicated by the values-snapshot synthesis path.

The stream() docstring now documents why this client does not reuse
Gateway's run_agent() / StreamBridge pipeline (sync vs async, raw
LangChain objects vs serialized dicts, single caller vs HTTP fan-out).

Fixes bytedance#1969

* refactor(backend): simplify DeerFlowClient streaming helpers (bytedance#1969)

Post-review cleanup for the token-level streaming fix. No behavior
change for correct inputs; one efficiency regression fixed.

Fix: chat() O(n²) accumulator
-----------------------------
`chat()` accumulated per-id text via `buffers[id] = buffers.get(id,"") + delta`,
which is O(n) per concat → O(n²) total over a streamed response. At
~2 KB cumulative text this becomes user-visible; at 50 KB / 5000 chunks
it costs roughly 100-300 ms of pure copying. Switched to
`dict[str, list[str]]` + `"".join()` once at return.

Cleanup
-------
- Extract `_serialize_tool_calls`, `_ai_text_event`, `_ai_tool_calls_event`,
  and `_tool_message_event` static helpers. The messages-mode and
  values-mode branches previously repeated four inline dict literals each;
  they now call the same builders.
- `StreamEvent.type` is now typed as `Literal["values", "messages-tuple",
  "custom", "end"]` via a `StreamEventType` alias. Makes the closed set
  explicit and catches typos at type-check time.
- Direct attribute access on `AIMessage`/`AIMessageChunk`: `.usage_metadata`,
  `.tool_calls`, `.id` all have default values on the base class, so the
  `getattr(..., None)` fallbacks were dead code. Removed from the hot
  path.
- `_account_usage` parameter type loosened to `Any` so that LangChain's
  `UsageMetadata` TypedDict is accepted under strict type checking.
- Trimmed narrating comments on `seen_ids` / `streamed_ids` / the
  values-synthesis skip block; kept the non-obvious ones that document
  the cross-mode dedup invariant.

Net diff: -15 lines. All 132 unit tests + harness boundary test still
pass; ruff check and ruff format pass.

* docs(backend): add STREAMING.md design note (bytedance#1969)

Dedicated design document for the token-level streaming architecture,
prompted by the bug investigation in bytedance#1969.

Contents:
- Why two parallel streaming paths exist (Gateway HTTP/async vs
  DeerFlowClient sync/in-process) and why they cannot be merged.
- LangGraph's three-layer mode naming (Graph "messages" vs Platform
  SDK "messages-tuple" vs HTTP SSE) and why a shared string constant
  would be harmful.
- Gateway path: run_agent + StreamBridge + sse_consumer with a
  sequence diagram.
- DeerFlowClient path: sync generator + direct yield, delta semantics,
  chat() accumulator.
- Why the three id sets (seen_ids / streamed_ids / counted_usage_ids)
  each carry an independent invariant and cannot be collapsed.
- End-to-end sequence for a real conversation turn.
- Lessons from bytedance#1969: why mock-based tests missed the bug, why
  BPE subword boundaries in live output are the strongest
  correctness signal, and the regression test that locks it in.
- Source code location index.

Also:
- Link from backend/CLAUDE.md Embedded Client section.
- Link from backend/docs/README.md under Feature Documentation.

* test(backend): add refactor regression guards for stream() (bytedance#1969)

Three new tests in TestStream that lock the contract introduced by
PR bytedance#1974 so any future refactor (sync->async migration, sharing a
core with Gateway's run_agent, dedup strategy change) cannot
silently change behavior.

- test_dedup_requires_messages_before_values_invariant: canary that
  documents the order-dependence of cross-mode dedup. streamed_ids
  is populated only by the messages branch, so values-before-messages
  for the same id produces duplicate AI text events. Real LangGraph
  never inverts this order, but a refactor that does (or that makes
  dedup idempotent) must update this test deliberately.

- test_messages_mode_golden_event_sequence: locks the *exact* event
  sequence (4 events: 2 messages-tuple deltas, 1 values snapshot, 1
  end) for a canonical streaming turn. List equality gives a clear
  diff on any drift in order, type, or payload shape.

- test_chat_accumulates_in_linear_time: perf canary for the O(n^2)
  fix in commit 1f11ba1. 10,000 single-char chunks must accumulate
  in under 1s; the threshold is wide enough to pass on slow CI but
  tight enough to fail if buffer = buffer + delta is restored.

All three tests pass alongside the existing 12 TestStream tests
(15/15). ruff check + ruff format clean.

* docs(backend): clarify stream() docstring on JSON serialization (bytedance#1969)

Replace the misleading "raw LangChain objects (AIMessage,
usage_metadata as dataclasses), not dicts" claim in the
"Why not reuse Gateway's run_agent?" section. The implementation
already yields plain Python dicts (StreamEvent.data is dict, and
usage_metadata is a TypedDict), so the original wording suggested
a richer return type than the API actually delivers.

The corrected wording focuses on what is actually true and
relevant: this client skips the JSON/SSE serialization layer that
Gateway adds for HTTP wire transmission, and yields stream event
payloads directly as Python data structures.

Addresses Copilot review feedback on PR bytedance#1974.

* test(backend): document none-id messages dedup limitation (bytedance#1969)

Add test_none_id_chunks_produce_duplicates_known_limitation to
TestStream that explicitly documents and asserts the current
behavior when an LLM provider emits AIMessageChunk with id=None
(vLLM, certain custom backends).

The cross-mode dedup machinery cannot record a None id in
streamed_ids (guarded by ``if msg_id:``), so the values snapshot's
reassembled AIMessage with a real id falls through and synthesizes
a duplicate AI text event. The test asserts len == 2 and locks
this as a known limitation rather than silently letting future
contributors hit it without context.

Why this is documented rather than fixed:
* Falling back to ``metadata.get("id")`` does not help — LangGraph's
  messages-mode metadata never carries the message id.
* Synthesizing ``f"_synth_{id(msg_chunk)}"`` only helps if the
  values snapshot uses the same fallback, which it does not.
* A real fix requires provider cooperation (always emit chunk ids)
  or content-based dedup (false-positive risk), neither of which
  belongs in this PR.

If a real fix lands, replace this test with a positive assertion
that dedup works for None-id chunks.

Addresses Copilot review feedback on PR bytedance#1974 (client.py:515).

* fix(frontend): UI polish - fix CSS typo, dark mode border, and hardcoded colors (bytedance#1942)

- Fix `font-norma` typo to `font-normal` in message-list subtask count
- Fix dark mode `--border` using reddish hue (22.216) instead of neutral
- Replace hardcoded `rgb(184,184,192)` in hero with `text-muted-foreground`
- Replace hardcoded `bg-[#a3a1a1]` in streaming indicator with `bg-muted-foreground`
- Add missing `font-sans` to welcome description `<pre>` for consistency
- Make case-study-section padding responsive (`px-4 md:px-20`)

Closes bytedance#1940

* docs: clarify deployment sizing guidance (bytedance#1963)

* fix(frontend): prevent stale 'new' thread ID from triggering 422 history requests (bytedance#1960)

After history.replaceState updates the URL from /chats/new to
/chats/{UUID}, Next.js useParams does not update because replaceState
bypasses the router. The useEffect in useThreadChat would then set
threadIdFromPath ('new') as the threadId, causing the LangGraph SDK
to call POST /threads/new/history which returns HTTP 422 (Invalid
thread ID: must be a UUID).

This fix adds a guard to skip the threadId update when
threadIdFromPath is the literal string 'new', preserving the
already-correct UUID that was set when the thread was created.

* fix(frontend): avoid using route new as thread id (bytedance#1967)

Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>

* Fix(subagent): Event loop conflict in SubagentExecutor.execute() (bytedance#1965)

* Fix event loop conflict in SubagentExecutor.execute()

When SubagentExecutor.execute() is called from within an already-running
event loop (e.g., when the parent agent uses async/await), calling
asyncio.run() creates a new event loop that conflicts with asyncio
primitives (like httpx.AsyncClient) that were created in and bound to
the parent loop.

This fix detects if we're already in a running event loop, and if so,
runs the subagent in a separate thread with its own isolated event loop
to avoid conflicts.

Fixes: sub-task cards not appearing in Ultra mode when using async parent agents

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(subagent): harden isolated event loop execution

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(backend): remove dead getattr in _tool_message_event

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
Co-authored-by: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com>
Co-authored-by: 13ernkastel <LennonCMJ@live.com>
Co-authored-by: siwuai <458372151@qq.com>
Co-authored-by: 肖 <168966994+luoxiao6645@users.noreply.github.com>
Co-authored-by: luoxiao6645 <luoxiao6645@gmail.com>
Co-authored-by: Saber <11769524+hawkli-1994@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

question Further information is requested

Projects

None yet

Development

Successfully merging this pull request may close these issues.

stream测试

8 participants