Skip to content

Commit 343d2c7

Browse files
antfuclaude
andauthored
feat(devframe): streaming channel API for server↔client chunks (#307)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent decfe30 commit 343d2c7

68 files changed

Lines changed: 4832 additions & 235 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

alias.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export const alias = {
2121
'devframe/utils/promise': df('devframe/src/utils/promise.ts'),
2222
'devframe/utils/shared-state': df('devframe/src/utils/shared-state.ts'),
2323
'devframe/utils/state': df('devframe/src/utils/state.ts'),
24+
'devframe/utils/streaming-channel': df('devframe/src/utils/streaming-channel.ts'),
2425
'devframe/utils/when': df('devframe/src/utils/when.ts'),
2526
'devframe/adapters/cli': df('devframe/src/adapters/cli.ts'),
2627
'devframe/adapters/dev': df('devframe/src/adapters/dev.ts'),

devframe/docs/.vitepress/sidebar.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export default function devframeSidebar(prefix = ''): DefaultTheme.SidebarItem[]
1010
{ text: 'Adapters', link: `${prefix}/guide/adapters` },
1111
{ text: 'RPC', link: `${prefix}/guide/rpc` },
1212
{ text: 'Shared State', link: `${prefix}/guide/shared-state` },
13+
{ text: 'Streaming', link: `${prefix}/guide/streaming` },
1314
{ text: 'Dock System', link: `${prefix}/guide/dock-system` },
1415
{ text: 'Commands', link: `${prefix}/guide/commands` },
1516
{ text: 'When Clauses', link: `${prefix}/guide/when-clauses` },
@@ -26,7 +27,7 @@ export default function devframeSidebar(prefix = ''): DefaultTheme.SidebarItem[]
2627
text: 'Error Reference',
2728
link: `${prefix}/errors/`,
2829
collapsed: true,
29-
items: Array.from({ length: 28 }, (_, i) => {
30+
items: Array.from({ length: 32 }, (_, i) => {
3031
const code = `DF${String(i + 1).padStart(4, '0')}`
3132
return { text: code, link: `${prefix}/errors/${code}` }
3233
}),

devframe/docs/errors/DF0029.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0029: Stream Buffer Overflow
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Stream "`{channel}#{id}`" dropped `{dropped}` chunk(s) after exceeding the client high-water mark.
12+
13+
## Cause
14+
15+
A streaming subscriber's queue grew past its `highWaterMark` because the consumer is slower than the producer. The oldest chunks were dropped to keep memory bounded.
16+
17+
This is a soft warning — the stream keeps running and remaining chunks still flow.
18+
19+
## Fix
20+
21+
- Raise `highWaterMark` on `rpc.streaming.subscribe(channel, id, { highWaterMark })` if the consumer can occasionally catch up.
22+
- Slow the producer so it doesn't outpace the wire (e.g. throttle, debounce, or batch chunks server-side).
23+
- Switch to `sharedState` if you only need the latest value rather than every intermediate chunk.
24+
25+
## Source
26+
27+
`packages/devframe/src/client/rpc-streaming.ts`

devframe/docs/errors/DF0030.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0030: Unknown Stream ID
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Stream "`{channel}#{id}`" is unknown — no producer has called `channel.start({ id: "{id}" })`.
12+
13+
## Cause
14+
15+
A client subscribed to a stream id that the server-side channel doesn't know about. Either the producer never started a stream with that id, the producer already ended it and `replayWindow` is `0`, or the client passed the wrong id.
16+
17+
## Fix
18+
19+
- Make sure the action that returns the stream id runs **before** the client subscribes — typically by awaiting `rpc.call('your-action')` and using the returned id.
20+
- Bump `replayWindow` on `ctx.rpc.streaming.create(name, { replayWindow })` if you need clients to resume after the producer has finished but kept the buffer warm.
21+
- Check the id is propagated correctly across boundaries (action return value → component prop → subscribe call).
22+
23+
## Source
24+
25+
`packages/devframe/src/node/rpc-streaming.ts`

devframe/docs/errors/DF0031.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0031: Write to Closed Stream
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Cannot write to closed stream "`{channel}#{id}`".
12+
13+
## Cause
14+
15+
`stream.write(chunk)` was called after the stream was closed via `stream.close()` / `stream.error()` or after the consumer cancelled (which aborts `stream.signal`).
16+
17+
## Fix
18+
19+
Producers should poll `stream.signal.aborted` and exit cleanly:
20+
21+
```ts
22+
const stream = channel.start({ id })
23+
try {
24+
for (const chunk of source) {
25+
if (stream.signal.aborted)
26+
return
27+
stream.write(chunk)
28+
}
29+
stream.close()
30+
}
31+
catch (err) {
32+
stream.error(err)
33+
}
34+
```
35+
36+
## Source
37+
38+
`packages/devframe/src/utils/streaming-channel.ts`

devframe/docs/errors/DF0032.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
---
2+
outline: deep
3+
---
4+
5+
# DF0032: Streaming Channel Already Registered
6+
7+
> Package: `devframe`
8+
9+
## Message
10+
11+
> Streaming channel "`{channel}`" is already registered.
12+
13+
## Cause
14+
15+
Two calls to `ctx.rpc.streaming.create(name, ...)` used the same channel name. Each name owns a wire namespace and must be unique within a context.
16+
17+
## Fix
18+
19+
- Reuse the existing channel handle rather than creating a new one with the same name.
20+
- If two devtools want isolated streams, give each a distinct namespaced name (`my-devtool:chat-stream`, `other-devtool:logs-stream`).
21+
22+
## Source
23+
24+
`packages/devframe/src/node/rpc-streaming.ts`

devframe/docs/errors/index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,7 @@ Emitted by `devframe` — framework-neutral host / shared-state / auth surface.
4646
| [DF0026](./DF0026) | error | No Dump Match | DTK0006 |
4747
| [DF0027](./DF0027) | error | Invalid Dump Configuration | DTK0007 |
4848
| [DF0028](./DF0028) | error | Snapshot Type Mismatch | DTK0008 |
49+
| [DF0029](./DF0029) | warn | Stream Buffer Overflow ||
50+
| [DF0030](./DF0030) | error | Unknown Stream ID ||
51+
| [DF0031](./DF0031) | error | Write to Closed Stream ||
52+
| [DF0032](./DF0032) | error | Streaming Channel Already Registered ||

devframe/docs/guide/rpc.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,20 @@ defineDevtool({
144144
| `event` | `boolean` | Fire-and-forget (don't wait for responses). |
145145
| `filter` | `(client) => boolean` | Skip specific clients. |
146146

147+
## Streaming
148+
149+
For chunk-style server→client feeds (chat deltas, log lines, build progress), reach for [streaming channels](./streaming) instead of hand-rolling `action + delta/end events`. The streaming API gives you stream IDs, cancellation, replay, and Web Streams interop for free:
150+
151+
```ts
152+
const channel = ctx.rpc.streaming.create<string>('my-devtool:chat', {
153+
replayWindow: 256,
154+
})
155+
const stream = channel.start()
156+
sourceReadable.pipeTo(stream.writable)
157+
```
158+
159+
See the [Streaming guide](./streaming) for the full API.
160+
147161
## Local Invocation
148162

149163
`ctx.rpc.invokeLocal` calls a registered server function directly without going through a transport — useful for cross-function composition on the server side:

0 commit comments

Comments
 (0)