feat(langgraph): DeltaChannel: store sentinel in blobs, reconstruct from checkpoint_writes#7586
Merged
DeltaChannel: store sentinel in blobs, reconstruct from checkpoint_writes
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add serde support for DiffDelta by implementing dump/load for the "diff" type tag.
This allows the checkpoint system to efficiently store delta objects by serializing
them as msgpack-encoded dicts with {"d": delta, "p": prev_version} structure.
The implementation uses runtime type checking to avoid circular imports and
leverages the existing msgpack ext hooks for proper deserialization of complex
types like LangChain messages.
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Replace duck-typing check with lazy import inside _is_diff_delta helper function to avoid module-level circular dependency while using proper isinstance semantics. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds DiffChannel, a new channel type that stores only per-step write deltas in checkpoints and reconstructs the full list by replaying the chain through the operator at load time. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Add logger.warning when a mid-chain blob is missing (fixes silent truncation bug)
- Add cycle guard to prevent infinite loops on corrupt blob stores
- Fix type annotation on diff_channels from dict[str, Any] to dict[str, str]
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nels Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…sync) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Add `_load_diff_chains_async` to `AsyncPostgresSaver` and override `_load_checkpoint_tuple` to inline blob-parsing and diff-chain resolution via async point-lookup traversal, mirroring the sync `PostgresSaver._load_diff_chains` implementation. Add integration test `test_diff_channel_chain_reconstruction` that skips gracefully when `langgraph` core is not installed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… traversal
Fixes a critical deadlock that occurs when _load_diff_chains calls self._cursor() from within _load_blobs while the outer _load_checkpoint_tuple already holds self._cursor(). On bare (non-pool) connections, the threading.Lock is not reentrant, causing a deadlock.
Solution: Pass the cursor as a parameter to _load_diff_chains and _load_blobs instead of acquiring a new cursor within those methods. Updated _load_checkpoint_tuple to acquire a cursor once at the top level and pass it through the call chain.
Changes:
- Updated _load_blobs signature to accept optional cur parameter
- Updated _load_diff_chains signature (base and implementations) to accept optional cur parameter
- Modified _load_checkpoint_tuple in PostgresSaver to acquire cursor and pass it
- Modified _load_checkpoint_tuple_async to acquire cursor only when diff_payloads exist
- Removed nested self._cursor() calls in _load_diff_chains and _load_diff_chains_async
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
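The deadlock and its fix can be reproduced in miniature. The sketch below is not the `PostgresSaver` code: `ToySaver`, `load_tuple`, `load_chains`, and the timeout are hypothetical, and a plain `threading.Lock` stands in for the lock guarding a bare connection. It shows the outer method acquiring the cursor once and passing it down, instead of the helper re-acquiring a non-reentrant lock.

```python
import threading
from contextlib import contextmanager


class ToySaver:
    """Hypothetical saver whose cursor is guarded by a non-reentrant lock."""

    def __init__(self):
        self._lock = threading.Lock()  # threading.Lock is not reentrant
        self.acquisitions = 0

    @contextmanager
    def _cursor(self):
        # A second acquire from the same thread would block forever;
        # the timeout only exists to make that failure observable here.
        if not self._lock.acquire(timeout=0.1):
            raise RuntimeError("nested _cursor() call would deadlock")
        self.acquisitions += 1
        try:
            yield "cursor"
        finally:
            self._lock.release()

    def load_chains_nested(self):
        # Buggy shape: the helper opens its own cursor.
        with self._cursor() as cur:
            return f"chains via {cur}"

    def load_chains(self, cur):
        # Fixed shape: the helper reuses the caller's cursor.
        return f"chains via {cur}"

    def load_tuple_buggy(self):
        with self._cursor():
            return self.load_chains_nested()

    def load_tuple(self):
        with self._cursor() as cur:
            return self.load_chains(cur)
```

Calling `ToySaver().load_tuple()` acquires the lock exactly once, while `load_tuple_buggy()` raises because the inner acquire can never succeed on the same thread.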
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…me benchmark Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…traversal Periodic full-snapshot checkpoints cap chain depth, trading a small amount of extra storage for bounded reconstruction time. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Renames the diff-channel types to DeltaChannel, DeltaValue, and DeltaChainValue for consistency with the settled naming convention.
…s DeltaValue Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…int_id in DeltaValue Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…d to after_checkpoint
…delta_reducer
Renames `operator` → `reducer` and flips arg order to `(reducer, typ=None)`, matching the new batch contract: `reducer(state, list[writes]) -> state`. The reducer receives all writes for a step in one call instead of being folded pairwise, enabling single-pass implementations that avoid O(N²) reprocessing.
`typ` is now optional: `_is_field_channel` in `graph/state.py` always overwrites it from the `Annotated[T, ...]` outer type, so users can write `DeltaChannel(my_reducer)` rather than `DeltaChannel(list, my_reducer)`.
Adds `_messages_delta_reducer` to `langgraph.graph.message` (experimental): a single-pass bulk reducer for message lists that deduplicates by ID and handles `RemoveMessage` tombstoning without calling `add_messages`, avoiding repeated dedup passes that `add_messages` would incur in a fold.
Also fixes the `_delta_write_futs` mypy error in `AsyncPregelLoop` by moving the type annotation to the class body, and unignores `new_pr_desc.md` from the repo via `.gitignore`.
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
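A minimal sketch of the batch contract, assuming messages are plain dicts with an `id` key. `dedup_reducer` is a hypothetical reducer written for this illustration, not `_messages_delta_reducer`; it only shows the `reducer(state, list[writes]) -> state` shape, a single pass over all writes, and that splitting the same writes into different batches gives the same result.

```python
def dedup_reducer(state, writes):
    """Batch reducer: apply every write for a step in one pass.

    Each write is a list of message dicts; a later message with the same
    id replaces the earlier one in place.
    """
    merged = list(state)
    index = {m["id"]: i for i, m in enumerate(merged)}
    for write in writes:
        for msg in write:
            pos = index.get(msg["id"])
            if pos is None:
                index[msg["id"]] = len(merged)
                merged.append(msg)
            else:
                merged[pos] = msg
    return merged


a = [{"id": "1", "text": "hi"}]
b = [{"id": "2", "text": "hello"}]
c = [{"id": "1", "text": "hi (edited)"}]

# Same writes, different batching.
one_batch = dedup_reducer([], [a, b, c])
two_batches = dedup_reducer(dedup_reducer([], [a, b]), [c])
```

Both `one_batch` and `two_batches` hold the edited message `"1"` followed by message `"2"`, which is the property a reducer needs if writes may be replayed in batches of a different size than they were produced in.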
channel: None = None -> channel: None; all call sites pass None explicitly and the default was never needed. Restores the original signature. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…d paths Revert pure-style refactors (local variable hoisting, redundant null guards, Sequence/list annotation change) that cluttered the DeltaChannel PR diff without any semantic change.
open questions
- is the prune approach correct -- removing for in memory saver for now, but we should update for LSD i think
- have we properly tested all backwards compat cases -- added
- do we ever need to coerce messages to pydantic types in the reducer (i think saver manages), can test out beta reducer and find out
- is the delta channel design good (can refactor internals later so less concerned about this)
- do we need missing and delta sentinel, seems redundant
- is the checkpoint change ok (get next version)
prune was not previously implemented on InMemorySaver (raised NotImplementedError); adding a DeltaChannel-aware implementation is a follow-up concern, not required for the core feature.
- Add test_add_messages_to_delta_migration_preserves_message_history (sync + async) covering the primary real-world BinaryOperatorAggregate → DeltaChannel migration path with real Message objects and IDs
- Hoist all in-function imports to module level in test_channels.py and fix _delta_channel_with_type helper accordingly
- Add section headers in test_channels.py for better navigation
Sydney Runkle (sydney-runkle)
added a commit
that referenced
this pull request
Apr 30, 2026
…epth with write-count snapshotting (#7634)

## Summary

Builds on #7586. Adds `snapshot_frequency: int | None` to `DeltaChannel`, letting users trade storage for bounded read depth. Also promotes `channels/_delta.py` from private to public (`channels/delta.py`).

### How it works

Every Nth **pregel step**, `create_checkpoint` writes a `_DeltaSnapshot` blob instead of `DELTA_SENTINEL`. The ancestor walk in `_get_channel_writes_history` terminates at the snapshot rather than walking the full chain, bounding replay to at most N steps.

Snapshots are **eager**: fired even on steps where the channel had no write (via a `get_next_version` version bump), so the depth bound holds unconditionally, with no risk of the cadence drifting if a channel happens to be silent at a snapshot step.

### Storage formula

| Mode | Blob storage | Read depth |
|------|-------------|------------|
| `snapshot_frequency=None` (pure delta) | O(N), sentinels only | O(N) steps |
| `snapshot_frequency=K` | O(N²/K), periodic snapshots of growing size | O(K) steps |
| add_messages / BinOp | O(N²), full blob every step | O(1) |

At N turns with ~400 char/msg messages, total snapshot storage ≈ N²/(2K) × avg_msg_size, since each snapshot blob grows linearly with accumulated messages.

### Key design decisions

- **Step-based**: `snapshot_frequency=K` means "snapshot every K pregel steps." `create_checkpoint` has the step number; the channel itself doesn't need to track writes.
- **Eager**: version-bumped via `get_next_version` even on non-write steps so `put()` always stores the blob.
- **`_DeltaSnapshot` NamedTuple + msgpack ext type** (`EXT_DELTA_SNAPSHOT = 7`): serde type tag dispatches in `from_checkpoint`, with no dict key inspection and no collision risk.
- **`from_checkpoint` semantics**: `_DeltaSnapshot` → restore value directly (no replay needed); `DELTA_SENTINEL` / `MISSING` → replay from ancestor writes; plain value → pre-migration BinOp blob.
- **InMemorySaver and PostgresSaver updated**: `_get_channel_writes_history` collects the snapshot ancestor's pending_writes before terminating (they encode the *next* step's transition, unlike pre-delta migration blobs which subsume their own writes).
- **`snapshot_frequency=None`** is the pure-delta default (replaces `math.inf`).

### Benchmark results (InMemory, ~400 char/msg)

**Storage**

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 5.9 MB | 1.2 MB | 601.3 KB | 119.8 KB | 29.5 KB |
| 100 | ~20K tok | 23.7 MB | 4.8 MB | 2.4 MB | 475.8 KB | 58.4 KB |
| 200 | ~40K tok | 94.6 MB | 19.0 MB | 9.5 MB | 1.9 MB | 116.4 KB |
| 500 | ~100K tok | 591.5 MB | 118.4 MB | 59.2 MB | 11.8 MB | 290.3 KB |

**Read latency** (avg of 5 `get_state` calls)

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 0.4ms | 0.4ms | 0.7ms | 0.9ms | 1.8ms |
| 100 | ~20K tok | 0.7ms | 0.9ms | 1.0ms | 1.7ms | 5.7ms |
| 200 | ~40K tok | 1.5ms | 1.7ms | 4.5ms | 3.7ms | 20.1ms |
| 500 | ~100K tok | 3.6ms | 4.2ms | 4.4ms | 9.0ms | 110.3ms |

**Per-invoke write latency**

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 1.5ms | 1.1ms | 1.1ms | 1.3ms | 1.7ms |
| 100 | ~20K tok | 2.5ms | 1.6ms | 1.5ms | 1.7ms | 3.3ms |
| 200 | ~40K tok | 3.4ms | 2.3ms | 2.2ms | 2.5ms | 8.3ms |
| 500 | ~100K tok | 6.2ms | 4.2ms | 3.6ms | 4.1ms | 39.2ms |

## Test plan

- [x] `make format` / `make lint` clean across `langgraph`, `checkpoint`, `checkpoint-postgres`
- [x] `tests/test_channels.py`: 37 passing including step-based and eager-snapshot tests
- [x] `tests/test_delta_channel_migration.py`: all passing
- [x] Full suite: 1387 passing, 6 pre-existing failures unrelated to this branch

---------

Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
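The N²/(2K) estimate in the storage formula follows from summing the snapshot sizes. A small check, under the simplifying assumptions that one message is appended per step and a full snapshot is written on every K-th step (the helper name `snapshot_messages_stored` is made up for this sketch):

```python
def snapshot_messages_stored(n_steps, k):
    """Total messages held across all snapshot blobs.

    Assumes one message is appended per step and a full snapshot is
    written at steps k, 2k, 3k, ...; the snapshot at step s holds s messages.
    """
    return sum(step for step in range(k, n_steps + 1, k))


n, k = 500, 50
exact = snapshot_messages_stored(n, k)  # 50 + 100 + ... + 500
approx = n * n / (2 * k)                # leading term N^2 / (2K)
```

Under these assumptions the exact sum is N²/(2K) + N/2, so the approximation drops a term that is linear in N. Multiplying by the average message size gives a byte estimate; this count ignores serialization overhead and metadata.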
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
May 29, 2026
…ver API (beta)
Adds the writes-history support surface backing DeltaChannel:
- DeltaSnapshot class + isDeltaSnapshot, with JSON+ serialization round-trip
- counters_since_delta_snapshot on CheckpointMetadata + DeltaChannelHistory type
- BaseCheckpointSaver.getDeltaChannelHistory({ config, channels }) default
ancestor-walk implementation, with a direct-storage MemorySaver override
Ports the end-state of langchain-ai/langgraph#7586, #7699, #7746, #7732.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
May 29, 2026
… cadence helpers (beta)
- DeltaChannel: reducer channel storing a sentinel-free omission in blobs and reconstructing state by replaying ancestor writes; count-based snapshot cadence (snapshotFrequency=1000 default)
- DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT system bound (default 5000, env override)
- messagesDeltaReducer: batching-invariant messages reducer with dict/string coercion (langchain-ai/langgraph#7680)
- createCheckpoint channelsToSnapshot/getNextVersion/updatedChannels options, deltaChannelsToSnapshot, and async channelsFromCheckpoint reconstruction
- public exports of DeltaChannel + messagesDeltaReducer
Ports the end-state of langchain-ai/langgraph#7586, #7634, #7680.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
May 29, 2026
…engine
- loop: per-delta-channel counter bookkeeping, snapshot cadence decision, reconstruction at initialization, stable message-IDs for delta writes, raw-input delta write persistence, and an exit-durability accumulator that anchors accumulated writes to the parent (or a stub) instead of forcing a snapshot
- index: delta-aware reconstruction in getState and updateState
- algo: snapshot delta channels for fresh local reads
Ports the end-state of langchain-ai/langgraph#7586, #7730, #7746.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 1, 2026
…ver API (beta)
Adds the writes-history support surface backing DeltaChannel:
- DeltaSnapshot class + isDeltaSnapshot, with JSON+ serialization round-trip
- counters_since_delta_snapshot on CheckpointMetadata + DeltaChannelHistory type
- BaseCheckpointSaver.getDeltaChannelHistory({ config, channels }) default
ancestor-walk implementation, with a direct-storage MemorySaver override
Ports the end-state of langchain-ai/langgraph#7586, #7699, #7746, #7732.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 1, 2026
… cadence helpers (beta)
- DeltaChannel: reducer channel storing a sentinel-free omission in blobs and reconstructing state by replaying ancestor writes; count-based snapshot cadence (snapshotFrequency=1000 default)
- DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT system bound (default 5000, env override)
- messagesDeltaReducer: batching-invariant messages reducer with dict/string coercion (langchain-ai/langgraph#7680)
- createCheckpoint channelsToSnapshot/getNextVersion/updatedChannels options, deltaChannelsToSnapshot, and async channelsFromCheckpoint reconstruction
- public exports of DeltaChannel + messagesDeltaReducer
Ports the end-state of langchain-ai/langgraph#7586, #7634, #7680.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 1, 2026
…engine
- loop: per-delta-channel counter bookkeeping, snapshot cadence decision, reconstruction at initialization, stable message-IDs for delta writes, raw-input delta write persistence, and an exit-durability accumulator that anchors accumulated writes to the parent (or a stub) instead of forcing a snapshot
- index: delta-aware reconstruction in getState and updateState
- algo: snapshot delta channels for fresh local reads
Ports the end-state of langchain-ai/langgraph#7586, #7730, #7746.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 10, 2026
…ver API (beta)
Adds the writes-history support surface backing DeltaChannel:
- DeltaSnapshot class + isDeltaSnapshot, with JSON+ serialization round-trip
- counters_since_delta_snapshot on CheckpointMetadata + DeltaChannelHistory type
- BaseCheckpointSaver.getDeltaChannelHistory({ config, channels }) default
ancestor-walk implementation, with a direct-storage MemorySaver override
Ports the end-state of langchain-ai/langgraph#7586, #7699, #7746, #7732.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 10, 2026
… cadence helpers (beta)
- DeltaChannel: reducer channel storing a sentinel-free omission in blobs and reconstructing state by replaying ancestor writes; count-based snapshot cadence (snapshotFrequency=1000 default)
- DELTA_MAX_SUPERSTEPS_SINCE_SNAPSHOT system bound (default 5000, env override)
- messagesDeltaReducer: batching-invariant messages reducer with dict/string coercion (langchain-ai/langgraph#7680)
- createCheckpoint channelsToSnapshot/getNextVersion/updatedChannels options, deltaChannelsToSnapshot, and async channelsFromCheckpoint reconstruction
- public exports of DeltaChannel + messagesDeltaReducer
Ports the end-state of langchain-ai/langgraph#7586, #7634, #7680.
Christian Bromann (christian-bromann)
added a commit
to langchain-ai/langgraphjs
that referenced
this pull request
Jun 10, 2026
…engine
- loop: per-delta-channel counter bookkeeping, snapshot cadence decision, reconstruction at initialization, stable message-IDs for delta writes, raw-input delta write persistence, and an exit-durability accumulator that anchors accumulated writes to the parent (or a stub) instead of forcing a snapshot
- index: delta-aware reconstruction in getState and updateState
- algo: snapshot delta channels for fresh local reads
Ports the end-state of langchain-ai/langgraph#7586, #7730, #7746.
DeltaChannel: stop checkpointing the same data over and over
The problem
LangGraph serializes the full accumulated state into a checkpoint blob at every step. For a 100-turn conversation, that's 100 increasingly large copies of the same message list, even though each step only added one message. At 500 turns, `add_messages` consumes 219 MB just for blobs.
What this PR does
Introduces `DeltaChannel`: a reducer channel that stores only a zero-byte sentinel in checkpoint blobs instead of the full accumulated value. On restore, the runtime walks the ancestor chain, collects the per-step writes, and replays them through the reducer to reconstruct state.
Storage savings
`snapshot_frequency=N` writes a full-value blob every N steps, bounding replay depth. Without it, blob storage is theoretically near-zero but read latency grows unboundedly with thread length, which is not practical for production.
Total checkpoint storage (blobs + writes + metadata, ~400 chars/message):
[Table columns: `add_messages` | `DeltaChannel` (no snapshots) | `DeltaChannel(snapshot_frequency=50)`]
† No snapshots trades read latency for maximum storage savings; see below.
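To make the growth rates concrete, here is a toy model of blob storage for an append-only thread. It is a sketch under stated assumptions, not a measurement: `blob_bytes` is a hypothetical helper, `json` stands in for the real serializer, metadata is ignored, and per-step writes are counted only on the delta side.

```python
import json


def blob_bytes(n_steps, msg_chars=400):
    """Return (full_blob_bytes, delta_bytes) for a toy append-only thread.

    json stands in for the real serializer; sizes are illustrative only.
    """
    state, full, delta = [], 0, 0
    for step in range(n_steps):
        write = {"id": str(step), "content": "x" * msg_chars}
        state.append(write)
        # Full-state checkpointing: the whole list is re-serialized each step.
        full += len(json.dumps(state))
        # Delta checkpointing: a zero-byte sentinel blob plus this step's write.
        delta += 0 + len(json.dumps(write))
    return full, delta


full_100, delta_100 = blob_bytes(100)
full_200, delta_200 = blob_bytes(200)
```

Doubling the thread length roughly quadruples the full-state total while the delta total only doubles, which is the quadratic versus linear behaviour the storage comparison is about.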
The honest tradeoff: read latency
Without snapshots, every `get_state` walks the full ancestor chain, so latency grows linearly with thread length.
Read latency (`get_state`, InMemory, 5-call avg):
[Table columns: `add_messages` | `DeltaChannel` (no snapshots) | `DeltaChannel(snapshot_frequency=50)`]
† Grows unboundedly with thread length.
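The effect of snapshots on read depth can be seen in a toy checkpoint chain. This is a simplified model, not the saver's storage layout: `SENTINEL`, `build_thread`, `restore`, and `concat` are hypothetical names, and each toy checkpoint stores the write that produced it next to either a full value or the sentinel.

```python
SENTINEL = object()  # stand-in for the zero-byte marker


def build_thread(n_steps, snapshot_frequency=None):
    """Build a toy checkpoint chain, one appended item per step.

    Each checkpoint records the write that produced it and either a full
    snapshot of the value or SENTINEL.
    """
    chain, value = [], []
    for step in range(1, n_steps + 1):
        value = value + [step]
        snap = snapshot_frequency is not None and step % snapshot_frequency == 0
        chain.append({"write": [step], "blob": list(value) if snap else SENTINEL})
    return chain


def restore(chain, reducer):
    """Walk back to the nearest real blob, then replay the newer writes."""
    pending, depth, seed = [], 0, []
    for ckpt in reversed(chain):
        if ckpt["blob"] is not SENTINEL:
            seed = ckpt["blob"]  # a real value ends the walk
            break
        pending.append(ckpt["write"])
        depth += 1
    pending.reverse()  # oldest write first
    return reducer(seed, pending), depth


def concat(state, writes):
    """Batch reducer: append every write in order."""
    return state + [x for w in writes for x in w]


full_walk = restore(build_thread(120), concat)
bounded = restore(build_thread(120, snapshot_frequency=50), concat)
```

Both calls rebuild the same 120-item list, but the walk without snapshots visits all 120 checkpoints while the walk with `snapshot_frequency=50` stops at step 100 after 20. Any real blob ends the walk in this model, which is also how a pre-migration full-value checkpoint would act as a starting point.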
Reducer requirements
`DeltaChannel` takes a batch reducer `(state, list[writes]) -> state`. Two requirements:
- `reducer(reducer(s, [a, b]), [c]) == reducer(s, [a, b, c])`. This matters because LangGraph may replay writes in different batch sizes than they were originally produced; a non-associative reducer would silently reconstruct wrong state.
If your reducer isn't associative, use `BinaryOperatorAggregate` instead. `DeltaChannel` is not a drop-in replacement for every reducer.
Backwards compatibility: no migration required
Threads written under `BinaryOperatorAggregate` (including `add_messages`) work transparently after swapping the annotation; existing checkpoints are not touched.
How: when loading state, `DeltaChannel` asks the saver to walk the ancestor chain and collect per-step writes. The walk stops as soon as it hits an ancestor whose stored blob is a real value (not a sentinel). Old `BinaryOperatorAggregate` checkpoints store the full accumulated list at every step, so the walk terminates at the nearest pre-migration ancestor and uses that list as the starting point. Any writes recorded after the migration are then replayed on top.
This means time-travel, `get_state_history`, and resuming from mid-thread checkpoints all work correctly across the migration boundary, both before and after the swap. The only constraint is that the new reducer must produce the same accumulated value as the old one would have for any writes made after the migration point.
How it works
`checkpoint()` always returns `DELTA_SENTINEL` (a zero-byte msgpack ext marker). On restore, `_get_channel_writes_history` walks the parent chain collecting `checkpoint_writes` until it hits a real blob (a `_DeltaSnapshot`, migration artifact, or `update_state`-written value), then `replay_writes` folds those deltas onto the seed through the reducer.
In `durability="async"` mode, `AsyncPregelLoop` tracks in-flight `aput_writes` futures for DeltaChannel fields and drains them before `aput()`, ensuring writes are durable before the sentinel blob is committed.
What's in scope
- `libs/langgraph/langgraph/channels/delta.py`: `DeltaChannel` implementation
- `libs/langgraph/langgraph/graph/message.py`: `_messages_delta_reducer` (experimental)
- `libs/checkpoint/`: `_get_channel_writes_history` ancestor-walk API on `BaseCheckpointSaver`, optimized `InMemorySaver` override
- `libs/checkpoint-postgres/`: `PostgresSaver` / `AsyncPostgresSaver` single-roundtrip UNION ALL override
- `libs/langgraph/langgraph/pregel/`: checkpoint wiring, async write-ordering safety
Known limitations
- `ShallowPostgresSaver` incompatible: shallow savers keep only the latest checkpoint, so there is no ancestor chain to walk. Should raise at compile time (follow-up).
- Each `DeltaChannel` field issues its own `_get_channel_writes_history` call today. A single walk collecting all sentinel channels would reduce roundtrips proportionally.
- … `_get_channel_writes_history` to support `DeltaChannel`. The base-class fallback works but is unoptimized.
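The write-ordering rule for `durability="async"` described under "How it works" can be sketched with plain `asyncio`. This is an illustration of the drain-before-commit idea only: `put_write`, `put_checkpoint`, and `demo` are hypothetical coroutines, not the `AsyncPregelLoop` internals.

```python
import asyncio


async def demo():
    log = []

    async def put_write(i):
        # Finish out of order to mimic concurrent in-flight writes.
        await asyncio.sleep(0.01 * (3 - i))
        log.append(f"write-{i}")

    async def put_checkpoint():
        log.append("checkpoint")

    in_flight = [asyncio.create_task(put_write(i)) for i in range(3)]
    # Drain every pending write before committing the sentinel checkpoint,
    # so a reader can never see a sentinel whose writes are missing.
    await asyncio.gather(*in_flight)
    await put_checkpoint()
    return log


log = asyncio.run(demo())
```

Whatever order the writes complete in, `"checkpoint"` is always the last entry in `log`.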