feat: allow graph to graceful shutdown/drain by request#7274
Merged
Quanzheng Long (longquanzheng)
previously approved these changes
Mar 28, 2026
Calling a `@task` from inside an async `@entrypoint` requires Python 3.11+ contextvars support to propagate the runnable config; on 3.10 it raises `Called get_config outside of a runnable context`. Mark the test with the existing NEEDS_CONTEXTVARS skip, matching the convention used by every other async-entrypoint+task test in this file.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 10, 2026
## Summary

Ports Python PR [langchain-ai/langgraph#7274](langchain-ai/langgraph#7274) ("allow graph to graceful shutdown/drain by request") to LangGraphJS. Adds cooperative, between-superstep draining so a run can be asked to stop at the next superstep boundary, persist its checkpoint, and surface a resumable terminal error.

This is the JS PR for the **Graph draining / graceful shutdown** parity unit.

## What's added

- **`RunControl`** (new `pregel/runtime.ts`, exported from `@langchain/langgraph`): a run-scoped handle with `requestDrain(reason = "shutdown")` and read-only `drainRequested` / `drainReason`.
- **`GraphDrained`** (`errors.ts`): a `GraphBubbleUp` subclass carrying `reason`, thrown when a run exits early due to drain. Plus an `isGraphDrained` guard.
- **`control` option** on `invoke` / `stream` / `streamEvents` / `invoke`'s functional-API equivalents. It is surfaced on `runtime.control` (nodes can read it or call `requestDrain()`), and propagated into subgraphs. A fresh `RunControl` is provided per run when none is passed.

## Semantics (cooperative, between-superstep)

`requestDrain()` flips a flag. The Pregel loop checks it at the top of each `tick()`, **after** the previous superstep's writes have been applied and checkpointed and the next tasks have been prepared. It never preempts work that is already running.

| Scenario | Behavior |
|---|---|
| Node mid-execution | Runs to completion; drain takes effect at the next superstep. |
| Graph naturally finishes on the same tick where drain was requested | Returns normally (status `done`). No `GraphDrained`. Caller can inspect `control.drainRequested`. |
| More tasks remain | Saves the last completed superstep's checkpoint (also under `durability: "exit"`) and throws `GraphDrained(reason)`. Resume with `invoke(null, config)`. |
| Subgraph requests drain | `GraphDrained` bubbles up through the parent loop and stops it at its own next boundary; the parent's checkpoint is saved and resumable. |

Draining does **not** cancel async work. Pair it with an `AbortSignal` if you need a hard upper bound (see the `drain then cancel after a graceful timeout` test).

## Files

- `errors.ts`: `GraphDrained` + `isGraphDrained`
- `pregel/runtime.ts`: `RunControl`
- `pregel/runnable_types.ts`: `control?: RunControl` on `Runtime`
- `pregel/types.ts`: `control` on `PregelOptions`
- `pregel/utils/config.ts`, `constants.ts`: config-key wiring
- `pregel/loop.ts`: `"draining"` status + drain check at the tick boundary
- `pregel/index.ts`: option wiring + raising `GraphDrained`
- `pregel/runner.ts`: subgraph drain bubble-up handling

## Tests

`libs/langgraph-core/src/tests/run_control.test.ts` (14 tests, all sync + async where applicable): drain stops the next step (sync/async), terminal-step drain finishes normally, exit- and default-durability resume, pre-drained control, subgraph → parent bubble + resume, external concurrent drain, drain-then-cancel via `AbortSignal`, reading/`requestDrain()` via `runtime.control`, `stream()` accepts control, and functional-API in-flight `task` futures still resolve.

Full package suite passes (1358 + 14, 0 failures); lint and format are clean.

## Notable divergence from Python

Python added `"drained"` to a local `SubgraphStatus` literal. The JS v3 stream lifecycle uses `AgentStatus` from the external `@langchain/protocol` package, which has no `"drained"` member, so `GraphDrained` propagates through streams as the terminal error rather than as a new lifecycle status. The parity-relevant signal, the `GraphDrained` exception, is what consumers catch. Noted in the changeset.

## Source

- Python PR: langchain-ai/langgraph#7274
- Parity plan section: Graph draining / graceful shutdown
Summary
Adds cooperative drain support for Pregel runs so a graph can be asked to stop at the next superstep boundary, persist its checkpoint, and surface a resumable terminal exception.
- `RunControl` (in `langgraph.runtime`): a thread-safe handle whose `request_drain(reason="shutdown")` sets a single flag.
- `GraphDrained(GraphBubbleUp)` exception (in `langgraph.errors`), raised when a run exits early due to drain. Carries the `reason` string.
- `control: RunControl | None` kwarg on `invoke` / `ainvoke` / `stream` / `astream` / `stream_v2` / `astream_v2`. Wired through to `Runtime.control`, so nodes can read `runtime.control.drain_requested` / `drain_reason` and even call `request_drain()` from inside a node.
- `"drained"` as a terminal `SubgraphStatus`.

The intended use is hooking SIGTERM (or any external supervisor signal) to `control.request_drain("sigterm")` so an in-flight graph run can stop cleanly and be resumed later from the saved checkpoint.

Semantics: cooperative, between-superstep

`request_drain()` flips a flag. The Pregel loop checks it at the top of each `tick()`, after the previous superstep's writes have been applied and checkpointed. It never preempts work that is already running.

- `@entrypoint` with pending `@task` futures: in-flight futures still resolve after `request_drain()`.
- Run completes despite a drain request: status `done`; returns normally. No `GraphDrained` is raised. The caller can inspect `control.drain_requested` afterwards to distinguish a drained-but-completed run from a normal one.
- Run exits early due to drain: raises `GraphDrained(reason)`. The checkpoint of the last completed superstep is saved (also under `durability="exit"`). Resume with `invoke(None, config)` / `ainvoke(None, config)`.
- Drain requested from a subgraph: `GraphDrained` bubbles up through the parent loop and stops it at its own next superstep boundary; the parent's checkpoint is saved and resumable.

Drain does not cancel asyncio tasks or kill threads. Pair it with a graceful timeout + `task.cancel()` (or process exit) if you need a hard upper bound; see `test_drain_then_cancel_after_graceful_timeout` for the recommended pattern.

Usage
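The original usage snippet did not survive on this page. As a stand-in, here is a minimal, stdlib-only sketch of the semantics described above. It is a toy model, not LangGraph's implementation: `ToyRunControl`, the local `GraphDrained` class, `run_steps`, `install_sigterm_drain`, the dict-shaped checkpoint, and the step functions are all hypothetical names introduced for illustration. It shows the flag being checked only between steps, the early exit leaving a resumable checkpoint, and a SIGTERM handler that does nothing except request a drain.

```python
import signal
import threading
from typing import Optional


class GraphDrained(Exception):
    """Toy stand-in for the exception described above; carries the reason."""

    def __init__(self, reason: Optional[str]) -> None:
        super().__init__(reason)
        self.reason = reason


class ToyRunControl:
    """Toy stand-in for RunControl: a single thread-safe flag plus a reason."""

    def __init__(self) -> None:
        self._event = threading.Event()
        self.drain_reason: Optional[str] = None

    def request_drain(self, reason: str = "shutdown") -> None:
        self.drain_reason = reason
        self._event.set()

    @property
    def drain_requested(self) -> bool:
        return self._event.is_set()


def run_steps(steps, checkpoint, control):
    """Run the remaining steps, checking for drain only between steps."""
    while checkpoint["next"] < len(steps):
        if control.drain_requested:
            # The checkpoint already reflects the last completed step,
            # so the caller can resume from it later.
            raise GraphDrained(control.drain_reason)
        step = steps[checkpoint["next"]]
        checkpoint["state"] = step(checkpoint["state"], control)
        checkpoint["next"] += 1
    # Nothing left to run: return normally even if a drain was requested.
    return checkpoint["state"]


def install_sigterm_drain(control):
    """Hook SIGTERM to a drain request (call this from the main thread)."""

    def handler(signum, frame):
        control.request_drain("sigterm")

    signal.signal(signal.SIGTERM, handler)
    return handler


def step_a(state, control):
    return state + ["a"]


def step_b(state, control):
    # Stands in for a SIGTERM arriving while this step is running.
    control.request_drain("sigterm")
    return state + ["b"]


def step_c(state, control):
    return state + ["c"]
```

In this toy, running `[step_a, step_b, step_c]` lets `step_b` finish, then raises `GraphDrained` before `step_c`; calling `run_steps` again with the same checkpoint and a fresh control picks up at `step_c`. With the real API the equivalent calls would be `control.request_drain("sigterm")` and `invoke(None, config)`, as described above.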
Test plan
- `test_run_control_request_drain_stops_future_steps[_async]`
- `test_drain_requested_in_terminal_step_finishes_normally[_async]`
- `durability="exit"` persists a resumable checkpoint on drain (`test_drain_with_exit_durability_persists_resume_checkpoint`)
- `test_drain_from_subgraph_can_resume_parent`
- `test_external_drain_concurrent_sync/_async`
- `test_drain_then_cancel_after_graceful_timeout`
- `@task` futures still resolve after `request_drain()` (`test_request_drain_allows_inflight_[a]call_scheduling`)
- `control` kwarg wired through `stream_v2` (`test_stream_v2_accepts_control_for_drain`)
- `Runtime.merge` preserves `control` (`test_merge_runtime_preserves_run_control`)
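The drain-then-cancel pattern named in the test plan can be sketched with plain asyncio. This is an illustration under stated assumptions, not the body of the actual test: `drain_then_cancel`, `cooperative_run`, `stuck_run`, and the use of an `asyncio.Event` as the drain flag are hypothetical stand-ins. The idea is to request a drain first, wait up to a grace period for the run to stop on its own, and only then fall back to `task.cancel()` as the hard upper bound.

```python
import asyncio


async def drain_then_cancel(run_task, request_drain, grace_seconds):
    """Request a drain, wait up to grace_seconds, then cancel as a hard stop."""
    request_drain()
    done, _pending = await asyncio.wait({run_task}, timeout=grace_seconds)
    if run_task in done:
        # The run stopped on its own; the caller can inspect run_task
        # for its result or exception.
        return "drained"
    run_task.cancel()
    try:
        await run_task
    except asyncio.CancelledError:
        pass
    return "cancelled"


async def cooperative_run(flag):
    """Toy run that checks the drain flag between steps."""
    for step in range(100):
        if flag.is_set():
            return f"stopped before step {step}"
        await asyncio.sleep(0.01)
    return "finished"


async def stuck_run(flag):
    """Toy run that never reaches a step boundary, so it ignores the flag."""
    await asyncio.sleep(3600)


async def demo():
    flag = asyncio.Event()
    task = asyncio.create_task(cooperative_run(flag))
    await asyncio.sleep(0.03)  # let a few steps run first
    first = await drain_then_cancel(task, flag.set, grace_seconds=1.0)

    flag2 = asyncio.Event()
    task2 = asyncio.create_task(stuck_run(flag2))
    await asyncio.sleep(0)  # let the task start
    second = await drain_then_cancel(task2, flag2.set, grace_seconds=0.05)
    return first, second, task2.cancelled()
```

The cooperative run exits within the grace period, while the stuck run has to be cancelled. Choosing the grace period is a deployment decision; it would typically sit below whatever kill timeout the supervisor enforces.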