fix(streams): Stage 1 — failed-batch desync, checkpoint ordering, RPC surface (C1/C2/C3)#1458
…review) Cursor Bugbot: the per-processor ingest chain was extended with `.then(...)` but had no trailing `.catch`, so if recoverFromIngestFailure threw (a failed re-handshake, or the poison-path stream/error-occurred append failing) the chain stayed rejected and every later batch's `.then` handler was skipped — live delivery would wedge permanently. Mirrors StreamProcessor.ingest, which already `.catch(() => undefined)`s its own #processing chain. Added a trailing `.catch` that logs and resolves; a future re-handshake (stream-side reconcile) is the way back. New regression test T1c proves the chain keeps processing after a re-handshake when the first recovery threw (verified failing without the catch). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
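The wedge described in this commit message can be reproduced with a small promise chain. This is a minimal sketch, not the host's actual code: `IngestChain`, `enqueue`, and `errors` are hypothetical names, and pushing onto `errors` stands in for the logging the commit mentions.

```typescript
// Hypothetical per-processor serial chain (illustrative names only).
class IngestChain {
  private tail: Promise<void> = Promise.resolve();
  readonly errors: unknown[] = [];

  // Queue a step behind every previously queued step.
  enqueue(step: () => Promise<void>): Promise<void> {
    this.tail = this.tail
      .then(step)
      // Without this trailing catch, one rejection would leave `tail`
      // rejected and every later `.then(step)` handler would be skipped.
      .catch((err: unknown) => {
        this.errors.push(err); // stand-in for "log and resolve"
      });
    return this.tail;
  }
}
```

Because the stored tail always settles as resolved, a step that throws (for example a failed recovery) costs only that step; steps queued afterwards still run.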
Addressed the Bugbot finding in d89a4a9: the ingest chain now has a trailing …
…t ordering, RPC surface Fixes the three CRITICAL findings from the packages/streams review (tasks/streams-review-fixes.md). Flips the Stage 0 ratchets T1/T1b/T2/T3 to passing. C1 — a failed batch no longer drops events. The hosted-processor host now re-handshakes from the durable checkpoint on ingest failure so the stream replays the batch. Because the pump is fire-and-forget, later batches keep arriving during recovery; ingesting one would advance the checkpoint past the gap, so each subscription is tagged with a generation and ingest runs through a per-processor serial chain that re-checks the generation between batches — batches on the superseded connection are dropped and the replay is the single source of truth. A persistently-failing (poison) batch, after MAX_CONSECUTIVE_INGEST_FAILURES, appends a stream/error-occurred event and disconnects rather than hot-looping; recovery is then up to the subscriber/processor. (Browser-mirror consumer has the same bug — tracked for Stage 4.) C2 — StreamProcessor.#ingest now persists the snapshot before advancing instead of silently advancing the in-memory checkpoint and no-oping the retry. Inlined the single-use #saveSnapshot. C3 — makeRpcTargetClass gained an `include` allowlist; StreamRpcTarget and PublicStreamRpcTarget now allowlist the explicit StreamRpc surface instead of denylisting two methods. The protected readCoreProcessorState / writeCoreProcessorState are no longer proxied onto the unauthenticated public target (where writeCoreProcessorState could inject an attacker-chosen subscription callable). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
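The C2 ordering fix in this commit message (persist the snapshot, then advance) can be illustrated as follows. This is a sketch under assumptions: `SketchProcessor`, the numeric state, and the event shape are hypothetical, and only the ordering of the durable write relative to the in-memory update reflects the description above.

```typescript
type Snapshot = { state: number; checkpointOffset: number };
type SketchEvent = { offset: number; value: number };

// Hypothetical processor; the real StreamProcessor is not shown in the PR text.
class SketchProcessor {
  private currentState = 0;
  private currentOffset = 0;

  constructor(private readonly writeState: (s: Snapshot) => Promise<void>) {}

  get state(): number {
    return this.currentState;
  }

  get checkpointOffset(): number {
    return this.currentOffset;
  }

  async ingest(events: SketchEvent[]): Promise<void> {
    // Reduce into locals so nothing in memory changes yet.
    let nextState = this.currentState;
    let nextOffset = this.currentOffset;
    for (const event of events) {
      if (event.offset <= nextOffset) continue; // already applied
      nextState += event.value;
      nextOffset = event.offset;
    }
    // Durable write first. If it throws, the fields below are untouched,
    // so a retry of the same batch is applied instead of being skipped.
    await this.writeState({ state: nextState, checkpointOffset: nextOffset });
    this.currentState = nextState;
    this.currentOffset = nextOffset;
  }
}
```

With the opposite order, a failed write would leave `checkpointOffset` ahead of durable storage and the `event.offset <= nextOffset` check would turn the retry into a no-op.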
d89a4a9 to
98013a0
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit a9d4a4d. Configure here.
Cursor Bugbot (medium): requestStreamSubscription tore down the live handle and opened a new connection via openSubscription but, unlike recoverFromIngestFailure, never incremented entry.generation. Batches still queued on the prior connection could pass the generation gate and ingest after the replacement handshake, undermining the superseded-connection drop C1 depends on. Now the handshake invalidates the previous connection like the recovery path does, so the new connection's replay from the checkpoint is authoritative. (No dedicated test: the failure mode is a gap that only manifests under a precise stale-batch/replay offset interleaving that the in-memory fake can't deterministically reproduce; the generation gate itself is covered by T1, and this change just makes the two teardown paths consistent.) Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Addressed the second Bugbot finding (generation invalidation) in e5aeaac: …
…/M3/M4) (#1459)

## What

**Stage 3** of the `packages/streams` review. Clears the last three regression ratchets (**T4, T5, T6** → `it`). The entire streams suite is now green with **no `it.fails` left**.

**Stacked on #1458** (Stage 1): base `streams-review-stage1-fixes`; retarget up the chain as the lower PRs merge.

## M2: a `source` field no longer crashes `appendBatch`

The DO append accepts `source`, but the runner-side parsers (`getEventSchema` / `getEventInputSchema`) were strict objects that omitted it, so the inline core reduce (`consumes: ["*"]`) threw `Unrecognized key: "source"` and rejected the whole batch. Extracted a shared `StreamEventSourceSchema` (+ `streamEventIdempotencyKeySchema`) in `event.ts` and used them in both parsers so input and reduce schemas agree. Kept `source` (the migration note shows OS wants it). `idempotencyKey` is now `trim().min(1)` on input too, closing the same crash class for blank keys. Covered by **T4**.

## M3: circuit-breaker no longer drains on a backwards clock

`spendCircuitBreakerToken` now clamps elapsed time with `Math.max(0, …)`. A regressed `createdAt` (DO migration / clock skew) previously *subtracted* refill: at the default rate a 1 s regression drained ~100k tokens and instantly tripped the breaker. Covered by **T5**.

## M4: circuit-breaker now pauses a live flood that began tripping during replay

The trip was edge-triggered on `previousState`, but `processEvent` isn't called for replay events (≤ the side-effect anchor), so when the not-tripped→tripped transition happened during replay, every live event afterward saw an already-tripped `previousState` and the breaker stayed silent. Now level-triggered: fires whenever the bucket is in deficit on a live event. Idempotency-keyed per offset and self-limiting (once the stream pauses, ordinary appends are rejected). Covered by **T6**.
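The M3 clamp is easy to check with arithmetic. The sketch below is an assumption-laden stand-in for `spendCircuitBreakerToken`: `BucketState`, `spendTokenSketch`, and the parameter names are hypothetical, and the refill rate of 100 tokens per millisecond is chosen only because it matches the "1 s regression drained ~100k tokens" figure quoted above.

```typescript
// Hypothetical token-bucket state; field names are illustrative only.
interface BucketState {
  tokens: number;
  lastEventAtMs: number;
}

function spendTokenSketch(
  state: BucketState,
  eventAtMs: number,
  refillPerMs: number,
  capacity: number,
): BucketState {
  // Clamp so an event timestamp earlier than the previous one contributes
  // zero refill instead of a negative refill.
  const elapsedMs = Math.max(0, eventAtMs - state.lastEventAtMs);
  const refilled = Math.min(capacity, state.tokens + elapsedMs * refillPerMs);
  return {
    tokens: refilled - 1, // spend one token for this event
    // Assumption of this sketch: keep the reference time monotonic.
    lastEventAtMs: Math.max(state.lastEventAtMs, eventAtMs),
  };
}

const before: BucketState = { tokens: 10, lastEventAtMs: 1_000 };
// Clock regressed by 1 s: with the clamp only the one spent token is lost.
const afterRegression = spendTokenSketch(before, 0, 100, 1_000);
// Without the clamp the same call would compute 10 + (-1000 * 100) - 1.
```

Here `afterRegression.tokens` is 9, whereas the unclamped arithmetic gives -99,991, which is the instant trip the finding describes.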
## Verification

```
node pool:    63 passed (63), no it.fails remaining
workers pool: 8 passed (8)
typecheck:    clean
lint:         clean
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---

> [!NOTE]
> **Medium Risk**
> Touches append/reduce validation on the hot path and changes when streams auto-pause under rate limits; behavior is pinned by T4–T6 but mis-tuned limits could still pause production streams.
>
> **Overview**
> **Stage 3** closes three streams review findings (M2/M3/M4) and turns regression ratchets **T4, T5, T6** from `it.fails` into passing `it` tests.
>
> **Event schema alignment (M2):** Shared `StreamEventSourceSchema` and `streamEventIdempotencyKeySchema` in `event.ts` are wired into base input, `getEventInputSchema`, and `getEventSchema` so append and inline reduce agree. Events with `source` no longer fail core reduce with `Unrecognized key`; `idempotencyKey` is `trim().min(1)` at append time so blank keys cannot commit and then fail in reduce.
>
> **Circuit breaker (M3/M4):** `spendCircuitBreakerToken` clamps refill elapsed time with `Math.max(0, …)` so regressed `createdAt` (migration/clock skew) cannot subtract tokens and false-trip. Trip handling is **level-triggered**: `processEvent` appends `stream/paused` whenever the bucket is in deficit on a live event, without requiring a fresh not-tripped→tripped edge, so overload that tripped during replay still pauses the live flood. Pause appends stay idempotency-keyed per offset.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 625f875. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
#1479)

## Streams review: Stages 4–7 (browser robustness, perf, dead code, docs)

The final batch of the `packages/streams` adversarial review (`tasks/streams-review-fixes.md`), bundled into one PR per request. Stages 0/1/3 already shipped (#1455, #1458, #1459); **Stage 2 (lazy init) was abandoned**: #1460's subscriber-presence model appends a `subscriber-connected` fact on every `subscribe()`, which fundamentally conflicts with "don't initialize storage until the first `append()`". `created`/`woken` stay eager.

All findings were re-verified against current `main` after #1460/#1457 moved the codebase; items those PRs already fixed or made obsolete are called out below.

### Stage 4: browser-runtime correctness

- **C1-browser**: the server delivery pump is fire-and-forget for inbound subscribers and never reports a failed `ingest`, so a browser mirror that fails to apply a batch silently desyncs forever. The browser now self-heals: `ingestWithSelfHeal` resubscribes from the persisted checkpoint with bounded exponential backoff on ingest failure.
- **B1**: connection-epoch guard so a stale connection's late status callback can't clobber the live connection.
- **B2**: `appendBatch`/`runtimeState` await connection readiness (`whenStreamReady`) instead of throwing "disposed" during a transient reconnect; only throw when actually disposed.
- **B3**: the `ROLLBACK` in `stream-db.worker.ts` `batch()` is now in its own try/catch so it can't mask the original error (which `withBusyRetry`/`isBusyError` need to see).
- **B4**: reconcile guards on stream incarnation (server `createdAt` + a `mirror_meta` table): rebuild the mirror on incarnation change rather than trusting the offset comparison after a `reset()`.
- **B5**: `AbortSignal` on the Web Lock request (released on disposal) + the request rejection is surfaced instead of `void`-swallowed.
- **B6**: query lifecycle: arm GC at query creation (not only on unsubscribe), skip listenerless queries in `#onChange`, and equality-check before notifying to avoid spurious `useSyncExternalStore` churn.

### Stage 5: performance

- **P1**: `browser-event-feed` O(n²) write amplification: coalesce to one op per `local_index` (was a cumulative op per event, each re-serializing the whole accumulated array).
- **P2**: bound group rows with `MAX_GROUP_EVENTS = 200` so one dominant event type can't grow a single blob unboundedly.
- **P3**: drop the per-event full-state `stateSchema.parse` in core `reduce`; state is validated only at the KV/recovery trust boundary now.
- **P4**: already fixed by #1460 (subscribe override forwards `eventTypes`; filtering is server-side). No change.

### Stage 6: dead code / elegance (~247 lines deleted)

- **E1** dead exports in `shared/stream-processors.ts`; **E2** `waitForOpen`; **E3** unreachable circuit-breaker `paused` guard; **E5** dead `waitForEvent`/`messageInbox.error` machinery in `subscription.ts`; **E6** redundant circuit-breaker `consumes` entries.
- **E4 obsolete** (#1460 removed `processor-registered`; the shared `circuit-breaker-types` is a live, separately-imported hierarchy, not dead code). **E7 declined**: post-#1460 the subscribe override genuinely needs to retain the client callback / wire `onRpcBroken`, so folding it into the generic `makeRpcTargetClass` isn't worth it.

### Stage 7: docs

- **D1** `design.md`: status banner, fixed 1-based offsets + core-event examples, the callable subscriber spec, SQLite/512KB-chunking storage, live-tail `replayAfterOffset` default, OPFS mirror; superseded banners on the never-shipped `implementProcessor`/`connectStream` sections.
- **D2** `README.md`: real browser export (`withStreamConnectionFromBrowser`), sync `Disposable`, `?path=` route map; new "Append & subscription semantics" section (D5).
- **D3** ADR 0001 superseded with the shipped per-`(namespace, path, slug)` singleton model.
- **D4** remaining comment drift (`beforeAppend`→`validateAppend`, `afterAppendBatch`→`processEventBatch`).

### Testing

`pnpm typecheck`, `pnpm lint`, node (79) + workers (15) suites, and example-app typecheck all green. Example-app Playwright e2e run locally against a clean `vite dev` (real Stream DO via Miniflare): **26 passed**. `origin/main` merged in (no `packages/streams` overlap; the new project-config stream-processor consumers don't touch any deleted exports).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---

> [!NOTE]
> **Medium Risk**
> Touches browser reconnect, mirror discard, and ingest self-heal paths that affect live viewers and local SQLite mirrors; server stream DO behavior is mostly unchanged aside from core reduce hot-path optimization.
>
> **Overview**
> Closes out **Stages 4–7** of the `packages/streams` adversarial review: browser mirror reliability, feed/core hot-path perf, dead-code removal, and doc alignment. Stage 2 (lazy stream init) stays **abandoned** and is only noted in `tasks/streams-review-fixes.md`.
>
> **Browser runtime** (`stream-browser-store.ts` and friends) is reworked so the OPFS mirror can recover instead of silently drifting. Inbound delivery is fire-and-forget, so failed `ingest` now triggers **`ingestWithSelfHeal`** (resubscribe from the last checkpoint with capped exponential backoff). Reconnects go through one **`scheduleReconnect`** path with a **connection epoch** so stale WebSocket callbacks cannot tear down the replacement connection. **`appendBatch` / `runtimeState`** use **`callWhenReady`** / **`whenStreamReady`** during transient reconnects instead of throwing “disposed”. Mirror trust uses server **`createdAt`** as incarnation identity via new **`mirror_meta`** helpers in **`stream-browser-db.ts`**. Web Lock **`release()`** aborts pending lock requests; the SQLite worker preserves the original batch error if `ROLLBACK` fails; reactive queries GC earlier, skip unobserved refreshes, and avoid notify when snapshots are unchanged.
>
> **Performance:** `browser-event-feed` **`planFeedOps`** coalesces one SQL op per group row and caps groups at **`MAX_GROUP_EVENTS` (200)**. Core **`reduce`** no longer exit-parses the full `stateSchema` on every appended event.
>
> **Cleanup:** ~247 lines removed (unused `stream-processors` exports, `waitForOpen`, dead subscription `waitForEvent` machinery, trimmed circuit-breaker `consumes` and an unreachable guard). **Docs:** `README` (real browser API, `?path=` routes, append/subscription semantics), `design.md` and ADR 0001 marked superseded where they diverge from shipped code.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit f52f305. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup>

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
The packages/streams adversarial review is fully shipped: Stages 0/1/3 (#1455/#1458/#1459), Stages 4–7 (#1479), Stage 2 deliberately abandoned. This deletes the now-stale `tasks/streams-review-fixes.md` and removes the dangling pointers to it from the regression-test comments (the finding ids and the PRs in git history are the durable record). Doc/comment-only; no code changes.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---

> [!NOTE]
> **Low Risk**
> Documentation and comment-only changes; no runtime or test behavior changes.
>
> **Overview**
> Removes the completed **`tasks/streams-review-fixes.md`** tracking doc now that the June 2026 `packages/streams` review work is shipped (Stages 0/1/3 via #1455/#1458/#1459, Stages 4–7 via #1479; Stage 2 abandoned).
>
> Regression test file headers are updated so they no longer link to that task file. Comments now cite **review finding ids (C*/M*)** and the **merged PRs** as the durable record instead.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 7c33b86. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup>

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>

What
Stage 1 of the `packages/streams` review (plan in `tasks/streams-review-fixes.md`). Fixes the three CRITICAL findings. Flips the Stage 0 ratchets T1, T1b, T2, T3 from `it.fails` → `it`.

Stacked on #1455 (Stage 0 tests): base is `streams-review-stage0-tests`; retarget to `main` once that merges.

C1: a failed batch no longer drops events (data loss)
The delivery pump is fire-and-forget and advances its cursor before delivery, so a swallowed ingest failure left a permanent gap in the subscriber's state. Per the agreed approach (consumer resubscribes from checkpoint):
- On ingest failure, the hosted-processor host re-handshakes from the durable checkpoint so the stream replays the batch.
- Because the pump is fire-and-forget, later batches keep arriving during recovery, and ingesting one would advance the checkpoint past the gap. Each subscription is therefore tagged with a `generation`, and ingest runs through a per-processor serial chain that re-checks the generation between batches: batches from the superseded connection are dropped, and the post-recovery replay is the single source of truth.
- A persistently failing (poison) batch does not hot-loop: after `MAX_CONSECUTIVE_INGEST_FAILURES` (3), the host appends a `stream/error-occurred` event and disconnects; recovery is then up to the subscriber/processor.

Covered by T1 (transient recovery under continued delivery) + T1b (poison → error + disconnect).
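The generation gate and the poison cutoff from C1 can be sketched together. Everything below is a simplified, synchronous model under stated assumptions: `HostEntrySketch`, `Batch`, `handle`, and the `log` strings are hypothetical, the real host runs this through an asynchronous per-processor chain, and only the value 3 for `MAX_CONSECUTIVE_INGEST_FAILURES` comes from the description above.

```typescript
const MAX_CONSECUTIVE_INGEST_FAILURES = 3; // value stated in the PR description

// Hypothetical batch shape: the generation of the connection that delivered it.
interface Batch {
  generation: number;
  lastOffset: number;
}

class HostEntrySketch {
  generation = 0;
  checkpoint = 0;
  consecutiveFailures = 0;
  connected = true;
  readonly log: string[] = [];

  constructor(private readonly ingest: (batch: Batch) => void) {}

  // Called once per batch, in arrival order.
  handle(batch: Batch): void {
    if (!this.connected || batch.generation !== this.generation) {
      // Superseded connection: drop it, the replay will redeliver.
      this.log.push("dropped gen " + batch.generation);
      return;
    }
    try {
      this.ingest(batch);
      this.checkpoint = batch.lastOffset;
      this.consecutiveFailures = 0;
    } catch (_err) {
      this.recover();
    }
  }

  private recover(): void {
    this.consecutiveFailures += 1;
    if (this.consecutiveFailures >= MAX_CONSECUTIVE_INGEST_FAILURES) {
      // Poison batch: report and disconnect rather than hot-looping.
      this.log.push("stream/error-occurred");
      this.connected = false;
      return;
    }
    // Invalidate batches still queued on the old connection, then replay.
    this.generation += 1;
    this.log.push("re-handshake from " + this.checkpoint);
  }
}
```

The point of the gate is that a batch tagged with an old generation can never move `checkpoint`, so the checkpoint only advances along the replayed sequence.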
C2: checkpoint ordering (silent data loss on write failure)
`StreamProcessor.#ingest` advanced `#state`/`#checkpointOffset` before `writeState`, so a failed durable write left the in-memory checkpoint ahead and the retried batch silently no-oped. Now it persists the snapshot first, then advances. (Inlined the single-use `#saveSnapshot`.) Covered by T2.

C3: `PublicStreamRpcTarget` method leak (security)
`makeRpcTargetClass` copied every prototype method except a two-item denylist, so the `protected` `readCoreProcessorState`/`writeCoreProcessorState` were proxied onto the unauthenticated public target; `writeCoreProcessorState` could inject an attacker-chosen subscription callable that the DO later dispatches with full `env`. Added an `include` allowlist; both Stream targets now allowlist the explicit `StreamRpc` surface (`satisfies readonly (keyof StreamRpc)[]` so it can't drift). Covered by T3.

Verification
🤖 Generated with Claude Code
Note
High Risk
Changes durable ingest/checkpoint behavior and closes a security hole on unauthenticated Stream RPC; incorrect recovery or ordering could still cause data loss or wedge delivery.
Overview
Stage 1 closes three CRITICAL streams review findings (C1/C2/C3) and turns regression tests T1, T1b, T1c, T2, and T3 from `it.fails` into passing `it`.

C1: failed batches no longer lose events (hosted path). `createStreamProcessorHost` no longer swallows ingest errors. On failure it re-handshakes from the durable checkpoint so the stream replays the batch. A generation gate plus a per-processor ingest chain drops batches from superseded connections while recovery runs (avoids advancing the checkpoint past a gap). After three consecutive failures it appends `stream/error-occurred` and disconnects instead of hot-looping. Regression coverage adds a replay-capable `fakeStream`, T1b (poison), and T1c (recovery throw must not wedge the chain).

C2: checkpoint vs durable write. `StreamProcessor.#ingest` now `await`s `writeState` before updating in-memory `#state`/`#checkpointOffset`, so a failed write leaves the batch retryable instead of a silent no-op.

C3: public RPC surface. `makeRpcTargetClass` gains an `include` allowlist; `PublicStreamRpcTarget`/`StreamRpcTarget` only expose explicit `StreamRpc` methods so protected `readCoreProcessorState`/`writeCoreProcessorState` are not callable on the unauthenticated public target.

Task doc and test headers are updated to mark C1/C2/C3 fixed on the hosted path; browser mirror recovery remains Stage 4.
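The difference between the old denylist and the C3 allowlist can be shown with a small sketch. It does not reproduce the signature of `makeRpcTargetClass`: `StreamImplSketch`, `StreamRpcSketch`, `makeRpcTargetSketch`, and the method bodies are hypothetical, and the sketch builds a plain object rather than a class. Only the `satisfies readonly (keyof …)[]` pattern is taken from the PR description.

```typescript
// Hypothetical public surface and implementation (illustrative names only).
interface StreamRpcSketch {
  append(event: string): string;
  subscribe(): string;
}

class StreamImplSketch implements StreamRpcSketch {
  append(event: string): string {
    return "appended " + event;
  }
  subscribe(): string {
    return "subscribed";
  }
  // Internal method that must never be reachable from the public target.
  writeCoreProcessorState(): string {
    return "internal state written";
  }
}

// The allowlist is type-checked against the interface, so a typo or a
// method that is not part of the RPC surface fails to compile.
const PUBLIC_METHODS = ["append", "subscribe"] as const satisfies readonly (keyof StreamRpcSketch)[];

function makeRpcTargetSketch<T extends object, K extends keyof T>(
  impl: T,
  include: readonly K[],
): Pick<T, K> {
  // No prototype, so nothing is inherited by accident.
  const target = Object.create(null) as Pick<T, K>;
  for (const name of include) {
    const method = impl[name];
    if (typeof method !== "function") {
      throw new Error(String(name) + " is not a method");
    }
    target[name] = method.bind(impl);
  }
  return target;
}

const publicTarget = makeRpcTargetSketch(new StreamImplSketch(), PUBLIC_METHODS);
```

With an allowlist, a method added to the implementation later stays private by default; with a denylist it would be exposed until someone remembered to exclude it.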
Reviewed by Cursor Bugbot for commit e5aeaac. Bugbot is set up for automated code reviews on this repo. Configure here.