feat(langgraph): cooperative graph drain via RunControl #2449
Port Python PR #7274 to LangGraphJS. Adds a RunControl handle with requestDrain()/drainRequested/drainReason, a control option on invoke/stream/etc. surfaced as runtime.control and propagated to subgraphs, and a GraphDrained bubble-up error. The Pregel loop checks the drain flag at the top of each superstep boundary (after the prior step is applied and checkpointed): if tasks remain it saves the checkpoint and throws GraphDrained (also under durability: exit) so the run can be resumed; if the graph naturally finishes that tick it returns normally. A subgraph drain bubbles up to stop the parent at its next boundary. Draining never cancels running work.
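The boundary semantics described above can be illustrated with a toy loop. This is a minimal sketch, not the library's Pregel loop: `ToyRunControl`, `ToyGraphDrained`, `Checkpoint`, and `runSteps` are hypothetical names that only mimic the `requestDrain()` / `drainRequested` / `drainReason` surface and the throw-or-finish behaviour stated in the description.

```typescript
// Toy stand-ins that borrow names from the PR description; they are NOT the
// @langchain/langgraph classes and only model the documented behaviour.
class ToyRunControl {
  private reason: string | undefined = undefined;
  requestDrain(reason: string = "shutdown"): void {
    this.reason = reason;
  }
  get drainRequested(): boolean {
    return this.reason !== undefined;
  }
  get drainReason(): string | undefined {
    return this.reason;
  }
}

class ToyGraphDrained extends Error {
  reason: string;
  constructor(reason: string) {
    super(`graph drained: ${reason}`);
    this.name = "ToyGraphDrained";
    this.reason = reason;
    // Keeps `instanceof` working when compiled to older targets.
    Object.setPrototypeOf(this, ToyGraphDrained.prototype);
  }
}

type Step = (control: ToyRunControl) => void;

interface Checkpoint {
  nextStep: number; // index of the first step that has not run yet
}

function runSteps(steps: Step[], control: ToyRunControl, checkpoint: Checkpoint): void {
  while (checkpoint.nextStep < steps.length) {
    // Boundary check: the previous step is already recorded in `checkpoint`
    // and at least one step remains, so stop here and let the caller resume.
    if (control.drainRequested) {
      throw new ToyGraphDrained(control.drainReason ?? "shutdown");
    }
    const step = steps[checkpoint.nextStep];
    if (step === undefined) break;
    step(control); // a step that has started is never interrupted
    checkpoint.nextStep += 1; // stands in for "apply writes and checkpoint"
  }
  // Reaching this point means every step ran; a late drain request is not an error.
}

const control = new ToyRunControl();
const checkpoint: Checkpoint = { nextStep: 0 };
const steps: Step[] = [(c) => c.requestDrain("deploy"), () => {}, () => {}];
try {
  runSteps(steps, control, checkpoint);
} catch (e) {
  if (!(e instanceof ToyGraphDrained)) throw e;
  console.log(e.reason, checkpoint.nextStep); // deploy 1
}
```

In this model a drain requested during the final step produces no error, because the loop condition fails before the flag is checked again. That mirrors the "naturally finishes that tick" case.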
🦋 Changeset detected
Latest commit: 7afb0dd
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 7 packages
@langchain/langgraph-checkpoint
@langchain/langgraph-checkpoint-mongodb
@langchain/langgraph-checkpoint-postgres
@langchain/langgraph-checkpoint-redis
@langchain/langgraph-checkpoint-sqlite
@langchain/langgraph-checkpoint-validation
create-langgraph
@langchain/langgraph-api
@langchain/langgraph-cli
@langchain/langgraph
@langchain/langgraph-cua
@langchain/langgraph-supervisor
@langchain/langgraph-swarm
@langchain/langgraph-ui
@langchain/langgraph-sdk
@langchain/angular
@langchain/react
@langchain/svelte
@langchain/vue
Hunter Lovell (hntrl)
left a comment
maybe radical suggestion: we might not need this. We can functionally do the same thing by providing a signal to graph.invoke. Granted I don't think that lets nodes exit gracefully, but they functionally serve the same purpose? wdyt
I'd argue they're complementary, not redundant, and that's actually the recommended pattern: drain for a graceful window, then abort() as a hard backstop if a node overruns the grace period. Some further subtle differences:
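The drain-then-abort pattern recommended in this reply can be sketched with a toy runner. Everything here is hypothetical except `AbortController`, which is the standard platform API: `ToyRunControl` only mimics the `requestDrain()` / `drainRequested` surface from the PR, and time is simulated by counting work units so the example stays synchronous.

```typescript
// Toy stand-in for the PR's RunControl; NOT the @langchain/langgraph class.
class ToyRunControl {
  private requested = false;
  requestDrain(): void {
    this.requested = true;
  }
  get drainRequested(): boolean {
    return this.requested;
  }
}

type Outcome = "completed" | "drained" | "aborted";

// stepUnits[i] is how many work units step i needs; one unit is one tick of a
// simulated clock. A drain is requested at tick `drainAt`. If a step is still
// running `grace` ticks later, the AbortController fires as the hard backstop.
function runWithBackstop(
  stepUnits: number[],
  drainAt: number,
  grace: number,
): { outcome: Outcome; stepsDone: number } {
  const control = new ToyRunControl();
  const abort = new AbortController();
  let clock = 0;
  let stepsDone = 0;

  for (const units of stepUnits) {
    // Cooperative check between steps: do not start more work once draining.
    if (control.drainRequested) return { outcome: "drained", stepsDone };

    for (let u = 0; u < units; u++) {
      clock += 1;
      if (clock === drainAt) control.requestDrain(); // graceful request
      if (control.drainRequested && clock >= drainAt + grace) abort.abort(); // backstop
      // The simulated node observes the signal and gives up mid-step.
      if (abort.signal.aborted) return { outcome: "aborted", stepsDone };
    }
    stepsDone += 1;
  }
  return { outcome: "completed", stepsDone };
}

// The first step ends inside the grace window, so the run drains at the boundary.
console.log(runWithBackstop([5, 2], 3, 4)); // { outcome: "drained", stepsDone: 1 }
// The first step overruns the grace window, so the backstop aborts it.
console.log(runWithBackstop([10, 2], 3, 4)); // { outcome: "aborted", stepsDone: 0 }
```

The two mechanisms act at different points in this sketch: the drain flag is only consulted between steps, while the abort signal is observed inside a step.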
run_control.test.ts (added by #2449) imports initializeAsyncLocalStorageSingleton from the removed ../setup/async_local_storage.js path, breaking the suite on main. Point it at ../node.js, matching the equivalent fixes for the timeout and node error handler tests. Co-authored-by: Cursor <cursoragent@cursor.com>
Resolve conflicts in libs/langgraph-core/src/pregel/loop.ts after the Python-parity PRs (#2450 timeouts, #2451 error handlers, #2449 RunControl, #2452 DeltaChannel) landed on main:
- Drop the now-imported local createDuplexStream (moved to ./stream.js on main); keep the branch's checkpointNamespaceFromNs / deepestCheckpointMapNamespace helpers.
- Combine the branch's CONFIG_KEY_REPLAY_STATE checkpoint-loading logic with main's hasPersistedParent tracking (computed before the empty fallback).
- Keep the branch's isTimeTraveling RESUME/INTERRUPT filtering + fork checkpoint while preserving main's single-spread interruptSeen optimization.
- Adopt main's 3-tuple StreamChunk _emit (meta now via _emitValuesWithCheckpointMeta) while keeping the branch's namespace param and #interruptStreamNamespace().

Also fix a merge artifact: the unit vitest config had a duplicate setupFiles key that dropped setup.node.ts (AsyncLocalStorage init); merged both setup files into one array. Update stale ../setup/async_local_storage.js imports in the new time_travel tests to ../node.js.

Co-authored-by: Cursor <cursoragent@cursor.com>
This PR was opened by the [Changesets release](https://github.com/changesets/action) GitHub action. When you're ready to do a release, you can merge this and the packages will be published to npm automatically. If you're not ready to do a release yet, that's fine, whenever you add more changesets to main, this PR will be updated. # Releases ## @langchain/langgraph-checkpoint@1.1.0 ### Minor Changes - [#2452](#2452) [`a8e7659`](a8e7659) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add `DeltaChannel` and the writes-history saver API (beta). `DeltaChannel` is a reducer channel that stores only a sentinel in checkpoint blobs instead of the full accumulated value, reconstructing state on read by replaying ancestor writes through a batch reducer. This avoids re-serializing the entire accumulated value at every step (e.g. long message histories). - `DeltaChannel(reducer, { snapshotFrequency })` in `@langchain/langgraph` — count-based snapshot cadence (default `snapshotFrequency=1000`) plus a system bound `DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT` (default 5000, env `LANGGRAPH_DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT`). - `messagesDeltaReducer` — a batching-invariant messages reducer that coerces raw object/string writes, for use with `DeltaChannel`. - `BaseCheckpointSaver.getDeltaChannelHistory({ config, channels })` (beta) — walks the parent chain returning per-channel `{ writes, seed? }`, with a direct-storage override in `MemorySaver`. - `counters_since_delta_snapshot` added to `CheckpointMetadata`; `DeltaSnapshot` serialization support in the JSON+ serializer. Reconstruction is wired through the Pregel read/execution paths (initialization, `getState`, `updateState`, local reads) and `exit` durability accumulates and anchors delta writes so threads remain reconstructible without forcing snapshots. ### Patch Changes - [#2450](#2450) [`2f6d873`](2f6d873) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add node-level timeouts. 
A `timeout` option is now supported on `StateGraph.addNode`, the functional API (`task`/`entrypoint`), and the `Send` constructor. Pass a number of milliseconds for a hard wall-clock cap, or a `TimeoutPolicy` for finer control:

```ts
import { TimeoutPolicy } from "@langchain/langgraph";

// hard wall-clock cap on each attempt
builder.addNode("agent", agentFn, { timeout: 60_000 });

// full control
builder.addNode("agent", agentFn, {
  timeout: {
    runTimeout: 60_000, // hard wall-clock cap, never refreshed
    idleTimeout: 10_000, // cap on time without observable progress
    refreshOn: "auto", // "auto" | "heartbeat"
  },
});

// per-task override
new Send("agent", state, { timeout: { idleTimeout: 5_000 } });
```

When a timeout fires, a `NodeTimeoutError` (carrying `node`, `kind` (`"run"`/`"idle"`), `timeout`, `elapsed`, `runTimeout`, `idleTimeout`) is raised, the attempt's buffered writes are dropped, and the node's `AbortSignal` is aborted. `idleTimeout` is refreshed by observable progress (writes, custom stream-writer calls, child-task scheduling, callback events) or an explicit `runtime.heartbeat()` call. The timer resets per retry attempt, and `NodeTimeoutError` is retryable under the default retry policy.

Ports langchain-ai/langgraph#7599, [#7646](https://github.com/langchain-ai/langgraphjs/issues/7646), and [#7659](https://github.com/langchain-ai/langgraphjs/issues/7659).

## @langchain/langgraph@1.4.0

### Minor Changes

- [#2449](#2449) [`d12d269`](d12d269) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add cooperative, between-superstep graph draining via `RunControl`.

  A new `RunControl` (exported from `@langchain/langgraph`) exposes `requestDrain(reason)` plus read-only `drainRequested` / `drainReason`. Pass it through the new `control` option on `invoke` / `stream` / `streamEvents` (and the functional API). It is surfaced on `runtime.control`, so nodes can read it or call `requestDrain()` themselves, and it is propagated into subgraphs.

  When a drain is requested, the Pregel loop checks the flag at the top of each superstep (after the previous step's writes are applied and checkpointed): if more tasks remain it saves the checkpoint and throws the new `GraphDrained` error (also under `durability: "exit"`), so the run can be resumed later from the same config. If the graph naturally finishes on that tick it returns normally and the caller can inspect `control.drainRequested`. A drain requested inside a subgraph bubbles up and stops the parent at its next boundary. Draining never cancels work that is already running; pair it with an `AbortSignal` if you need a hard upper bound.

- [#2452](#2452) [`a8e7659`](a8e7659) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add `DeltaChannel` and the writes-history saver API (beta).

  `DeltaChannel` is a reducer channel that stores only a sentinel in checkpoint blobs instead of the full accumulated value, reconstructing state on read by replaying ancestor writes through a batch reducer. This avoids re-serializing the entire accumulated value at every step (e.g. long message histories).

  - `DeltaChannel(reducer, { snapshotFrequency })` in `@langchain/langgraph`: count-based snapshot cadence (default `snapshotFrequency=1000`) plus a system bound `DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT` (default 5000, env `LANGGRAPH_DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT`).
  - `messagesDeltaReducer`: a batching-invariant messages reducer that coerces raw object/string writes, for use with `DeltaChannel`.
  - `BaseCheckpointSaver.getDeltaChannelHistory({ config, channels })` (beta): walks the parent chain returning per-channel `{ writes, seed? }`, with a direct-storage override in `MemorySaver`.
  - `counters_since_delta_snapshot` added to `CheckpointMetadata`; `DeltaSnapshot` serialization support in the JSON+ serializer.
Reconstruction is wired through the Pregel read/execution paths (initialization, `getState`, `updateState`, local reads) and `exit` durability accumulates and anchors delta writes so threads remain reconstructible without forcing snapshots. - [#2451](#2451) [`d65a920`](d65a920) Thanks [@christian-bromann](https://github.com/christian-bromann)! - feat(langgraph): add node-level error handlers `StateGraph.addNode(name, fn, { errorHandler })` now accepts a first-class node-level error handler. The handler runs ONLY after the failing node's `retryPolicy` is exhausted, so retry and handling stay decoupled. It receives a typed `NodeError { node, error }` and the typed node input state, can return a state update, and can route to a recovery branch via `new Command({ goto })` (saga / compensation flows). Failure provenance is checkpointed (via a reserved `ERROR_SOURCE_NODE` write) so handlers observe the same context after a checkpoint resume. Uncaught node errors without a handler still abort the run as before, and `GraphBubbleUp` errors (such as `interrupt()`) are never swallowed by a handler. `StateGraph.setNodeDefaults({ errorHandler })` now also accepts a graph-wide default handler. It is materialized at `compile()` as a single shared handler and invoked for every regular node that does not set its own `errorHandler`. A per-node handler always takes precedence, the default never catches a failure raised by an error-handler node itself (handler failures fail the run), and the default is not inherited by subgraphs. Ports the Python feature from langchain-ai/langgraph#7233. - [#2450](#2450) [`2f6d873`](2f6d873) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add node-level timeouts. A `timeout` option is now supported on `StateGraph.addNode`, the functional API (`task`/`entrypoint`), and the `Send` constructor. 
Pass a number of milliseconds for a hard wall-clock cap, or a `TimeoutPolicy` for finer control:

```ts
import { TimeoutPolicy } from "@langchain/langgraph";

// hard wall-clock cap on each attempt
builder.addNode("agent", agentFn, { timeout: 60_000 });

// full control
builder.addNode("agent", agentFn, {
  timeout: {
    runTimeout: 60_000, // hard wall-clock cap, never refreshed
    idleTimeout: 10_000, // cap on time without observable progress
    refreshOn: "auto", // "auto" | "heartbeat"
  },
});

// per-task override
new Send("agent", state, { timeout: { idleTimeout: 5_000 } });
```

When a timeout fires, a `NodeTimeoutError` (carrying `node`, `kind` (`"run"`/`"idle"`), `timeout`, `elapsed`, `runTimeout`, `idleTimeout`) is raised, the attempt's buffered writes are dropped, and the node's `AbortSignal` is aborted. `idleTimeout` is refreshed by observable progress (writes, custom stream-writer calls, child-task scheduling, callback events) or an explicit `runtime.heartbeat()` call. The timer resets per retry attempt, and `NodeTimeoutError` is retryable under the default retry policy.

Ports langchain-ai/langgraph#7599, [#7646](https://github.com/langchain-ai/langgraphjs/issues/7646), and [#7659](https://github.com/langchain-ai/langgraphjs/issues/7659).

- [#2461](#2461) [`801d955`](801d955) Thanks [@christian-bromann](https://github.com/christian-bromann)! - Add `StateGraph.setNodeDefaults()` for setting graph-wide node policy defaults (`retryPolicy`, `cachePolicy`). Per-node values passed to `addNode` always take precedence, and defaults are resolved at `compile()` time so call order does not matter. Defaults are not inherited by subgraphs. Ports Python's `set_node_defaults()` (langchain-ai/langgraph#7747).

### Patch Changes

- [#2179](#2179) [`01c67df`](01c67df) Thanks [@christian-bromann](https://github.com/christian-bromann)!
- fix(core): time travel replay/fork for graphs with interrupts and subgraphs Ports Python fixes for stale RESUME writes during replay, wrong subgraph checkpoint loading during time travel, missing fork checkpoints on replay, and direct-to-subgraph time travel. - [#2514](#2514) [`9e0201d`](9e0201d) Thanks [@christian-bromann](https://github.com/christian-bromann)! - fix(schema): expose StateSchema JSON schemas for Studio introspection Route StateSchema runtime definitions through getJsonSchema() and getInputJsonSchema() so LangGraph Studio receives state, input, and context schemas when graphs use the StateSchema primitive. Fixes [#2466](#2466) - [#2471](#2471) [`9b96f60`](9b96f60) Thanks [@christian-bromann](https://github.com/christian-bromann)! - perf(core): skip debug checkpoint snapshots when not streaming them Avoid building full-state `mapDebugCheckpoint` payloads on every tick when no consumer subscribed to `checkpoints` or `debug` stream modes. v3 companion checkpoint envelopes are unchanged (they come from values metadata). - [#2472](#2472) [`8e06ace`](8e06ace) Thanks [@christian-bromann](https://github.com/christian-bromann)! - perf(core): index pending writes for O(1) task-prep lookups Build a PendingWritesIndex once per \_prepareNextTasks call so resume and skip-done-task checks avoid repeated linear scans over checkpointPendingWrites. - [#2473](#2473) [`a8b0036`](a8b0036) Thanks [@christian-bromann](https://github.com/christian-bromann)! - perf(core): optimize applyWrites, interrupt seen, and channel errors Reduce allocations in \_applyWrites, fix O(N²) interrupt versions_seen updates, skip stack traces on EmptyChannelError control flow, and cache task lists in the pregel loop and runner. - [#2444](#2444) [`4096933`](4096933) Thanks [@christian-bromann](https://github.com/christian-bromann)! 
- feat(remote): add RemoteGraph v3 streaming support Expose the v3 `streamEvents` surface for `RemoteGraph` by adapting remote SDK thread streams to the local `GraphRunStream` shape. - Updated dependencies \[[`a8e7659`](a8e7659), [`2f6d873`](2f6d873)]: - @langchain/langgraph-checkpoint@1.1.0 ## @langchain/langgraph-checkpoint-mongodb@1.3.4 ### Patch Changes - [#2517](#2517) [`67a4f8d`](67a4f8d) Thanks [@jackjin1997](https://github.com/jackjin1997)! - fix: `MongoDBSaver.putWrites` now honors `WRITES_IDX_MAP`, pinning special channels (`__error__`, `__scheduled__`, `__interrupt__`, `__resume__`) to fixed negative indices instead of the call-local ordinal. Previously a mixed `putWrites([[...regular...], [INTERRUPT, …]], taskId)` placed the INTERRUPT at a positive idx that could collide with a regular write at the same `(task_id, idx)`, and the unconditional `$set` upsert silently overwrote whichever row landed there first. The conflict-resolution clause now matches the Postgres / SQLite (TS and Python) checkpointers: `$set` only when every channel is a special one, `$setOnInsert` otherwise. ## @langchain/langgraph-checkpoint-postgres@1.0.3 ### Patch Changes - [#2512](#2512) [`375c73f`](375c73f) Thanks [@jackjin1997](https://github.com/jackjin1997)! - fix: reject SQL `LIKE` wildcards (`%`, `_`) and the backslash escape character in `PostgresStore` namespace labels. `BaseStore.search()` matches namespaces via `namespace_path LIKE ${prefix}%`, and these characters in caller-supplied namespace labels are interpreted as wildcards by Postgres even through a bound parameter — letting a namespace prefix of `["%"]` match every namespace in the store across tenants. `validateNamespace` now throws for these characters at all `search` / `get` / `put` entrypoints, keeping store-wide consistency. CWE-1336. ## @langchain/langgraph-checkpoint-redis@1.0.8 ### Patch Changes - [#2518](#2518) [`9182ea3`](9182ea3) Thanks [@jackjin1997](https://github.com/jackjin1997)! 
- fix: `RedisSaver.putWrites` now honors `WRITES_IDX_MAP`, pinning special channels (`__error__`, `__scheduled__`, `__interrupt__`, `__resume__`) to fixed negative indices in their Redis key (`checkpoint_write:…:<idx>`) instead of the call-local ordinal. Previously a mixed `putWrites([[…regular…], [INTERRUPT, …]], taskId)` placed the INTERRUPT key at the positive idx of its position in the batch, where a peer task's regular write at the same idx would overwrite it via the unconditional `JSON.SET`. The conflict-resolution clause now matches Postgres / SQLite / MongoDB: unguarded `JSON.SET` when every write is a special channel, `JSON.SET … NX` (insert-or-ignore) otherwise. ## @langchain/langgraph-checkpoint-sqlite@1.0.3 ### Patch Changes - [#2516](#2516) [`f6a6d26`](f6a6d26) Thanks [@jackjin1997](https://github.com/jackjin1997)! - fix: `SqliteSaver.putWrites` now honors `WRITES_IDX_MAP`, pinning special channels (`__error__`, `__scheduled__`, `__interrupt__`, `__resume__`) to fixed negative indices instead of the call-local ordinal. Previously a follow-up `putWrites([[INTERRUPT, …]], taskId)` for the same checkpoint silently `REPLACE`d the regular write previously stored at `idx=0` for that task, losing data. The conflict-resolution clause also now matches the Python checkpointer contract: `OR REPLACE` only when every channel is a special one (so e.g. INTERRUPT→RESUME state transitions overwrite), `OR IGNORE` otherwise. ## @langchain/angular@1.0.21 ### Patch Changes - [#2515](#2515) [`49b8c1a`](49b8c1a) Thanks [@christian-bromann](https://github.com/christian-bromann)! - fix: make AnyStream a true supertype so selector hooks need no cast A concrete `useStream<typeof agent>()` handle was not assignable to `AnyStream` because generic-computed covariant members (`toolCalls`, `values`) don't widen under `any` — `InferToolCalls<any>[]` resolves to `AssembledToolCall<…, never>[]`, narrower than a concrete handle. 
Override those members with their widest forms (preserving each framework's reactivity wrapper — plain arrays for React/Svelte, `ShallowRef` for Vue, `Signal` for Angular) so the message/tool/value selector hooks accept a fully-typed stream without an `as AnyStream` cast. ## @langchain/react@1.0.21 ### Patch Changes - [#2515](#2515) [`49b8c1a`](49b8c1a) Thanks [@christian-bromann](https://github.com/christian-bromann)! - fix: make AnyStream a true supertype so selector hooks need no cast A concrete `useStream<typeof agent>()` handle was not assignable to `AnyStream` because generic-computed covariant members (`toolCalls`, `values`) don't widen under `any` — `InferToolCalls<any>[]` resolves to `AssembledToolCall<…, never>[]`, narrower than a concrete handle. Override those members with their widest forms (preserving each framework's reactivity wrapper — plain arrays for React/Svelte, `ShallowRef` for Vue, `Signal` for Angular) so the message/tool/value selector hooks accept a fully-typed stream without an `as AnyStream` cast. ## @langchain/svelte@1.0.21 ### Patch Changes - [#2515](#2515) [`49b8c1a`](49b8c1a) Thanks [@christian-bromann](https://github.com/christian-bromann)! - fix: make AnyStream a true supertype so selector hooks need no cast A concrete `useStream<typeof agent>()` handle was not assignable to `AnyStream` because generic-computed covariant members (`toolCalls`, `values`) don't widen under `any` — `InferToolCalls<any>[]` resolves to `AssembledToolCall<…, never>[]`, narrower than a concrete handle. Override those members with their widest forms (preserving each framework's reactivity wrapper — plain arrays for React/Svelte, `ShallowRef` for Vue, `Signal` for Angular) so the message/tool/value selector hooks accept a fully-typed stream without an `as AnyStream` cast. ## @langchain/vue@1.0.21 ### Patch Changes - [#2515](#2515) [`49b8c1a`](49b8c1a) Thanks [@christian-bromann](https://github.com/christian-bromann)! 
- fix: make AnyStream a true supertype so selector hooks need no cast A concrete `useStream<typeof agent>()` handle was not assignable to `AnyStream` because generic-computed covariant members (`toolCalls`, `values`) don't widen under `any` — `InferToolCalls<any>[]` resolves to `AssembledToolCall<…, never>[]`, narrower than a concrete handle. Override those members with their widest forms (preserving each framework's reactivity wrapper — plain arrays for React/Svelte, `ShallowRef` for Vue, `Signal` for Angular) so the message/tool/value selector hooks accept a fully-typed stream without an `as AnyStream` cast. ## @example/ai-elements@0.1.36 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`49b8c1a`](49b8c1a), [`9e0201d`](9e0201d), [`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 - @langchain/react@1.0.21 ## @examples/assistant-ui-claude@0.1.36 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`49b8c1a`](49b8c1a), [`9e0201d`](9e0201d), [`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 - @langchain/react@1.0.21 ## @examples/ui-angular@0.0.46 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`49b8c1a`](49b8c1a), [`9e0201d`](9e0201d), [`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 - @langchain/angular@1.0.21 ## @examples/ui-multimodal@0.0.22 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`49b8c1a`](49b8c1a), [`9e0201d`](9e0201d), 
[`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 - @langchain/react@1.0.21 ## @examples/ui-react@0.0.22 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`49b8c1a`](49b8c1a), [`9e0201d`](9e0201d), [`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 - @langchain/react@1.0.21 ## langgraph@1.0.40 ### Patch Changes - Updated dependencies \[[`01c67df`](01c67df), [`d12d269`](d12d269), [`a8e7659`](a8e7659), [`9e0201d`](9e0201d), [`9b96f60`](9b96f60), [`8e06ace`](8e06ace), [`d65a920`](d65a920), [`2f6d873`](2f6d873), [`a8b0036`](a8b0036), [`4096933`](4096933), [`801d955`](801d955)]: - @langchain/langgraph@1.4.0 Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Summary
Ports Python PR langchain-ai/langgraph#7274 ("allow graph to graceful shutdown/drain by request") to LangGraphJS. Adds cooperative, between-superstep draining so a run can be asked to stop at the next superstep boundary, persist its checkpoint, and surface a resumable terminal error.
This is the JS PR for the Graph draining / graceful shutdown parity unit.
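On the caller side, the "resumable terminal error" idea might be handled roughly as follows. This is a sketch under stated assumptions: `ToyGraphDrained`, `isToyGraphDrained`, `checkpoints`, `runThread`, and `runOrDefer` are hypothetical stand-ins, with an in-memory map playing the role of the checkpointer. None of it is the `@langchain/langgraph` API.

```typescript
// Toy error and guard, modelled on the GraphDrained / isGraphDrained pair the
// PR describes. These are NOT the library exports.
class ToyGraphDrained extends Error {
  reason: string;
  constructor(reason: string) {
    super(`graph drained: ${reason}`);
    this.name = "ToyGraphDrained";
    this.reason = reason;
    Object.setPrototypeOf(this, ToyGraphDrained.prototype);
  }
}

function isToyGraphDrained(e: unknown): e is ToyGraphDrained {
  return e instanceof ToyGraphDrained;
}

// Hypothetical in-memory checkpoint store: thread id -> number of steps done.
const checkpoints = new Map<string, number>();

// Runs `total` steps for a thread, starting from its saved checkpoint.
// `shouldDrain` is polled once per step boundary.
function runThread(threadId: string, total: number, shouldDrain: () => boolean): string {
  let next = checkpoints.get(threadId) ?? 0;
  while (next < total) {
    if (shouldDrain()) {
      checkpoints.set(threadId, next); // persist before surfacing the error
      throw new ToyGraphDrained("shutdown");
    }
    next += 1;
    checkpoints.set(threadId, next);
  }
  return `completed ${total} steps`;
}

type RunResult =
  | { status: "done"; result: string }
  | { status: "deferred"; reason: string };

// Treats a drain as "try again later" and rethrows every other error.
function runOrDefer(threadId: string, total: number, shouldDrain: () => boolean): RunResult {
  try {
    return { status: "done", result: runThread(threadId, total, shouldDrain) };
  } catch (e) {
    if (isToyGraphDrained(e)) return { status: "deferred", reason: e.reason };
    throw e;
  }
}

let polls = 0;
const first = runOrDefer("t1", 3, () => ++polls > 2); // drains at the third boundary
const second = runOrDefer("t1", 3, () => false); // resumes from the saved checkpoint
console.log(first.status, second.status); // deferred done
```

Keeping the drain case separate from real failures lets a worker requeue the thread on shutdown without masking genuine errors.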
What's added
- `RunControl` (new `pregel/runtime.ts`, exported from `@langchain/langgraph`): a run-scoped handle with `requestDrain(reason = "shutdown")` and read-only `drainRequested` / `drainReason`.
- `GraphDrained` (`errors.ts`): a `GraphBubbleUp` subclass carrying `reason`, thrown when a run exits early due to drain. Plus an `isGraphDrained` guard.
- `control` option on `invoke` / `stream` / `streamEvents` / `invoke`'s functional-API equivalents. It is surfaced on `runtime.control` (nodes can read it or call `requestDrain()`), and propagated into subgraphs. A fresh `RunControl` is provided per run when none is passed.

Semantics (cooperative, between-superstep)
- `requestDrain()` flips a flag. The Pregel loop checks it at the top of each `tick()`, after the previous superstep's writes have been applied and checkpointed and the next tasks have been prepared. It never preempts work that is already running.
- If the graph naturally finishes on that tick, the run returns normally (`done`). No `GraphDrained`. Caller can inspect `control.drainRequested`.
- If tasks remain, the loop saves the checkpoint (also under `durability: "exit"`) and throws `GraphDrained(reason)`. Resume with `invoke(null, config)`.
- If the drain is requested inside a subgraph, `GraphDrained` bubbles up through the parent loop and stops it at its own next boundary; the parent's checkpoint is saved and resumable.

Draining does not cancel async work. Pair it with an `AbortSignal` if you need a hard upper bound (see the `drain then cancel after a graceful timeout` test).

Files
- `errors.ts`: `GraphDrained` + `isGraphDrained`
- `pregel/runtime.ts`: `RunControl`
- `pregel/runnable_types.ts`: `control?: RunControl` on `Runtime`
- `pregel/types.ts`: `control` on `PregelOptions`
- `pregel/utils/config.ts`, `constants.ts`: config-key wiring
- `pregel/loop.ts`: `"draining"` status + drain check at the tick boundary
- `pregel/index.ts`: option wiring + raising `GraphDrained`
- `pregel/runner.ts`: subgraph drain bubble-up handling

Tests
`libs/langgraph-core/src/tests/run_control.test.ts` (14 tests, all sync + async where applicable): drain stops the next step (sync/async), terminal-step drain finishes normally, exit- and default-durability resume, pre-drained control, subgraph → parent bubble + resume, external concurrent drain, drain-then-cancel via `AbortSignal`, reading/`requestDrain()` via `runtime.control`, `stream()` accepts control, and functional-API in-flight `task` futures still resolve. Full package suite passes (1358 + 14, 0 failures); lint and format are clean.

Notable divergence from Python
Python added `"drained"` to a local `SubgraphStatus` literal. The JS v3 stream lifecycle uses `AgentStatus` from the external `@langchain/protocol` package, which has no `"drained"` member, so `GraphDrained` propagates through streams as the terminal error rather than as a new lifecycle status. The parity-relevant signal, the `GraphDrained` exception, is what consumers catch. Noted in the changeset.

Source