Skip to content

feat(langgraph): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting#7634

Merged
Sydney Runkle (sydney-runkle) merged 11 commits into
delta-channel-writes-basedfrom
sr/snapshot-freq-bench
Apr 28, 2026
Merged

feat(langgraph): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting#7634
Sydney Runkle (sydney-runkle) merged 11 commits into
delta-channel-writes-basedfrom
sr/snapshot-freq-bench

Conversation

@sydney-runkle

@sydney-runkle Sydney Runkle (sydney-runkle) commented Apr 27, 2026

Copy link
Copy Markdown
Collaborator

Summary

Builds on #7586. Adds snapshot_frequency: int | None to DeltaChannel to bound ancestor walk depth, migrates both delta sentinel types to msgpack ext codes, fixes correctness gaps in _get_channel_writes_history across all savers, fixes prune to preserve writes needed for reconstruction, and fixes async durability ordering.

Changes

channels/delta.py

  • snapshot_frequency=N: pregel writes a _DeltaSnapshot blob every N steps (eager — fires even if channel had no write that step, via get_next_version version bump). Reads walk at most N ancestors before hitting the snapshot.
  • snapshot_frequency=None (default): pure delta, unchanged behaviour.

serde/types.py + serde/jsonplus.py

  • _DeltaSnapshot(value) registered as EXT_DELTA_SNAPSHOT = 7 in the msgpack ext hook.
  • DELTA_SENTINEL now also goes through the msgpack ext hook as EXT_DELTA_SENTINEL = 8, replacing the one-off "delta" type tag in dumps_typed/loads_typed. Both delta types are now handled uniformly via the ext system.

pregel/_checkpoint.py

  • create_checkpoint handles snapshot logic: forces a version bump for DeltaChannel channels at snapshot steps so the blob is always stored by put().
  • _needs_replay / channels_from_checkpoint updated for DeltaChannel.

checkpoint/memory/__init__.pyInMemorySaver

  • _get_channel_writes_history: correctly handles _DeltaSnapshot blobs — collects the ancestor's pending_writes before terminating (they encode the next step's transition, unlike pre-delta blobs which subsume their own writes).
  • prune: walks the parent chain from the latest checkpoint; only deletes ancestors whose writes are covered by a _DeltaSnapshot in the kept ancestry. Pure delta threads (snapshot_frequency=None) retain all ancestors.

checkpoint/base/__init__.py — base saver fallback

  • Same _DeltaSnapshot-aware fix applied to both sync and async _get_channel_writes_history.

checkpoint-postgres/base.py — Postgres saver

  • Same _DeltaSnapshot-aware fix applied to _build_delta_channel_writes_history.

pregel/_loop.py — async durability fix

  • Tracks futures from put_writes submissions in _pending_write_futs. Before committing a checkpoint that contains any DELTA_SENTINEL blob, flushes all pending write futures synchronously. Eliminates the window where a sentinel blob could be committed before its backing writes in async (default) durability mode.

Benchmark results (InMemory, ~400 char/msg)

Storage

turns ctx freq=1 freq=5 freq=10 freq=50 freq=inf
50 ~10K tok 5.9 MB 1.2 MB 601.3 KB 119.8 KB 29.5 KB
100 ~20K tok 23.7 MB 4.8 MB 2.4 MB 475.8 KB 58.4 KB
200 ~40K tok 94.6 MB 19.0 MB 9.5 MB 1.9 MB 116.4 KB
500 ~100K tok 591.5 MB 118.4 MB 59.2 MB 11.8 MB 290.3 KB

Read latency (avg of 5 get_state calls)

turns ctx freq=1 freq=5 freq=10 freq=50 freq=inf
50 ~10K tok 0.4ms 0.4ms 0.4ms 0.6ms 1.0ms
100 ~20K tok 0.8ms 0.7ms 0.8ms 1.1ms 2.6ms
200 ~40K tok 1.5ms 1.7ms 3.8ms 2.3ms 8.2ms
500 ~100K tok 3.7ms 3.8ms 6.5ms 5.7ms 49.0ms

Per-invoke write latency

