feat(langgraph): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting#7634
Merged
Sydney Runkle (sydney-runkle) merged 11 commits intoApr 28, 2026
Conversation
… depth
Restores DeltaChannel as a standalone class in channels/delta.py and adds
a snapshot_frequency parameter that writes a full snapshot blob every N writes,
bounding the ancestor replay walk depth while preserving O(N) storage for
large N.
Key design decisions:
- Write-count based (not step-based): snapshot fires every N writes to the
channel, tracked via _write_count incremented in both update() and
replay_writes(). This ensures the snapshot always coincides with an actual
channel write (i.e., a new_versions entry in put()), so it is always stored.
- Snapshot blob format: {"__delta_v__": value, "__delta_wc__": n} embeds the
write count so from_checkpoint() can restore it across invocations, keeping
the cadence correct without any external state.
- _checkpoint.py simplified: DeltaChannel.checkpoint() now returns the right
thing (sentinel or snapshot dict) so create_checkpoint needs no special logic.
- _needs_replay updated: triggers on DELTA_SENTINEL / MISSING; snapshot dicts
and plain values (migration) resolve directly via from_checkpoint().
Benchmark shows correct tradeoffs across frequencies (500 turns):
freq=1 → 296 MB storage, ~7ms reads
freq=5 → 60 MB storage, ~4ms reads
freq=10 → 30 MB storage, ~4ms reads
freq=50 → 6.5 MB storage, ~3ms reads
freq=inf→ 290 KB storage, ~114ms reads
Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Remove unused AggregateChannel compat wrapper from test files; switch all DeltaChannel test aliases to import from channels.delta directly - Drop dict-reducer tests (tested AggregateChannel type-inference, not relevant to this DeltaChannel-only branch) - Add value: Value | Any annotation to AggregateChannel.__slots__ (mypy) - Add isinstance asserts for DeltaChannel before replay_writes calls - Remove now-unused _math_compat import and clean up PostgresSaver import Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
d875eb2 to
f06eb0f
Compare
Replaces all `channels._delta` import paths with the new public `channels.delta` module added in this branch. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting
DeltaChannel snapshot_frequency — bounded read depth with write-count snapshottingDeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting
The dict-reducer tests were incorrectly removed — they test valid DeltaChannel behavior (dict type inference via Annotated) that was already supported by the base branch's _is_field_channel logic in state.py. The only thing needed was fixing the module name. Also updates all remaining channels._delta imports to channels.delta across state.py and test files. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Restores Postgres checkpointer support to both benchmark sections. Uses local Postgres at port 5441. Each run gets a fresh table slice via DELETE before and after to avoid cross-contamination. Key Postgres results at 500 turns: freq=1 → 8.3ms reads (full snapshot every write) freq=5 → 4.8ms reads (bounded replay, fewer large blobs to fetch) freq=10 → 5.3ms reads freq=50 → 6.7ms reads freq=inf → 123.7ms reads (full ancestry walk) Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…t ext type Replaces the write-count (_write_count) snapshot mechanism with a clean step-based approach: pregel's create_checkpoint fires snapshots every N pregel steps regardless of whether the channel was written (eager). Key changes: - snapshot_frequency=None (default) for pure delta; int N for snapshot every N steps - DeltaChannel.checkpoint() always returns DELTA_SENTINEL; snapshot logic lives in create_checkpoint which has the step number - create_checkpoint bumps channel version via get_next_version when the channel wasn't written at a snapshot step (eager: always stores the blob) - _DeltaSnapshot NamedTuple registered as EXT_DELTA_SNAPSHOT (code 7) in the msgpack serde — no dict key collision, type tag does the dispatch - InMemorySaver and PostgresSaver _get_channel_writes_history updated: _DeltaSnapshot blobs collect pending_writes before terminating (they encode the NEXT step's transition, not subsumed by the snapshot unlike pre-delta migration blobs) Tests confirm: - Snapshots fire at every N steps even when channel has no write that step - Correct accumulated state after reconstruction from snapshot + replay Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…annel reconstruction InMemorySaver.prune now walks the parent chain from the latest checkpoint and keeps all ancestors whose writes are still needed for DeltaChannel reconstruction — i.e. ancestors with DELTA_SENTINEL blobs where no _DeltaSnapshot has been written yet in the kept ancestry. Once a _DeltaSnapshot blob exists for all sentinel channels (provided by snapshot_frequency), the walk terminates and older checkpoints plus their writes and blobs are safely deleted. With snapshot_frequency=None (pure delta), all intermediate checkpoints are retained since the full write history is required. Setting snapshot_frequency=N bounds the kept ancestry to N steps. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…as DELTA_SENTINEL In async (default) durability mode, put_writes is fire-and-forget. If checkpoint_writes fails but put succeeds, the sentinel blob has no backing writes — reads silently reconstruct empty/wrong state. Fix: track futures returned by put_writes submissions in _pending_write_futs. Before committing a checkpoint that contains any DELTA_SENTINEL blob, call .result() on all pending write futures, blocking until they complete. This ensures checkpoint_writes are durable before the sentinel blob is committed. The flush only triggers when DELTA_SENTINEL is present, so graphs without DeltaChannel channels are unaffected. Snapshot steps (_DeltaSnapshot blobs) are self-contained and do not need the flush. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…aSnapshot correctly The base saver's _get_channel_writes_history walk terminated at any non-DELTA_SENTINEL blob without collecting that ancestor's pending_writes. This was correct for pre-delta migration blobs (which subsume their own writes), but wrong for _DeltaSnapshot blobs: the snapshot captures state AT the ancestor, while pending_writes encode the NEXT step's transition and must be collected before terminating. Applies the same fix as was already applied to InMemorySaver and PostgresSaver: when the ancestor blob is a _DeltaSnapshot, collect its pending_writes first, then terminate with the snapshot as seed. Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
9abee46
into
delta-channel-writes-based
62 checks passed
Sydney Runkle (sydney-runkle)
added a commit
that referenced
this pull request
Apr 30, 2026
…epth with write-count snapshotting (#7634) ## Summary Builds on #7586. Adds `snapshot_frequency: int | None` to `DeltaChannel`, letting users trade storage for bounded read depth. Also promotes `channels/_delta.py` from private to public (`channels/delta.py`). ### How it works Every Nth **pregel step**, `create_checkpoint` writes a `_DeltaSnapshot` blob instead of `DELTA_SENTINEL`. The ancestor walk in `_get_channel_writes_history` terminates at the snapshot rather than walking the full chain, bounding replay to at most N steps. Snapshots are **eager**: fired even on steps where the channel had no write (via a `get_next_version` version bump), so the depth bound holds unconditionally — no risk of the cadence drifting if a channel happens to be silent at a snapshot step. ### Storage formula | Mode | Blob storage | Read depth | |------|-------------|------------| | `snapshot_frequency=None` (pure delta) | O(N) — sentinels only | O(N) steps | | `snapshot_frequency=K` | O(N²/K) — periodic snapshots of growing size | O(K) steps | | add_messages / BinOp | O(N²) — full blob every step | O(1) | At N turns with ~400 char/msg messages, total snapshot storage ≈ N²/(2K) × avg_msg_size, since each snapshot blob grows linearly with accumulated messages. ### Key design decisions - **Step-based**: `snapshot_frequency=K` means "snapshot every K pregel steps." `create_checkpoint` has the step number; the channel itself doesn't need to track writes. - **Eager**: version-bumped via `get_next_version` even on non-write steps so `put()` always stores the blob. - **`_DeltaSnapshot` NamedTuple + msgpack ext type** (`EXT_DELTA_SNAPSHOT = 7`): serde type tag dispatches in `from_checkpoint` — no dict key inspection, no collision risk. - **`from_checkpoint` semantics**: `_DeltaSnapshot` → restore value directly (no replay needed); `DELTA_SENTINEL` / `MISSING` → replay from ancestor writes; plain value → pre-migration BinOp blob. - **InMemorySaver and PostgresSaver updated**: `_get_channel_writes_history` collects the snapshot ancestor's pending_writes before terminating (they encode the *next* step's transition, unlike pre-delta migration blobs which subsume their own writes). - **`snapshot_frequency=None`** is the pure-delta default (replaces `math.inf`). ### Benchmark results (InMemory, ~400 char/msg) **Storage** | turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf | |------:|----:|-------:|-------:|--------:|--------:|---------:| | 50 | ~10K tok | 5.9 MB | 1.2 MB | 601.3 KB | 119.8 KB | 29.5 KB | | 100 | ~20K tok | 23.7 MB | 4.8 MB | 2.4 MB | 475.8 KB | 58.4 KB | | 200 | ~40K tok | 94.6 MB | 19.0 MB | 9.5 MB | 1.9 MB | 116.4 KB | | 500 | ~100K tok | 591.5 MB | 118.4 MB | 59.2 MB | 11.8 MB | 290.3 KB | **Read latency** (avg of 5 `get_state` calls) | turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf | |------:|----:|-------:|-------:|--------:|--------:|---------:| | 50 | ~10K tok | 0.4ms | 0.4ms | 0.7ms | 0.9ms | 1.8ms | | 100 | ~20K tok | 0.7ms | 0.9ms | 1.0ms | 1.7ms | 5.7ms | | 200 | ~40K tok | 1.5ms | 1.7ms | 4.5ms | 3.7ms | 20.1ms | | 500 | ~100K tok | 3.6ms | 4.2ms | 4.4ms | 9.0ms | 110.3ms | **Per-invoke write latency** | turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf | |------:|----:|-------:|-------:|--------:|--------:|---------:| | 50 | ~10K tok | 1.5ms | 1.1ms | 1.1ms | 1.3ms | 1.7ms | | 100 | ~20K tok | 2.5ms | 1.6ms | 1.5ms | 1.7ms | 3.3ms | | 200 | ~40K tok | 3.4ms | 2.3ms | 2.2ms | 2.5ms | 8.3ms | | 500 | ~100K tok | 6.2ms | 4.2ms | 3.6ms | 4.1ms | 39.2ms | ## Test plan - [x] `make format` / `make lint` clean across `langgraph`, `checkpoint`, `checkpoint-postgres` - [x] `tests/test_channels.py` — 37 passing including step-based and eager-snapshot tests - [x] `tests/test_delta_channel_migration.py` — all passing - [x] Full suite: 1387 passing, 6 pre-existing failures unrelated to this branch --------- Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Builds on #7586. Adds
snapshot_frequency: int | NonetoDeltaChannelto bound ancestor walk depth, migrates both delta sentinel types to msgpack ext codes, fixes correctness gaps in_get_channel_writes_historyacross all savers, fixes prune to preserve writes needed for reconstruction, and fixes async durability ordering.Changes
channels/delta.pysnapshot_frequency=N: pregel writes a_DeltaSnapshotblob every N steps (eager — fires even if channel had no write that step, viaget_next_versionversion bump). Reads walk at most N ancestors before hitting the snapshot.snapshot_frequency=None(default): pure delta, unchanged behaviour.serde/types.py+serde/jsonplus.py_DeltaSnapshot(value)registered asEXT_DELTA_SNAPSHOT = 7in the msgpack ext hook.DELTA_SENTINELnow also goes through the msgpack ext hook asEXT_DELTA_SENTINEL = 8, replacing the one-off"delta"type tag indumps_typed/loads_typed. Both delta types are now handled uniformly via the ext system.pregel/_checkpoint.pycreate_checkpointhandles snapshot logic: forces a version bump for DeltaChannel channels at snapshot steps so the blob is always stored byput()._needs_replay/channels_from_checkpointupdated forDeltaChannel.checkpoint/memory/__init__.py—InMemorySaver_get_channel_writes_history: correctly handles_DeltaSnapshotblobs — collects the ancestor's pending_writes before terminating (they encode the next step's transition, unlike pre-delta blobs which subsume their own writes).prune: walks the parent chain from the latest checkpoint; only deletes ancestors whose writes are covered by a_DeltaSnapshotin the kept ancestry. Pure delta threads (snapshot_frequency=None) retain all ancestors.checkpoint/base/__init__.py— base saver fallback_DeltaSnapshot-aware fix applied to both sync and async_get_channel_writes_history.checkpoint-postgres/base.py— Postgres saver_DeltaSnapshot-aware fix applied to_build_delta_channel_writes_history.pregel/_loop.py— async durability fixput_writessubmissions in_pending_write_futs. Before committing a checkpoint that contains anyDELTA_SENTINELblob, flushes all pending write futures synchronously. Eliminates the window where a sentinel blob could be committed before its backing writes in async (default) durability mode.Benchmark results (InMemory, ~400 char/msg)
Storage
Read latency (avg of 5
get_statecalls)Per-invoke write latency
Test plan
make format/make lintclean acrosslanggraph,checkpoint,checkpoint-postgrestests/test_channels.py— 37 passing including step-based and eager-snapshot teststests/test_delta_channel_migration.py— all passingtests/test_pregel.py— delta channel end-to-end, time travel, remove/update-by-id, write-flush orderingcheckpoint/tests/test_memory.py— 22 passing including prune DeltaChannel tests