feat(sdk-py): extract stream decoders and add interleave_projections#7935
Merged
Nick Hollon (nick-hollon-lc) merged 17 commits intoMay 29, 2026
Conversation
Tool calls and subgraph children created by interleave_projections were never finalized when the iterator tore down (early break, run end), so an awaiting handle.output / handle.messages could hang forever. Register the decoder-created tool-call handles and message streams into the thread's active sets and add a teardown finally that fails in-flight tool calls and force-completes discovered subgraph children with the run's terminal status, mirroring the dedicated tool_calls / subgraphs projections. Also add the eager-yield Note to the async interleave_projections docstring to match sync, and add the missing sync subgraphs interleave test.
4 tasks
Nick Hollon (nick-hollon-lc)
added a commit
that referenced
this pull request
May 28, 2026
Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub with delegation to SyncThreadStream.interleave_projections (added in #7935). The async adapter intentionally keeps no interleave (mirrors local AsyncGraphRunStream). Sequenced after #7935: interleave_projections resolves from the editable ../sdk-py workspace dependency once #7935 is in the tree, so no dependency pin is used (a git pin is incompatible with the monorepo's circular editable path sources). The unit test mocks the SDK thread, so CI is green now; the feature activates at runtime once #7935 merges.
Josh Rogers (jdrogers940)
approved these changes
May 29, 2026
`interleave_projections` routed any non-builtin channel name to the extension/`custom:` fallback. But `infer_channel` recognizes `checkpoints`, `updates`, `tasks`, and `lifecycle` as first-class protocol methods, so `interleave_projections(["checkpoints"])` subscribed to `custom:checkpoints` and used `ExtensionsDecoder` (which only matches `method == "custom"`) — the subscription never matched and the stream silently yielded nothing. Add a shared `validate_interleave_channels` guard (one source of truth for async + sync) that rejects reserved protocol channel names up front with a clear `ValueError`. Genuine extension names still fall through to the custom path; `tools` is rejected with a hint to use the public `tool_calls` name. Tests: parametrized async + sync rejection tests over all reserved names.
Add SDK `interleave_projections` support for the `updates`, `checkpoints`,
and `tasks` protocol channels, matching local `GraphRunStream`'s native
transformer set. These were previously rejected as reserved names.
Generalize `ValuesDecoder` into `DataDecoder(method, namespace=None)` — the
SDK analog of local's Values/Updates/Checkpoints/TasksTransformer, which all
push `params["data"]` unchanged. `DataDecoder("values")` (namespace=None)
reproduces the old behavior; the three new channels use `namespace=[]`.
The root-namespace filter is load-bearing: a co-requested unscoped `values`
widens the merged subscription (via `compute_union_filter`) to all
namespaces, so the decoder itself must drop subgraph payloads to stay
root-scoped — mirroring local's `namespace != scope` check. Locked in with
a namespace-scoping regression test.
Move updates/checkpoints/tasks from RESERVED_INTERLEAVE_CHANNELS to
SUPPORTED; lifecycle/tools/input stay reserved (control-plane / wire alias).
Nick Hollon (nick-hollon-lc)
added a commit
that referenced
this pull request
Jun 1, 2026
…jections Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub with delegation to SyncThreadStream.interleave_projections (added in #7935, merged to main). Yields (channel, item) tuples in arrival order, matching the local GraphRunStream.interleave surface. The async adapter intentionally keeps no interleave (mirrors local AsyncGraphRunStream, where callers compose with asyncio.gather). interleave_projections resolves from the editable ../sdk-py workspace dependency (now 0.4.0), so no dependency pin change is needed -- main already carries langgraph-sdk>=0.4.0,<0.5.0 via #7927.
Nick Hollon (nick-hollon-lc)
added a commit
that referenced
this pull request
Jun 1, 2026
…jections Replace the sync _RemoteGraphRunStream.interleave NotImplementedError stub with delegation to SyncThreadStream.interleave_projections (added in #7935, merged to main). Yields (channel, item) tuples in arrival order, matching the local GraphRunStream.interleave surface. The async adapter intentionally keeps no interleave (mirrors local AsyncGraphRunStream, where callers compose with asyncio.gather). interleave_projections resolves from the editable ../sdk-py workspace dependency (now 0.4.0), so no dependency pin change is needed -- main already carries langgraph-sdk>=0.4.0,<0.5.0 via #7927.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Refactors the four (now five) sdk-py streaming projections into reusable, transport-agnostic
Decoderclasses and adds a newinterleave_projections(channels)method toAsyncThreadStreamandSyncThreadStreamthat drives multiple decoders from one shared subscription, yielding(channel_name, item)tuples in arrival order (the SDK analog of localGraphRunStream.interleave).langgraph_sdk/stream/decoders.py: purefeed(event) -> Iterable[item]state machines —ValuesDecoder,MessagesDecoder,ToolCallsDecoder,SubgraphsDecoder,ExtensionsDecoder— behind aDecoderProtocol. No subscription/queue/thread access.interleave_projections: flat-namespace channel list (built-ins + extension names),tool_calls↔toolswire mapping, subgraphs fed every event, extensions keyed by bare name.Notable
Decoder.feedtakesMapping[str, Any](read-only), so the Protocol is load-bearing in both stream files.Deferred (not in this PR)
RemoteGraph._RemoteGraphRunStream.interleavetointerleave_projections(gated on feat(langgraph): add v3 streaming support to RemoteGraph #7927)._Handle*Projection) to the decoders — hence the small, verified-identical helper duplication betweendecoders.pyand_async/stream.py.interleave_projectionshandles aren't registered for thread-close cleanup (additive follow-up).Test Plan
make testinlibs/sdk-py— 464 passed, 0 failuresmake format/make lint(ruff + ty) cleantests/streaming/test_decoders.pyinterleave_projectionstests (single-channel, multi-channel arrival order, builtin+extension mix, tool_calls public-name, subgraphs discovery) async + sync