turns ctx freq=1 freq=5 freq=10 freq=50 freq=inf
50 ~10K tok 1.1ms 0.9ms 0.8ms 0.9ms 1.1ms
100 ~20K tok 1.7ms 1.1ms 1.1ms 1.2ms 1.8ms
200 ~40K tok 2.8ms 1.8ms 1.6ms 1.7ms 4.0ms
500 ~100K tok 5.6ms 3.6ms 3.2ms 3.4ms 18.9ms

Test plan

  • make format / make lint clean across langgraph, checkpoint, checkpoint-postgres
  • tests/test_channels.py — 37 passing including step-based and eager-snapshot tests
  • tests/test_delta_channel_migration.py — all passing
  • tests/test_pregel.py — delta channel end-to-end, time travel, remove/update-by-id, write-flush ordering
  • checkpoint/tests/test_memory.py — 22 passing including prune DeltaChannel tests
  • Full suite: 1388 passing, 6 pre-existing failures unrelated to this branch

… depth

Restores DeltaChannel as a standalone class in channels/delta.py and adds
a snapshot_frequency parameter that writes a full snapshot blob every N writes,
bounding the ancestor replay walk depth while preserving O(N) storage for
large N.

Key design decisions:
- Write-count based (not step-based): snapshot fires every N writes to the
  channel, tracked via _write_count incremented in both update() and
  replay_writes(). This ensures the snapshot always coincides with an actual
  channel write (i.e., a new_versions entry in put()), so it is always stored.
- Snapshot blob format: {"__delta_v__": value, "__delta_wc__": n} embeds the
  write count so from_checkpoint() can restore it across invocations, keeping
  the cadence correct without any external state.
- _checkpoint.py simplified: DeltaChannel.checkpoint() now returns the right
  thing (sentinel or snapshot dict) so create_checkpoint needs no special logic.
- _needs_replay updated: triggers on DELTA_SENTINEL / MISSING; snapshot dicts
  and plain values (migration) resolve directly via from_checkpoint().

Benchmark shows correct tradeoffs across frequencies (500 turns):
  freq=1  → 296 MB storage, ~7ms reads
  freq=5  → 60 MB storage,  ~4ms reads
  freq=10 → 30 MB storage,  ~4ms reads
  freq=50 → 6.5 MB storage, ~3ms reads
  freq=inf→ 290 KB storage, ~114ms reads

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
- Remove unused AggregateChannel compat wrapper from test files; switch
  all DeltaChannel test aliases to import from channels.delta directly
- Drop dict-reducer tests (tested AggregateChannel type-inference, not
  relevant to this DeltaChannel-only branch)
- Add value: Value | Any annotation to AggregateChannel.__slots__ (mypy)
- Add isinstance asserts for DeltaChannel before replay_writes calls
- Remove now-unused _math_compat import and clean up PostgresSaver import

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@sydney-runkle Sydney Runkle (sydney-runkle) changed the base branch from sr/even-better-writes-idea to delta-channel-writes-based April 27, 2026 22:24
Replaces all `channels._delta` import paths with the new public
`channels.delta` module added in this branch.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@sydney-runkle Sydney Runkle (sydney-runkle) changed the title feat(channels): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting feat(channels): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting Apr 27, 2026
@sydney-runkle Sydney Runkle (sydney-runkle) changed the title feat(channels): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting feat(langgraph): DeltaChannel snapshot_frequency — bounded read depth with write-count snapshotting Apr 27, 2026
The dict-reducer tests were incorrectly removed — they test valid
DeltaChannel behavior (dict type inference via Annotated) that was
already supported by the base branch's _is_field_channel logic in
state.py. The only thing needed was fixing the module name.

Also updates all remaining channels._delta imports to channels.delta
across state.py and test files.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Restores Postgres checkpointer support to both benchmark sections.
Uses local Postgres at port 5441. Each run gets a fresh table slice
via DELETE before and after to avoid cross-contamination.

