Skip to content

feat(sdk-py): extract stream decoders and add interleave_projections#7935

Merged
Nick Hollon (nick-hollon-lc) merged 17 commits into
mainfrom
nh/sdk-py-interleave-projections
May 29, 2026
Merged

feat(sdk-py): extract stream decoders and add interleave_projections#7935
Nick Hollon (nick-hollon-lc) merged 17 commits into
mainfrom
nh/sdk-py-interleave-projections

Conversation

@nick-hollon-lc

@nick-hollon-lc Nick Hollon (nick-hollon-lc) commented May 28, 2026

Copy link
Copy Markdown
Contributor

Summary

Refactors the four (now five) sdk-py streaming projections into reusable, transport-agnostic Decoder classes and adds a new interleave_projections(channels) method to AsyncThreadStream and SyncThreadStream that drives multiple decoders from one shared subscription, yielding (channel_name, item) tuples in arrival order (the SDK analog of local GraphRunStream.interleave).

  • New langgraph_sdk/stream/decoders.py: pure feed(event) -> Iterable[item] state machines — ValuesDecoder, MessagesDecoder, ToolCallsDecoder, SubgraphsDecoder, ExtensionsDecoder — behind a Decoder Protocol. No subscription/queue/thread access.
  • Projection migration (async + sync): the five existing projections now delegate their per-event logic to the decoders. Behavior-preserving — the existing test suite is the regression net. Thread-coupled side effects (active-stream registration, terminal-error-on-close, root-inbox forwarding) stay in the projection wrappers; sync messages/tool_calls keep their pre-dispatch contract (handle/stream resolved on yield) via a FIFO-head buffer.
  • interleave_projections: flat-namespace channel list (built-ins + extension names), tool_callstools wire mapping, subgraphs fed every event, extensions keyed by bare name.

Notable

  • Fixes a latent sync bug surfaced by the refactor: two tool calls / messages whose events interleave previously dropped the second; both now surface. Locked in with a regression test.
  • Decoder.feed takes Mapping[str, Any] (read-only), so the Protocol is load-bearing in both stream files.

Deferred (not in this PR)

  • Wiring RemoteGraph._RemoteGraphRunStream.interleave to interleave_projections (gated on feat(langgraph): add v3 streaming support to RemoteGraph #7927).
  • Migrating the handle-scoped projections (_Handle*Projection) to the decoders — hence the small, verified-identical helper duplication between decoders.py and _async/stream.py.
  • interleave_projections handles aren't registered for thread-close cleanup (additive follow-up).

Test Plan

  • make test in libs/sdk-py — 464 passed, 0 failures
  • make format / make lint (ruff + ty) clean
  • Per-decoder unit tests in tests/streaming/test_decoders.py
  • interleave_projections tests (single-channel, multi-channel arrival order, builtin+extension mix, tool_calls public-name, subgraphs discovery) async + sync
  • Existing projection suites pass unchanged (regression net for the migration)

Tool calls and subgraph children created by interleave_projections were
never finalized when the iterator tore down (early break, run end), so an
awaiting handle.output / handle.messages could hang forever. Register the
decoder-created tool-call handles and message streams into the thread's
active sets and add a teardown finally that fails in-flight tool calls and
force-completes discovered subgraph children with the run's terminal
status, mirroring the dedicated tool_calls / subgraphs projections.

Also add the eager-yield Note to the async interleave_projections docstring
to match sync, and add the missing sync subgraphs interleave test.
Nick Hollon (nick-hollon-lc) added a commit that referenced this pull request May 28, 2026
Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub with
delegation to SyncThreadStream.interleave_projections (added in #7935). The async
adapter intentionally keeps no interleave (mirrors local AsyncGraphRunStream).

Sequenced after #7935: interleave_projections resolves from the editable ../sdk-py
workspace dependency once #7935 is in the tree, so no dependency pin is used (a git
pin is incompatible with the monorepo's circular editable path sources). The unit
test mocks the SDK thread, so CI is green now; the feature activates at runtime once
#7935 merges.
`interleave_projections` routed any non-builtin channel name to the
extension/`custom:` fallback. But `infer_channel` recognizes `checkpoints`,
`updates`, `tasks`, and `lifecycle` as first-class protocol methods, so
`interleave_projections(["checkpoints"])` subscribed to `custom:checkpoints`
and used `ExtensionsDecoder` (which only matches `method == "custom"`) — the
subscription never matched and the stream silently yielded nothing.

Add a shared `validate_interleave_channels` guard (one source of truth for
async + sync) that rejects reserved protocol channel names up front with a
clear `ValueError`. Genuine extension names still fall through to the custom
path; `tools` is rejected with a hint to use the public `tool_calls` name.

Tests: parametrized async + sync rejection tests over all reserved names.
Add SDK `interleave_projections` support for the `updates`, `checkpoints`,
and `tasks` protocol channels, matching local `GraphRunStream`'s native
transformer set. These were previously rejected as reserved names.

Generalize `ValuesDecoder` into `DataDecoder(method, namespace=None)` — the
SDK analog of local's Values/Updates/Checkpoints/TasksTransformer, which all
push `params["data"]` unchanged. `DataDecoder("values")` (namespace=None)
reproduces the old behavior; the three new channels use `namespace=[]`.

The root-namespace filter is load-bearing: a co-requested unscoped `values`
widens the merged subscription (via `compute_union_filter`) to all
namespaces, so the decoder itself must drop subgraph payloads to stay
root-scoped — mirroring local's `namespace != scope` check. Locked in with
a namespace-scoping regression test.

Move updates/checkpoints/tasks from RESERVED_INTERLEAVE_CHANNELS to
SUPPORTED; lifecycle/tools/input stay reserved (control-plane / wire alias).
@nick-hollon-lc Nick Hollon (nick-hollon-lc) merged commit 1fcb768 into main May 29, 2026
68 checks passed
@nick-hollon-lc Nick Hollon (nick-hollon-lc) deleted the nh/sdk-py-interleave-projections branch May 29, 2026 21:14
Nick Hollon (nick-hollon-lc) added a commit that referenced this pull request Jun 1, 2026
…jections

Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub
with delegation to SyncThreadStream.interleave_projections (added in #7935,
merged to main). Yields (channel, item) tuples in arrival order, matching the
local GraphRunStream.interleave surface.

The async adapter intentionally keeps no interleave (mirrors local
AsyncGraphRunStream, where callers compose with asyncio.gather).

interleave_projections resolves from the editable ../sdk-py workspace dependency
(now 0.4.0), so no dependency pin change is needed -- main already carries
langgraph-sdk>=0.4.0,<0.5.0 via #7927.
Nick Hollon (nick-hollon-lc) added a commit that referenced this pull request Jun 1, 2026
…jections

Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub
with delegation to SyncThreadStream.interleave_projections (added in #7935,
merged to main). Yields (channel, item) tuples in arrival order, matching the
local GraphRunStream.interleave surface.

The async adapter intentionally keeps no interleave (mirrors local
AsyncGraphRunStream, where callers compose with asyncio.gather).

interleave_projections resolves from the editable ../sdk-py workspace dependency
(now 0.4.0), so no dependency pin change is needed -- main already carries
langgraph-sdk>=0.4.0,<0.5.0 via #7927.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants