
fix(streams): Stage 3 — source-field crash + circuit-breaker bugs (M2/M3/M4) #1459

Merged: jonastemplestein merged 5 commits into main from streams-review-stage3-fixes on Jun 10, 2026

@jonastemplestein jonastemplestein commented Jun 10, 2026


What

Stage 3 of the packages/streams review. Clears the last three regression ratchets (T4, T5, T6), flipping each from it.fails to it. The entire streams suite is now green with no it.fails left.

Stacked on #1458 (Stage 1) — base streams-review-stage1-fixes; retarget up the chain as the lower PRs merge.

M2 — a source field no longer crashes appendBatch

The DO append accepts source, but the runner-side parsers (getEventSchema / getEventInputSchema) were strict objects that omitted it, so the inline core reduce (consumes: ["*"]) threw Unrecognized key: "source" and rejected the whole batch. Extracted a shared StreamEventSourceSchema (+ streamEventIdempotencyKeySchema) in event.ts and used them in both parsers so input and reduce schemas agree. Kept source (the migration note shows OS wants it). idempotencyKey is now trim().min(1) on input too, closing the same crash class for blank keys. Covered by T4.
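A minimal sketch of the idea behind M2, assuming nothing about the real schema library: two strict parsers built from one shared set of field checks, so a field accepted on input cannot be "unrecognized" at reduce time. Every name below (`sharedFields`, `makeStrictParser`, `parseEventInput`, `parseEvent`) and the assumed shape of `source` are hypothetical stand-ins, not the code in event.ts.

```typescript
// Hand-rolled stand-in for a schema library; all names here are hypothetical.
type FieldCheck = (value: unknown) => boolean;

const isString: FieldCheck = (v) => typeof v === "string";
const isOptional = (check: FieldCheck): FieldCheck => (v) => v === undefined || check(v);

// Declared once and reused, so the input and reduce parsers cannot drift apart.
const sharedFields: Record<string, FieldCheck> = {
  // Assumed shape: `source` is an optional plain object.
  source: isOptional((v) => typeof v === "object" && v !== null),
  // Same idea as trim().min(1): whitespace-only keys are rejected.
  idempotencyKey: isOptional((v) => typeof v === "string" && v.trim().length > 0),
};

function makeStrictParser(ownFields: Record<string, FieldCheck>) {
  const fields: Record<string, FieldCheck> = { ...sharedFields, ...ownFields };
  return (input: Record<string, unknown>): Record<string, unknown> => {
    for (const key of Object.keys(input)) {
      // Strict: any key the parser does not know about is an error.
      if (!Object.prototype.hasOwnProperty.call(fields, key)) {
        throw new Error(`Unrecognized key: "${key}"`);
      }
    }
    for (const key of Object.keys(fields)) {
      const check = fields[key];
      if (check !== undefined && !check(input[key])) {
        throw new Error(`Invalid value for "${key}"`);
      }
    }
    return input;
  };
}

// Both parsers inherit `source` and `idempotencyKey` from the shared set.
const parseEventInput = makeStrictParser({ type: isString, payload: () => true });
const parseEvent = makeStrictParser({
  type: isString,
  payload: () => true,
  offset: (v) => typeof v === "number",
});
```

With this layout, adding a field to one parser but not the other requires deliberately bypassing the shared set, which is the drift the fix is meant to prevent.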

M3 — circuit-breaker no longer drains on a backwards clock

spendCircuitBreakerToken now clamps elapsed time with Math.max(0, …). A regressed createdAt (DO migration / clock skew) previously subtracted refill — at the default rate a 1 s regression drained ~100k tokens and instantly tripped the breaker. Covered by T5.
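The clamp can be sketched as follows. This is an illustration under assumptions, not the package's spendCircuitBreakerToken: the `Bucket` shape, the parameter names, and the choice to keep the refill clock monotonic are all hypothetical.

```typescript
// Hypothetical bucket shape and parameter names; not the package's actual types.
interface Bucket {
  tokens: number;
  lastRefillAt: number; // milliseconds
}

function spendToken(
  bucket: Bucket,
  nowMs: number,
  refillPerMs: number,
  capacity: number,
): Bucket {
  // Clamp: a timestamp earlier than the last refill contributes zero refill.
  // Without Math.max, a negative elapsed time would subtract tokens.
  const elapsedMs = Math.max(0, nowMs - bucket.lastRefillAt);
  const refilled = Math.min(capacity, bucket.tokens + elapsedMs * refillPerMs);
  return {
    tokens: refilled - 1, // spend one token for this event
    // Assumption: the refill clock never moves backwards either.
    lastRefillAt: Math.max(bucket.lastRefillAt, nowMs),
  };
}
```

For example, with a refill of 0.5 tokens per millisecond, an event stamped 1000 ms before the last refill simply costs one token; unclamped, the same event would have subtracted 500 tokens before spending.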

M4 — circuit-breaker now pauses a live flood that began tripping during replay

The trip was edge-triggered on previousState, but processEvent isn't called for replay events (≤ the side-effect anchor), so when the not-tripped→tripped transition happened during replay, every live event afterward saw an already-tripped previousState and the breaker stayed silent. Now level-triggered: fires whenever the bucket is in deficit on a live event. Idempotency-keyed per offset and self-limiting (once the stream pauses, ordinary appends are rejected). Covered by T6.
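To make the edge-versus-level distinction concrete, here is a small simulation. It is a sketch only: the state shape, the 1-based offsets, the anchor comparison, and the idempotency-key format are assumptions for illustration, and it does not model the stream actually pausing.

```typescript
// Hypothetical shapes; the real processor state is not shown in this PR.
interface BreakerState {
  tokens: number; // negative means the bucket is in deficit
}

// Edge-triggered: fires only on the not-tripped -> tripped transition.
function shouldPauseEdge(previous: BreakerState, next: BreakerState): boolean {
  return previous.tokens >= 0 && next.tokens < 0;
}

// Level-triggered: fires whenever the bucket is in deficit.
function shouldPauseLevel(next: BreakerState): boolean {
  return next.tokens < 0;
}

// One key per offset, so a repeated pause append for the same event dedupes.
function pauseIdempotencyKey(offset: number): string {
  return `circuit-breaker-pause:${offset}`;
}

// Counts how many pause side effects fire. Events at or below the anchor are
// replay: state is still reduced, but no side effects run for them.
function countPauses(
  tokensAfterEachEvent: number[],
  anchorOffset: number,
  mode: "edge" | "level",
): number {
  let pauses = 0;
  let previous: BreakerState = { tokens: 0 };
  for (let i = 0; i < tokensAfterEachEvent.length; i++) {
    const offset = i + 1;
    const next: BreakerState = { tokens: tokensAfterEachEvent[i] ?? 0 };
    if (offset > anchorOffset) {
      const fire = mode === "edge" ? shouldPauseEdge(previous, next) : shouldPauseLevel(next);
      if (fire) pauses += 1;
    }
    previous = next;
  }
  return pauses;
}
```

If the bucket goes negative at offset 2 and the anchor is 2, the edge-triggered variant never fires for the live events at offsets 3 and 4, while the level-triggered variant fires on each. In the real system the first successful pause stops further appends, which is what keeps the level-triggered form self-limiting.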

Verification

node pool:    63 passed (63)   — no it.fails remaining
workers pool:  8 passed (8)
typecheck: clean   lint: clean

🤖 Generated with Claude Code


Note

Medium Risk
Touches append/reduce validation on the hot path and changes when streams auto-pause under rate limits; behavior is pinned by T4–T6 but mis-tuned limits could still pause production streams.

Overview
Stage 3 closes three streams review findings (M2/M3/M4) and turns regression ratchets T4, T5, T6 from it.fails into passing it tests.

Event schema alignment (M2): Shared StreamEventSourceSchema and streamEventIdempotencyKeySchema in event.ts are wired into base input, getEventInputSchema, and getEventSchema so append and inline reduce agree. Events with source no longer fail core reduce with Unrecognized key; idempotencyKey is trim().min(1) at append time so blank keys cannot commit and then fail in reduce.

Circuit breaker (M3/M4): spendCircuitBreakerToken clamps refill elapsed time with Math.max(0, …) so regressed createdAt (migration/clock skew) cannot subtract tokens and false-trip. Trip handling is level-triggered: processEvent appends stream/paused whenever the bucket is in deficit on a live event, without requiring a fresh not-tripped→tripped edge—so overload that tripped during replay still pauses the live flood. Pause appends stay idempotency-keyed per offset.

Reviewed by Cursor Bugbot for commit 625f875. Bugbot is set up for automated code reviews on this repo. Configure here.


jonastemplestein and others added 2 commits June 10, 2026 16:07
…t ordering, RPC surface

Fixes the three CRITICAL findings from the packages/streams review
(tasks/streams-review-fixes.md). Flips the Stage 0 ratchets T1/T1b/T2/T3 to
passing.

C1 — a failed batch no longer drops events. The hosted-processor host now
re-handshakes from the durable checkpoint on ingest failure so the stream
replays the batch. Because the pump is fire-and-forget, later batches keep
arriving during recovery; ingesting one would advance the checkpoint past the
gap, so each subscription is tagged with a generation and ingest runs through a
per-processor serial chain that re-checks the generation between batches —
batches on the superseded connection are dropped and the replay is the single
source of truth. A persistently-failing (poison) batch, after
MAX_CONSECUTIVE_INGEST_FAILURES, appends a stream/error-occurred event and
disconnects rather than hot-looping; recovery is then up to the
subscriber/processor. (Browser-mirror consumer has the same bug — tracked for
Stage 4.)
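The generation gate described in C1 can be sketched roughly like this. `ProcessorEntry`, `supersede`, and `enqueue` are hypothetical names; the real host also re-handshakes and counts consecutive failures, which this sketch omits.

```typescript
// Hypothetical per-processor entry; the real host's types are not shown here.
class ProcessorEntry {
  generation = 0;
  readonly ingested: string[] = [];
  private chain: Promise<void> = Promise.resolve();

  // Re-handshake: bump the generation so batches from the old connection go stale.
  supersede(): number {
    this.generation += 1;
    return this.generation;
  }

  // Each batch is tagged with the generation of the connection that delivered it.
  enqueue(batch: string, generation: number): Promise<void> {
    this.chain = this.chain
      .then(() => {
        // Re-check when the batch actually runs, not when it was queued.
        if (generation !== this.generation) return; // superseded: drop
        this.ingested.push(batch);
      })
      .catch(() => undefined);
    return this.chain;
  }
}
```

Because the check happens inside the serial chain, a batch that was queued before the re-handshake but runs after it is dropped, and only the replay on the new connection advances the checkpoint.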

C2 — StreamProcessor.#ingest now persists the snapshot before advancing
instead of silently advancing the in-memory checkpoint and no-oping the retry.
Inlined the single-use #saveSnapshot.

C3 — makeRpcTargetClass gained an `include` allowlist; StreamRpcTarget and
PublicStreamRpcTarget now allowlist the explicit StreamRpc surface instead of
denylisting two methods. The protected readCoreProcessorState /
writeCoreProcessorState are no longer proxied onto the unauthenticated public
target (where writeCoreProcessorState could inject an attacker-chosen
subscription callable).
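As an illustration of why an allowlist is the safer default here, the sketch below exposes only the named members of an object. `exposeOnly` and `StreamLike` are hypothetical; the real makeRpcTargetClass signature and its `include` option are not shown in this PR.

```typescript
// Hypothetical helper: copies only allowlisted members onto a fresh object.
function exposeOnly(target: object, include: readonly string[]): Record<string, unknown> {
  const source = target as Record<string, unknown>;
  const surface: Record<string, unknown> = {};
  for (const name of include) {
    const member = source[name];
    // Bind methods so `this` still refers to the underlying object.
    surface[name] = typeof member === "function" ? member.bind(target) : member;
  }
  return surface;
}

// Stand-in class with one public method and two that must stay internal.
class StreamLike {
  private state = "initial";
  append(value: string): string {
    this.state = value;
    return this.state;
  }
  readCoreProcessorState(): string {
    return this.state;
  }
  writeCoreProcessorState(value: string): void {
    this.state = value;
  }
}

const publicSurface = exposeOnly(new StreamLike(), ["append"]);
```

With a denylist, any method added to the class later is exposed unless someone remembers to exclude it; with an allowlist, new methods stay private until they are listed.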

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…review)

Cursor Bugbot: the per-processor ingest chain was extended with `.then(...)`
but had no trailing `.catch`, so if recoverFromIngestFailure threw (a failed
re-handshake, or the poison-path stream/error-occurred append failing) the chain
stayed rejected and every later batch's `.then` handler was skipped — live
delivery would wedge permanently. Mirrors StreamProcessor.ingest, which already
`.catch(() => undefined)`s its own #processing chain.

Added a trailing `.catch` that logs and resolves; a future re-handshake
(stream-side reconcile) is the way back. New regression test T1c proves the
chain keeps processing after a re-handshake when the first recovery threw
(verified failing without the catch).
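The wedge and its fix can be reproduced with a few lines. This is a generic sketch of a serial promise chain, not the host's code; `SerialChain` and its `errors` array are hypothetical.

```typescript
// Hypothetical serial chain: tasks run one after another in enqueue order.
class SerialChain {
  readonly errors: unknown[] = [];
  private tail: Promise<void> = Promise.resolve();

  enqueue(task: () => Promise<void>): Promise<void> {
    this.tail = this.tail
      .then(task)
      // Without this catch, one rejection leaves the tail rejected, and every
      // later `.then(task)` is skipped because its fulfillment handler never runs.
      .catch((error: unknown) => {
        this.errors.push(error); // stand-in for logging
      });
    return this.tail;
  }
}
```

The trailing catch turns each link back into a resolved promise, so a task that throws is recorded and the next task still runs.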

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@jonastemplestein jonastemplestein force-pushed the streams-review-stage1-fixes branch from d89a4a9 to 98013a0 Compare June 10, 2026 15:07
@jonastemplestein jonastemplestein force-pushed the streams-review-stage3-fixes branch from a470958 to 6886162 Compare June 10, 2026 15:08
autofix-ci Bot and others added 3 commits June 10, 2026 15:08
Cursor Bugbot (medium): requestStreamSubscription tore down the live handle and
opened a new connection via openSubscription but, unlike recoverFromIngestFailure,
never incremented entry.generation. Batches still queued on the prior connection
could pass the generation gate and ingest after the replacement handshake,
undermining the superseded-connection drop C1 depends on. Now the handshake
invalidates the previous connection like the recovery path does, so the new
connection's replay from the checkpoint is authoritative.

(No dedicated test: the failure mode is a gap that only manifests under a precise
stale-batch/replay offset interleaving that the in-memory fake can't deterministically
reproduce; the generation gate itself is covered by T1, and this change just makes
the two teardown paths consistent.)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…/M3/M4)

Clears the last regression ratchets (T4/T5/T6 -> it). The whole streams suite
is now green with no it.fails.

M2 — an event carrying a `source` field no longer crashes appendBatch. The DO
append accepts `source`, but the runner-side parsers (getEventSchema /
getEventInputSchema) were strict objects that omitted it, so the inline core
reduce threw "Unrecognized key: source". Extracted a shared
StreamEventSourceSchema (and streamEventIdempotencyKeySchema) in event.ts and
used them in both parsers so input and reduce schemas agree. idempotencyKey is
now trim().min(1) on input too, closing the same crash class for blank keys.

M3 — circuit-breaker token bucket no longer drains on a backwards clock.
spendCircuitBreakerToken clamps elapsed time with Math.max(0, ...); a regressed
createdAt previously subtracted refill and tripped the breaker for no reason.

M4 — circuit-breaker now pauses on a live flood that began tripping during
replay. The trip is level-triggered (fires whenever the bucket is in deficit on
a live event) instead of edge-triggered on previousState, which was skipped for
the common case where the trip transition happened at/below the side-effect
anchor. Idempotency-keyed per offset and self-limiting once the stream pauses.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@jonastemplestein jonastemplestein force-pushed the streams-review-stage3-fixes branch from 6886162 to 625f875 Compare June 10, 2026 15:30
Base automatically changed from streams-review-stage1-fixes to main June 10, 2026 15:32
@jonastemplestein jonastemplestein merged commit 30466e9 into main Jun 10, 2026
8 of 9 checks passed
@jonastemplestein jonastemplestein deleted the streams-review-stage3-fixes branch June 10, 2026 15:33
jonastemplestein added a commit that referenced this pull request Jun 10, 2026
#1479)

## Streams review: Stages 4–7 (browser robustness, perf, dead code, docs)

The final batch of the `packages/streams` adversarial review
(`tasks/streams-review-fixes.md`), bundled into one PR per request.
Stages 0/1/3 already shipped (#1455, #1458, #1459); **Stage 2 (lazy
init) was abandoned** — #1460's subscriber-presence model appends a
`subscriber-connected` fact on every `subscribe()`, which fundamentally
conflicts with "don't initialize storage until the first `append()`".
`created`/`woken` stay eager.

All findings were re-verified against current `main` after #1460/#1457
moved the codebase; items those PRs already fixed or made obsolete are
called out below.

### Stage 4 — browser-runtime correctness
- **C1-browser** — the server delivery pump is fire-and-forget for
inbound subscribers and never reports a failed `ingest`, so a browser
mirror that fails to apply a batch silently desyncs forever. The browser
now self-heals: `ingestWithSelfHeal` resubscribes from the persisted
checkpoint with bounded exponential backoff on ingest failure.
- **B1** — connection-epoch guard so a stale connection's late status
callback can't clobber the live connection.
- **B2** — `appendBatch`/`runtimeState` await connection readiness
(`whenStreamReady`) instead of throwing "disposed" during a transient
reconnect; only throw when actually disposed.
- **B3** — the `ROLLBACK` in `stream-db.worker.ts` `batch()` is now in
its own try/catch so it can't mask the original error (which
`withBusyRetry`/`isBusyError` need to see).
- **B4** — reconcile guards on stream incarnation (server `createdAt` +
a `mirror_meta` table): rebuild the mirror on incarnation change rather
than trusting the offset comparison after a `reset()`.
- **B5** — `AbortSignal` on the Web Lock request (released on disposal)
+ the request rejection is surfaced instead of `void`-swallowed.
- **B6** — query lifecycle: arm GC at query creation (not only on
unsubscribe), skip listenerless queries in `#onChange`, and
equality-check before notifying to avoid spurious `useSyncExternalStore`
churn.
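
As an illustration of B3 above, here is a minimal sketch of a batch helper whose `ROLLBACK` cannot mask the original failure. The `SqlExecutor` interface and `runBatch` are hypothetical stand-ins, not the code in `stream-db.worker.ts`.

```typescript
// Hypothetical minimal executor; the worker's real database API is not shown here.
interface SqlExecutor {
  exec(sql: string): void;
}

function runBatch(db: SqlExecutor, statements: string[]): void {
  db.exec("BEGIN");
  try {
    for (const sql of statements) db.exec(sql);
    db.exec("COMMIT");
  } catch (error) {
    try {
      db.exec("ROLLBACK");
    } catch {
      // A failed ROLLBACK is swallowed on purpose so it cannot replace `error`.
    }
    // Rethrow the original error so callers (for example a busy-retry wrapper)
    // can inspect it.
    throw error;
  }
}
```

If the `ROLLBACK` were issued outside its own try/catch, its failure would propagate instead of the original error, and a retry wrapper checking for a busy error would see the wrong one.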

### Stage 5 — performance
- **P1** — `browser-event-feed` O(n²) write amplification: coalesce to
one op per `local_index` (was a cumulative op per event, each
re-serializing the whole accumulated array).
- **P2** — bound group rows with `MAX_GROUP_EVENTS = 200` so one
dominant event type can't grow a single blob unboundedly.
- **P3** — drop the per-event full-state `stateSchema.parse` in core
`reduce`; state is validated only at the KV/recovery trust boundary now.
- **P4** — already fixed by #1460 (subscribe override forwards
`eventTypes`; filtering is server-side). No change.
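
The write amplification in P1 can be sketched as below, assuming `local_index` identifies the group row an event is stored in. `planCumulative`, `planCoalesced`, and the `FeedEvent`/`FeedOp` shapes are hypothetical, and the `MAX_GROUP_EVENTS` cap from P2 is not modeled.

```typescript
// Hypothetical shapes for illustration only.
interface FeedEvent {
  localIndex: number;
  payload: string;
}
interface FeedOp {
  localIndex: number;
  events: string[];
}

// Before: one op per event, each carrying the whole array accumulated so far.
function planCumulative(batch: FeedEvent[]): FeedOp[] {
  const accumulated = new Map<number, string[]>();
  return batch.map((event) => {
    const events = (accumulated.get(event.localIndex) ?? []).concat(event.payload);
    accumulated.set(event.localIndex, events);
    return { localIndex: event.localIndex, events };
  });
}

// After: one op per row, carrying only the final array.
function planCoalesced(batch: FeedEvent[]): FeedOp[] {
  const accumulated = new Map<number, string[]>();
  for (const event of batch) {
    const events = accumulated.get(event.localIndex) ?? [];
    events.push(event.payload);
    accumulated.set(event.localIndex, events);
  }
  return Array.from(accumulated, ([localIndex, events]) => ({ localIndex, events }));
}

// Payloads serialized across all ops: a rough proxy for bytes written.
function serializedPayloads(ops: FeedOp[]): number {
  return ops.reduce((total, op) => total + op.events.length, 0);
}
```

For n events landing in the same row, the cumulative plan serializes 1 + 2 + ... + n payloads, while the coalesced plan serializes n in a single op.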

### Stage 6 — dead code / elegance (~247 lines deleted)
- **E1** dead exports in `shared/stream-processors.ts`; **E2**
`waitForOpen`; **E3** unreachable circuit-breaker `paused` guard; **E5**
dead `waitForEvent`/`messageInbox.error` machinery in `subscription.ts`;
**E6** redundant circuit-breaker `consumes` entries.
- **E4 obsolete** (#1460 removed `processor-registered`; the shared
`circuit-breaker-types` is a live, separately-imported hierarchy, not
dead code). **E7 declined** — post-#1460 the subscribe override
genuinely needs to retain the client callback / wire `onRpcBroken`, so
folding it into the generic `makeRpcTargetClass` isn't worth it.

### Stage 7 — docs
- **D1** `design.md`: status banner, fixed 1-based offsets + core-event
examples, the callable subscriber spec, SQLite/512KB-chunking storage,
live-tail `replayAfterOffset` default, OPFS mirror; superseded banners
on the never-shipped `implementProcessor`/`connectStream` sections.
- **D2** `README.md`: real browser export
(`withStreamConnectionFromBrowser`), sync `Disposable`, `?path=` route
map; new "Append & subscription semantics" section (D5).
- **D3** ADR 0001 superseded with the shipped per-`(namespace, path,
slug)` singleton model.
- **D4** remaining comment drift (`beforeAppend`→`validateAppend`,
`afterAppendBatch`→`processEventBatch`).

### Testing
`pnpm typecheck`, `pnpm lint`, node (79) + workers (15) suites, and
example-app typecheck all green. Example-app Playwright e2e run locally
against a clean `vite dev` (real Stream DO via Miniflare): **26
passed**. `origin/main` merged in (no `packages/streams` overlap; the
new project-config stream-processor consumers don't touch any deleted
exports).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Touches browser reconnect, mirror discard, and ingest self-heal paths
that affect live viewers and local SQLite mirrors; server stream DO
behavior is mostly unchanged aside from core reduce hot-path
optimization.
> 
> **Overview**
> Closes out **Stages 4–7** of the `packages/streams` adversarial
review: browser mirror reliability, feed/core hot-path perf, dead-code
removal, and doc alignment. Stage 2 (lazy stream init) stays
**abandoned** and is only noted in `tasks/streams-review-fixes.md`.
> 
> **Browser runtime** (`stream-browser-store.ts` and friends) is
reworked so the OPFS mirror can recover instead of silently drifting.
Inbound delivery is fire-and-forget, so failed `ingest` now triggers
**`ingestWithSelfHeal`** (resubscribe from the last checkpoint with
capped exponential backoff). Reconnects go through one
**`scheduleReconnect`** path with a **connection epoch** so stale
WebSocket callbacks cannot tear down the replacement connection.
**`appendBatch` / `runtimeState`** use **`callWhenReady`** /
**`whenStreamReady`** during transient reconnects instead of throwing
“disposed”. Mirror trust uses server **`createdAt`** as incarnation
identity via new **`mirror_meta`** helpers in
**`stream-browser-db.ts`**. Web Lock **`release()`** aborts pending lock
requests; the SQLite worker preserves the original batch error if
`ROLLBACK` fails; reactive queries GC earlier, skip unobserved
refreshes, and avoid notify when snapshots are unchanged.
> 
> **Performance:** `browser-event-feed` **`planFeedOps`** coalesces one
SQL op per group row and caps groups at **`MAX_GROUP_EVENTS` (200)**.
Core **`reduce`** no longer exit-parses the full `stateSchema` on every
appended event.
> 
> **Cleanup:** ~247 lines removed (unused `stream-processors` exports,
`waitForOpen`, dead subscription `waitForEvent` machinery, trimmed
circuit-breaker `consumes` and an unreachable guard). **Docs:** `README`
(real browser API, `?path=` routes, append/subscription semantics),
`design.md` and ADR 0001 marked superseded where they diverge from
shipped code.
> 
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit
f52f305. Bugbot is set up for automated
code reviews on this repo. Configure
[here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->


---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
jonastemplestein added a commit that referenced this pull request Jun 11, 2026
The packages/streams adversarial review is fully shipped — Stages 0/1/3
(#1455/#1458/#1459), Stages 4–7 (#1479), Stage 2 deliberately abandoned.
This deletes the now-stale `tasks/streams-review-fixes.md` and removes
the dangling pointers to it from the regression-test comments (the
finding ids and the PRs in git history are the durable record).

Doc/comment-only; no code changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Low Risk**
> Documentation and comment-only changes; no runtime or test behavior
changes.
> 
> **Overview**
> Removes the completed **`tasks/streams-review-fixes.md`** tracking doc
now that the June 2026 `packages/streams` review work is shipped (Stages
0/1/3 via #1455/#1458/#1459, Stages 4–7 via #1479; Stage 2 abandoned).
> 
> Regression test file headers are updated so they no longer link to
that task file. Comments now cite **review finding ids (C*/M*)** and the
**merged PRs** as the durable record instead.
> 
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit
7c33b86. Bugbot is set up for automated
code reviews on this repo. Configure
[here](https://www.cursor.com/dashboard/bugbot).</sup>
<!-- /CURSOR_SUMMARY -->


Co-authored-by: Claude Fable 5 <noreply@anthropic.com>