Key Postgres results at 500 turns:
  freq=1   → 8.3ms reads  (full snapshot every write)
  freq=5   → 4.8ms reads  (bounded replay, fewer large blobs to fetch)
  freq=10  → 5.3ms reads
  freq=50  → 6.7ms reads
  freq=inf → 123.7ms reads (full ancestry walk)

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…t ext type

Replaces the write-count (_write_count) snapshot mechanism with a clean
step-based approach: pregel's create_checkpoint fires snapshots every N
pregel steps regardless of whether the channel was written (eager).

Key changes:
- snapshot_frequency=None (default) for pure delta; int N for snapshot every N steps
- DeltaChannel.checkpoint() always returns DELTA_SENTINEL; snapshot logic
  lives in create_checkpoint which has the step number
- create_checkpoint bumps channel version via get_next_version when the
  channel wasn't written at a snapshot step (eager: always stores the blob)
- _DeltaSnapshot NamedTuple registered as EXT_DELTA_SNAPSHOT (code 7) in
  the msgpack serde — no dict key collision, type tag does the dispatch
- InMemorySaver and PostgresSaver _get_channel_writes_history updated:
  _DeltaSnapshot blobs collect pending_writes before terminating (they
  encode the NEXT step's transition, not subsumed by the snapshot unlike
  pre-delta migration blobs)

Tests confirm:
- Snapshots fire at every N steps even when channel has no write that step
- Correct accumulated state after reconstruction from snapshot + replay

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…annel reconstruction

InMemorySaver.prune now walks the parent chain from the latest checkpoint
and keeps all ancestors whose writes are still needed for DeltaChannel
reconstruction — i.e. ancestors with DELTA_SENTINEL blobs where no
_DeltaSnapshot has been written yet in the kept ancestry.

Once a _DeltaSnapshot blob exists for all sentinel channels (provided by
snapshot_frequency), the walk terminates and older checkpoints plus their
writes and blobs are safely deleted.

With snapshot_frequency=None (pure delta), all intermediate checkpoints
are retained since the full write history is required. Setting
snapshot_frequency=N bounds the kept ancestry to N steps.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…as DELTA_SENTINEL

In async (default) durability mode, put_writes is fire-and-forget. If
checkpoint_writes fails but put succeeds, the sentinel blob has no
backing writes — reads silently reconstruct empty/wrong state.

Fix: track futures returned by put_writes submissions in
_pending_write_futs. Before committing a checkpoint that contains any
DELTA_SENTINEL blob, call .result() on all pending write futures,
blocking until they complete. This ensures checkpoint_writes are durable
before the sentinel blob is committed.

The flush only triggers when DELTA_SENTINEL is present, so graphs
without DeltaChannel channels are unaffected. Snapshot steps
(_DeltaSnapshot blobs) are self-contained and do not need the flush.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
…aSnapshot correctly

The base saver's _get_channel_writes_history walk terminated at any
non-DELTA_SENTINEL blob without collecting that ancestor's pending_writes.
This was correct for pre-delta migration blobs (which subsume their own
writes), but wrong for _DeltaSnapshot blobs: the snapshot captures state
AT the ancestor, while pending_writes encode the NEXT step's transition
and must be collected before terminating.

Applies the same fix as was already applied to InMemorySaver and
PostgresSaver: when the ancestor blob is a _DeltaSnapshot, collect its
pending_writes first, then terminate with the snapshot as seed.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
@sydney-runkle Sydney Runkle (sydney-runkle) merged commit 9abee46 into delta-channel-writes-based Apr 28, 2026
62 checks passed
@sydney-runkle Sydney Runkle (sydney-runkle) deleted the sr/snapshot-freq-bench branch April 28, 2026 20:42
Sydney Runkle (sydney-runkle) added a commit that referenced this pull request Apr 30, 2026
…epth with write-count snapshotting (#7634)

## Summary

Builds on #7586. Adds `snapshot_frequency: int | None` to
`DeltaChannel`, letting users trade storage for bounded read depth. Also
promotes `channels/_delta.py` from private to public
(`channels/delta.py`).

### How it works

Every Nth **pregel step**, `create_checkpoint` writes a `_DeltaSnapshot`
blob instead of `DELTA_SENTINEL`. The ancestor walk in
`_get_channel_writes_history` terminates at the snapshot rather than
walking the full chain, bounding replay to at most N steps.

Snapshots are **eager**: fired even on steps where the channel had no
write (via a `get_next_version` version bump), so the depth bound holds
unconditionally — no risk of the cadence drifting if a channel happens
to be silent at a snapshot step.

### Storage formula

| Mode | Blob storage | Read depth |
|------|-------------|------------|
| `snapshot_frequency=None` (pure delta) | O(N) — sentinels only | O(N)
steps |
| `snapshot_frequency=K` | O(N²/K) — periodic snapshots of growing size
| O(K) steps |
| add_messages / BinOp | O(N²) — full blob every step | O(1) |

At N turns with ~400 char/msg messages, total snapshot storage ≈ N²/(2K)
× avg_msg_size, since each snapshot blob grows linearly with accumulated
messages.

### Key design decisions

- **Step-based**: `snapshot_frequency=K` means "snapshot every K pregel
steps." `create_checkpoint` has the step number; the channel itself
doesn't need to track writes.
- **Eager**: version-bumped via `get_next_version` even on non-write
steps so `put()` always stores the blob.
- **`_DeltaSnapshot` NamedTuple + msgpack ext type**
(`EXT_DELTA_SNAPSHOT = 7`): serde type tag dispatches in
`from_checkpoint` — no dict key inspection, no collision risk.
- **`from_checkpoint` semantics**: `_DeltaSnapshot` → restore value
directly (no replay needed); `DELTA_SENTINEL` / `MISSING` → replay from
ancestor writes; plain value → pre-migration BinOp blob.
- **InMemorySaver and PostgresSaver updated**:
`_get_channel_writes_history` collects the snapshot ancestor's
pending_writes before terminating (they encode the *next* step's
transition, unlike pre-delta migration blobs which subsume their own
writes).
- **`snapshot_frequency=None`** is the pure-delta default (replaces
`math.inf`).

### Benchmark results (InMemory, ~400 char/msg)

**Storage**

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 5.9 MB | 1.2 MB | 601.3 KB | 119.8 KB | 29.5 KB |
| 100 | ~20K tok | 23.7 MB | 4.8 MB | 2.4 MB | 475.8 KB | 58.4 KB |
| 200 | ~40K tok | 94.6 MB | 19.0 MB | 9.5 MB | 1.9 MB | 116.4 KB |
| 500 | ~100K tok | 591.5 MB | 118.4 MB | 59.2 MB | 11.8 MB | 290.3 KB |

**Read latency** (avg of 5 `get_state` calls)

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 0.4ms | 0.4ms | 0.7ms | 0.9ms | 1.8ms |
| 100 | ~20K tok | 0.7ms | 0.9ms | 1.0ms | 1.7ms | 5.7ms |
| 200 | ~40K tok | 1.5ms | 1.7ms | 4.5ms | 3.7ms | 20.1ms |
| 500 | ~100K tok | 3.6ms | 4.2ms | 4.4ms | 9.0ms | 110.3ms |

**Per-invoke write latency**

| turns | ctx | freq=1 | freq=5 | freq=10 | freq=50 | freq=inf |
|------:|----:|-------:|-------:|--------:|--------:|---------:|
| 50 | ~10K tok | 1.5ms | 1.1ms | 1.1ms | 1.3ms | 1.7ms |
| 100 | ~20K tok | 2.5ms | 1.6ms | 1.5ms | 1.7ms | 3.3ms |
| 200 | ~40K tok | 3.4ms | 2.3ms | 2.2ms | 2.5ms | 8.3ms |
| 500 | ~100K tok | 6.2ms | 4.2ms | 3.6ms | 4.1ms | 39.2ms |

## Test plan

- [x] `make format` / `make lint` clean across `langgraph`,
`checkpoint`, `checkpoint-postgres`
- [x] `tests/test_channels.py` — 37 passing including step-based and
eager-snapshot tests
- [x] `tests/test_delta_channel_migration.py` — all passing
- [x] Full suite: 1387 passing, 6 pre-existing failures unrelated to
this branch

---------

Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant