Status: Implemented (Reference Implementation) Date: January 2026
This document describes the design of the new streams API based on iterables with explicit backpressure handling. For the API reference, see API.md.
- Design Principles
- Core Concepts
- Type System
- Push Stream
- Stream Factories
- Pull Pipeline
- Write To
- Consumers
- Multi-Consumer
- Protocol Extensibility
- WebIDL Definitions
See the README.md for motivation and critique of the Web Streams API.
All stream iterables yield Uint8Array[] - arrays of chunks. This batching amortizes the overhead of async iteration:
// Multiple chunks can be yielded per iteration
for await (const chunks of readable) {
// chunks is Uint8Array[]
for (const chunk of chunks) {
// chunk is Uint8Array
process(chunk);
}
}
Transforms receive `null` as a flush signal indicating end-of-input:
const transform = (chunks: Uint8Array[] | null) => {
if (chunks === null) {
// Flush - emit any buffered data
return finalData;
}
// Normal processing
return processedChunks;
};
The API distinguishes between:
- Writers - Push data into a stream (`writer.write()`)
- Sources - Pull data from a stream (iteration)
Stream.push() creates a bonded pair connecting both models.
A Streamable is any object that implements iteration protocols and yields stream-compatible values:
// Sync streamable - yields via Symbol.iterator
interface SyncStreamable {
[Symbol.iterator](): Iterator<SyncStreamableYield>;
}
// Async streamable - yields via Symbol.asyncIterator
interface AsyncStreamable {
[Symbol.asyncIterator](): AsyncIterator<AsyncStreamableYield>;
}
// Either sync or async
type Streamable = SyncStreamable | AsyncStreamable;
The following primitive types can also be written directly:
type PrimitiveChunk = string | ArrayBuffer | ArrayBufferView;
- `string` - UTF-8 encoded to a `Uint8Array`
- `ArrayBuffer` - Wrapped as a `Uint8Array` view (no copy)
- `ArrayBufferView` - Converted to a `Uint8Array` view of the same memory
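For illustration, a minimal sketch (the `LineSource` class is hypothetical): any object whose `Symbol.asyncIterator` yields primitive chunks is a valid Streamable and can be passed to the factories and consumers described later.

```ts
// Hypothetical Streamable: yields strings, which are UTF-8 encoded to Uint8Array downstream.
class LineSource {
  constructor(private lines: string[]) {}
  async *[Symbol.asyncIterator]() {
    for (const line of this.lines) yield line + "\n";
  }
}

const data = await Stream.bytes(Stream.from(new LineSource(["a", "b"])));
// data is a Uint8Array containing the bytes of "a\nb\n"
```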
// Options passed to transform functions by the pipeline
interface TransformOptions {
/** Signal that fires when the pipeline is cancelled, errors, or
* the consumer stops iteration. */
readonly signal: AbortSignal;
}
// Stateless transform - function called for each batch
type TransformFn = (
chunks: Uint8Array[] | null,
options: TransformOptions
) => AsyncTransformResult;
// Stateful transform - generator wrapping entire source
type StatefulTransformFn = (
source: AsyncIterable<Uint8Array[] | null>,
options: TransformOptions
) => AsyncIterable<TransformYield>;
// Transform object for stateful transforms
// Using an object (vs a plain function) indicates this is a stateful transform
interface TransformObject {
transform: StatefulTransformFn;
}
// Function = stateless, Object = stateful
type Transform = TransformFn | TransformObject;
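For illustration only (these transforms are hypothetical, not part of the API surface), a stateless byte-level transform and a stateful transform object might look like this:

```ts
// Hypothetical stateless transform: XOR every byte with a fixed key.
// Called once per batch; the returned batch is passed downstream.
const xorBytes = (chunks: Uint8Array[] | null) => {
  if (chunks === null) return []; // flush: nothing buffered, nothing to emit
  return chunks.map((chunk) => chunk.map((b) => b ^ 0x2a));
};

// Hypothetical stateful transform: counts bytes across the whole stream and
// appends a trailer. Wrapping the source in a generator preserves state between batches.
const withByteCount = {
  async *transform(source: AsyncIterable<Uint8Array[] | null>) {
    let total = 0;
    for await (const chunks of source) {
      if (chunks === null) break; // flush signal: fall through to the trailer
      for (const c of chunks) total += c.byteLength;
      yield chunks;
    }
    yield [new TextEncoder().encode(`\n-- ${total} bytes --\n`)];
  },
};

const output = Stream.pull(source, xorBytes, withByteCount);
```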
interface WriteOptions {
readonly signal?: AbortSignal;
}
interface Writer {
readonly desiredSize: number | null; // >= 0 or null (closed)
write(chunk: Uint8Array | string, options?: WriteOptions): Promise<void>;
writev(chunks: (Uint8Array | string)[], options?: WriteOptions): Promise<void>;
writeSync(chunk: Uint8Array | string): boolean;
writevSync(chunks: (Uint8Array | string)[]): boolean;
end(options?: WriteOptions): Promise<number>;
endSync(): number;
fail(reason?: Error): Promise<void>;
failSync(reason?: Error): boolean;
}
Stream.push() creates a bonded writer and async iterable pair:
function push(
...args: [...Transform[], PushStreamOptions?]
): { writer: Writer; readable: AsyncIterable<Uint8Array[]> }
interface PushStreamOptions {
highWaterMark?: number; // Max pending writes (default: 1)
backpressure?: BackpressurePolicy; // 'strict' | 'block' | 'drop-oldest' | 'drop-newest'
signal?: AbortSignal;
}
Behavior:
- Default `highWaterMark: 1` means only one pending write at a time (strictest backpressure)
- Each `write()` or `writev()` call counts as a single slot
- Consumer termination propagates to the writer (breaking iteration closes the writer)
Example:
const { writer, readable } = Stream.push({ highWaterMark: 10 });
// Producer
await writer.write("chunk 1");
await writer.write("chunk 2");
await writer.end();
// Consumer
for await (const chunks of readable) {
for (const chunk of chunks) {
console.log(new TextDecoder().decode(chunk));
}
}
Backpressure in push streams is explicit and strict by default. Writers must await `write()` calls to avoid overwhelming the buffer.
The `backpressure` option allows alternative policies:
- `'strict'` (default) - Writes reject immediately when the buffer is full
- `'block'` - Async writes wait for buffer space; sync writes return `false`
- `'drop-oldest'` - Oldest buffered chunks are dropped to make room for new writes (see the sketch below)
- `'drop-newest'` - Incoming writes are dropped when the buffer is full
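For example, a lossy telemetry stream might prefer dropping stale data over stalling the producer. A sketch under that assumption (`handleSlowly` is a hypothetical slow handler):

```ts
const { writer, readable } = Stream.push({
  highWaterMark: 8,
  backpressure: "drop-oldest",
});

// Producer: old batches are evicted when the buffer is full, so writes never stall.
setInterval(() => writer.writeSync(`sample ${Date.now()}\n`), 10);

// Consumer: a slow reader sees gaps instead of blocking the producer.
for await (const chunks of readable) {
  await handleSlowly(chunks);
}
```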
The `highWaterMark` option controls the maximum number of pending writes allowed before backpressure is applied. Note that `highWaterMark` counts write calls, not bytes. Counting discrete write operations keeps backpressure simple to reason about, whereas tracking variable byte sizes adds complexity for little benefit. Since the data is already in memory as `Uint8Array`, byte-level accounting does not save memory in typical use cases; for backpressure signaling, what matters is whether the producer is issuing write calls faster than the consumer can drain them.
Developers can provide their own implementations of the `Writer` interface to apply custom backpressure strategies based on byte sizes if needed, as sketched below.
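A minimal sketch of such a wrapper (a hypothetical helper, not part of the API), which awaits the underlying write whenever a byte budget would be exceeded:

```ts
// Hypothetical decorator: byte-based backpressure on top of an existing Writer.
function withByteBudget(inner: Writer, maxPendingBytes: number) {
  let pending = 0;
  return {
    async write(chunk: Uint8Array): Promise<void> {
      pending += chunk.byteLength;
      const done = inner.write(chunk).then(() => {
        pending -= chunk.byteLength;
      });
      // Block the producer only while more than maxPendingBytes are in flight.
      if (pending > maxPendingBytes) await done;
    },
    end: () => inner.end(),
    fail: (reason?: Error) => inner.fail(reason),
  };
}
```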
Create an async iterable from various sources:
function from(input: ByteInput | Streamable): AsyncIterable<Uint8Array[]>
Inputs:
- `string` - Single UTF-8 encoded chunk
- `ArrayBuffer` / `ArrayBufferView` - Single chunk
- `Iterable` / `AsyncIterable` - Normalized and wrapped
Create a sync iterable:
function fromSync(input: ByteInput | SyncStreamable): Iterable<Uint8Array[]>
Example:
// From string
const readable = Stream.from("Hello, World!");
// From generator
const readable = Stream.from(async function* () {
yield "chunk 1";
yield "chunk 2";
}());
// From array (sync)
const syncReadable = Stream.fromSync(["a", "b", "c"]);
Pull-through async pipeline with variadic transforms:
function pull(
source: Streamable,
...args: [...Transform[], PullOptions?]
): AsyncIterable<Uint8Array[]>
interface PullOptions {
signal?: AbortSignal;
}
Behavior:
- Transforms execute lazily when consumer pulls
- Sync sources work directly - no async wrapping overhead
- Error in transform N aborts the pipeline signal, notifying all transforms (see the sketch below)
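A sketch of the cancellation behavior (the pass-through transform is hypothetical): a stateful transform can observe `options.signal`, and breaking out of consumer iteration fires it:

```ts
// Hypothetical pass-through transform that reacts to pipeline teardown.
const tracked = {
  async *transform(
    source: AsyncIterable<Uint8Array[] | null>,
    { signal }: TransformOptions
  ) {
    signal.addEventListener("abort", () => console.log("pipeline aborted"), { once: true });
    for await (const chunks of source) {
      if (chunks === null) return; // flush: nothing buffered
      yield chunks;
    }
  },
};

for await (const chunks of Stream.pull(source, tracked)) {
  break; // stopping iteration aborts the signal and tears down the pipeline
}
```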
Sync pipeline for CPU-bound workloads:
function pullSync(
source: SyncStreamable,
...transforms: SyncTransform[]
): Iterable<Uint8Array[]>
Example:
// Async pipeline
const output = Stream.pull(source, compress, encrypt, { signal });
for await (const chunks of output) { /* ... */ }
// Sync pipeline (no async overhead)
const syncOutput = Stream.pullSync(syncSource, transformSync);
for (const chunks of syncOutput) { /* ... */ }
Consume source and write to a writer with optional transforms:
function pipeTo(
source: AsyncIterable<Uint8Array[]> | Iterable<Uint8Array[]>,
...args: [...Transform[], Writer, PipeToOptions?]
): Promise<number> // Total bytes written
interface PipeToOptions {
signal?: AbortSignal;
preventClose?: boolean; // Don't call writer.end() on completion
preventFail?: boolean; // Don't call writer.fail() on error
}
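For example (a sketch; `header`, `body`, and `fileWriter` are hypothetical), `preventClose` lets several sources be piped into the same writer before it is ended explicitly:

```ts
// Concatenate two sources into one destination.
await Stream.pipeTo(header, fileWriter, { preventClose: true });
await Stream.pipeTo(body, fileWriter); // default behavior calls fileWriter.end() on completion
```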
Sync version:
function pipeToSync(
source: Iterable<Uint8Array[]>,
...args: [...SyncTransform[], SyncWriter, PipeToSyncOptions?]
): number
Example:
// Pipe with transforms
const bytesWritten = await Stream.pipeTo(
source,
compress,
encrypt,
fileWriter,
{ signal: controller.signal }
);
// Collect all bytes
function bytes(source, options?: ConsumeOptions): Promise<Uint8Array>
function bytesSync(source, options?: ConsumeSyncOptions): Uint8Array
// Collect and decode as text
function text(source, options?: TextOptions): Promise<string>
function textSync(source, options?: TextSyncOptions): string
// Collect as ArrayBuffer
function arrayBuffer(source, options?: ConsumeOptions): Promise<ArrayBuffer>
function arrayBufferSync(source, options?: ConsumeSyncOptions): ArrayBuffer
interface ConsumeOptions {
signal?: AbortSignal;
limit?: number; // Max bytes - throws if exceeded
}
interface TextOptions extends ConsumeOptions {
encoding?: string; // Default: 'utf-8'
}
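For example (reusing names from the earlier examples), consumers collapse a stream into a single value, with `limit` guarding against unbounded input:

```ts
// Decode at most 1 MiB of UTF-8 text; throws if the limit is exceeded.
const textBody = await Stream.text(source, {
  limit: 1024 * 1024,
  signal: controller.signal,
});

// Collect raw bytes synchronously from a sync source.
const raw = Stream.bytesSync(syncSource);
```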
Observe chunks without modifying:
function tap(callback: TapCallbackAsync): Transform
function tapSync(callback: TapCallback): SyncTransform
type TapCallback = (chunks: Uint8Array[] | null) => void;
type TapCallbackAsync = (chunks: Uint8Array[] | null) => void | Promise<void>;
Example:
// Hash while streaming
const hasher = crypto.createHash('sha256');
const output = Stream.pull(
source,
Stream.tap((chunks) => {
if (chunks) for (const c of chunks) hasher.update(c);
})
);
await Stream.pipeTo(output, destination);
console.log('Hash:', hasher.digest('hex'));
Combine multiple sources by temporal order:
function merge(
...args: [...AsyncIterable<Uint8Array[]>[], MergeOptions?]
): AsyncIterable<Uint8Array[]>
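A sketch of merging two live sources (the source and sink names are hypothetical); chunks are yielded in the order they arrive:

```ts
const combined = Stream.merge(stdoutChunks, stderrChunks, { signal });
for await (const chunks of combined) {
  for (const chunk of chunks) log(chunk);
}
```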
Creates a multi-consumer broadcast channel where a writer pushes to all consumers:
function broadcast(options?: BroadcastOptions): {
writer: Writer;
broadcast: Broadcast;
}
interface Broadcast {
push(...args: [...Transform[], PushStreamOptions?]): AsyncIterable<Uint8Array[]>;
readonly consumerCount: number;
readonly bufferSize: number;
cancel(reason?: Error): void;
}
interface BroadcastOptions {
bufferLimit?: number; // Default: 16
backpressure?: BackpressurePolicy; // Default: 'strict'
signal?: AbortSignal;
}
Buffer Model:
- Single unified buffer shared by all consumers
- Each consumer has independent cursor position
- Buffer trimmed as slowest consumer advances
- Backpressure based on slowest consumer vs buffer limit
Example:
const { writer, broadcast } = Stream.broadcast({ bufferLimit: 100 });
// Create consumers with different transforms
const consumer1 = broadcast.push();
const consumer2 = broadcast.push(decompress);
// Producer
for await (const chunk of source) {
await writer.write(chunk);
}
await writer.end();
// Consumers receive independently
const [raw, decompressed] = await Promise.all([
Stream.bytes(consumer1),
Stream.bytes(consumer2)
]);
Wraps a source for multi-consumer access with lazy pulling:
function share(source, options?: ShareOptions): Share
function shareSync(source, options?: ShareSyncOptions): SyncShare
interface Share {
pull(...args: [...Transform[], PullOptions?]): AsyncIterable<Uint8Array[]>;
readonly consumerCount: number;
readonly bufferSize: number;
cancel(reason?: Error): void;
}
Buffer Model:
- Source pulled lazily on demand
- Buffer grows as consumers diverge
- Backpressure policy prevents unbounded growth:
  - `'strict'`: Fast consumer blocks until the slow consumer catches up
  - `'drop-oldest'`: Slow consumer misses data
  - `'drop-newest'`: All consumers miss data
Example:
const shared = Stream.share(fileStream, { bufferLimit: 100 });
// Create consumers
const consumer1 = shared.pull();
const consumer2 = shared.pull(decompress);
const consumer3 = shared.pull(decompress, parse);
// All share the same source
const [raw, decompressed, parsed] = await Promise.all([
Stream.bytes(consumer1),
Stream.bytes(consumer2),
Stream.bytes(consumer3)
]);
| Aspect | `broadcast()` | `share()` |
|---|---|---|
| Model | Push (writer -> consumers) | Pull (source -> consumers) |
| Data source | Writer pushes explicitly | Source pulled on demand |
| Create consumer | `broadcast.push()` | `share.pull()` |
| Sync version | No | Yes (`shareSync`) |
| Use case | Event sources, WebSocket | File, response body, iterables |
Allow objects to convert themselves to streamable data:
const toStreamable: unique symbol = Symbol.for('Stream.toStreamable');
const toAsyncStreamable: unique symbol = Symbol.for('Stream.toAsyncStreamable');
interface ToStreamable {
[toStreamable](): SyncStreamableYield;
}
interface ToAsyncStreamable {
[toAsyncStreamable](): AsyncStreamableYield | Promise<AsyncStreamableYield>;
}
Example:
class JsonMessage {
constructor(private data: object) {}
[Stream.toStreamable]() {
return JSON.stringify(this.data);
}
}
// Can be written directly
await writer.write(new JsonMessage({ hello: "world" }));
Allow objects to provide optimized multi-consumer implementations:
const broadcastProtocol: unique symbol = Symbol.for('Stream.broadcastProtocol');
const shareProtocol: unique symbol = Symbol.for('Stream.shareProtocol');
const shareSyncProtocol: unique symbol = Symbol.for('Stream.shareSyncProtocol');
interface Broadcastable {
[broadcastProtocol](options?: BroadcastOptions): Broadcast;
}
interface Shareable {
[shareProtocol](options?: ShareOptions): Share;
}
Static Methods:
// Get or create from Broadcastable or Streamable
Broadcast.from(input, options?): BroadcastResult
// Get or create from Shareable or Streamable
Share.from(input, options?): Share
SyncShare.fromSync(input, options?): SyncShare
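As an illustration (the `CachedResource` class is hypothetical), an object can expose its own `Share` via the protocol symbol so that `Share.from()` reuses it instead of wrapping the object generically:

```ts
class CachedResource {
  #share: Share | undefined;
  constructor(private source: AsyncIterable<Uint8Array[]>) {}
  [Stream.shareProtocol](options?: ShareOptions): Share {
    // All callers reuse a single Share, so the underlying source is pulled only once.
    return (this.#share ??= Stream.share(this.source, options));
  }
}

const shared = Share.from(new CachedResource(fileStream)); // uses the protocol
const raw = shared.pull();
const parsed = shared.pull(decompress, parse);
```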
The following WebIDL definitions describe the new streams API for potential standardization. These definitions capture the essential interfaces while allowing implementation flexibility.
// Backpressure policy enumeration
enum BackpressurePolicy { "strict", "block", "drop-oldest", "drop-newest" };
// Byte chunk type - the fundamental unit of data
typedef Uint8Array ByteChunk;
// Batch of chunks - used for iteration to amortize async overhead
typedef sequence<ByteChunk> ChunkBatch;
// Input types that can be written to streams
typedef (Uint8Array or USVString) WritableChunk;
dictionary WriteOptions {
AbortSignal signal;
};
[Exposed=*]
interface Writer {
// Backpressure indicator: >= 0 (slots available), null (closed/failed)
readonly attribute long? desiredSize;
// Async write operations
Promise<undefined> write(WritableChunk chunk, optional WriteOptions options = {});
Promise<undefined> writev(sequence<WritableChunk> chunks, optional WriteOptions options = {});
// Sync write attempts - return true if successful, false if buffer full
boolean writeSync(WritableChunk chunk);
boolean writevSync(sequence<WritableChunk> chunks);
// End stream - returns total bytes written
Promise<unsigned long long> end(optional WriteOptions options = {});
long long endSync(); // Returns -1 if cannot complete synchronously
// Fail stream (put writer into error state)
Promise<undefined> fail(optional any reason);
boolean failSync(optional any reason);
};
[Exposed=*]
interface SyncWriter {
readonly attribute long? desiredSize;
undefined write(WritableChunk chunk);
undefined writev(sequence<WritableChunk> chunks);
unsigned long long end();
undefined fail(optional any reason);
};
dictionary PushStreamOptions {
unsigned long highWaterMark = 1;
BackpressurePolicy backpressure = "strict";
AbortSignal signal;
};
[Exposed=*]
interface PushStreamResult {
readonly attribute Writer writer;
readonly attribute ReadableByteStream readable;
};
// ReadableByteStream is an async iterable yielding ChunkBatch
[Exposed=*]
interface ReadableByteStream {
async iterable<ChunkBatch>;
};
// SyncReadableByteStream is a sync iterable yielding ChunkBatch
[Exposed=*]
interface SyncReadableByteStream {
iterable<ChunkBatch>;
};
// Options passed to transform functions by the pipeline
dictionary TransformOptions {
required AbortSignal signal;
};
// Transform function signature (TypeScript-style for clarity)
// transform: (chunks: Uint8Array[] | null, options: TransformOptions) => TransformResult
callback TransformFunction = any (ChunkBatch? chunks, TransformOptions options);
callback StatefulTransformFunction = any (AsyncIterable source, TransformOptions options);
// TransformObject is always stateful (receives entire source as async iterable)
// Using an object vs a plain function indicates stateful transform
dictionary TransformObject {
required StatefulTransformFunction transform;
};
// Function = stateless (called per batch), Object = stateful (receives entire source)
typedef (TransformFunction or TransformObject) Transform;
typedef (TransformFunction or TransformObject) SyncTransform;
dictionary PullOptions {
AbortSignal signal;
};
dictionary PipeToOptions {
AbortSignal signal;
boolean preventClose = false;
boolean preventFail = false;
};
dictionary PipeToSyncOptions {
boolean preventClose = false;
boolean preventFail = false;
};
dictionary ConsumeOptions {
AbortSignal signal;
unsigned long long limit;
};
dictionary ConsumeSyncOptions {
unsigned long long limit;
};
dictionary TextOptions : ConsumeOptions {
DOMString encoding = "utf-8";
};
dictionary TextSyncOptions : ConsumeSyncOptions {
DOMString encoding = "utf-8";
};
dictionary MergeOptions {
AbortSignal signal;
};
dictionary BroadcastOptions {
unsigned long bufferLimit = 16;
BackpressurePolicy backpressure = "strict";
AbortSignal signal;
};
[Exposed=*]
interface Broadcast {
// Create consumer with optional transforms
ReadableByteStream push(Transform... transforms, optional PushStreamOptions options = {});
readonly attribute unsigned long consumerCount;
readonly attribute unsigned long bufferSize;
undefined cancel(optional any reason);
};
[Exposed=*]
interface BroadcastResult {
readonly attribute Writer writer;
readonly attribute Broadcast broadcast;
};
dictionary ShareOptions {
unsigned long bufferLimit = 16;
BackpressurePolicy backpressure = "strict";
AbortSignal signal;
};
dictionary ShareSyncOptions {
unsigned long bufferLimit = 16;
BackpressurePolicy backpressure = "strict";
};
[Exposed=*]
interface Share {
// Create consumer with optional transforms
ReadableByteStream pull(Transform... transforms, optional PullOptions options = {});
readonly attribute unsigned long consumerCount;
readonly attribute unsigned long bufferSize;
undefined cancel(optional any reason);
};
[Exposed=*]
interface SyncShare {
SyncReadableByteStream pull(SyncTransform... transforms);
readonly attribute unsigned long consumerCount;
readonly attribute unsigned long bufferSize;
undefined cancel(optional any reason);
};
[Exposed=*]
namespace Stream {
// Push stream factory
PushStreamResult push(Transform... transforms, optional PushStreamOptions options = {});
// Source factories
ReadableByteStream from((USVString or BufferSource or object) input);
SyncReadableByteStream fromSync((USVString or BufferSource or object) input);
// Pull pipelines
ReadableByteStream pull(object source, Transform... transforms, optional PullOptions options = {});
SyncReadableByteStream pullSync(object source, SyncTransform... transforms);
// Pipe to destination
Promise<unsigned long long> pipeTo(
object source,
Transform... transforms,
Writer destination,
optional PipeToOptions options = {}
);
unsigned long long pipeToSync(
object source,
SyncTransform... transforms,
SyncWriter destination,
optional PipeToSyncOptions options = {}
);
// Async consumers
Promise<Uint8Array> bytes(object source, optional ConsumeOptions options = {});
Promise<USVString> text(object source, optional TextOptions options = {});
Promise<ArrayBuffer> arrayBuffer(object source, optional ConsumeOptions options = {});
// Sync consumers
Uint8Array bytesSync(object source, optional ConsumeSyncOptions options = {});
USVString textSync(object source, optional TextSyncOptions options = {});
ArrayBuffer arrayBufferSync(object source, optional ConsumeSyncOptions options = {});
// Combining
ReadableByteStream merge(ReadableByteStream... sources, optional MergeOptions options = {});
// Multi-consumer factories
BroadcastResult broadcast(optional BroadcastOptions options = {});
Share share(object source, optional ShareOptions options = {});
SyncShare shareSync(object source, optional ShareSyncOptions options = {});
// Observation transforms
Transform tap(TapCallback callback);
SyncTransform tapSync(SyncTapCallback callback);
};
callback TapCallback = (undefined or Promise<undefined>) (ChunkBatch? chunks);
callback SyncTapCallback = undefined (ChunkBatch? chunks);
// Protocol symbols for extensibility (accessed via Stream namespace)
// These allow custom objects to participate in streaming
partial namespace Stream {
// Value conversion protocols
readonly attribute symbol toStreamable; // Symbol.for('Stream.toStreamable')
readonly attribute symbol toAsyncStreamable; // Symbol.for('Stream.toAsyncStreamable')
// Multi-consumer protocols
readonly attribute symbol broadcastProtocol; // Symbol.for('Stream.broadcastProtocol')
readonly attribute symbol shareProtocol; // Symbol.for('Stream.shareProtocol')
readonly attribute symbol shareSyncProtocol; // Symbol.for('Stream.shareSyncProtocol')
};
// Broadcast.from() - get or create from Broadcastable or Streamable
[Exposed=*]
namespace Broadcast {
BroadcastResult from(object input, optional BroadcastOptions options = {});
};
// Share.from() - get or create from Shareable or Streamable
[Exposed=*]
namespace Share {
Share from(object input, optional ShareOptions options = {});
};
// SyncShare.fromSync() - get or create from SyncShareable or SyncStreamable
[Exposed=*]
namespace SyncShare {
SyncShare fromSync(object input, optional ShareSyncOptions options = {});
};
- Variadic Transforms: The WebIDL uses `Transform... transforms` to represent variadic transform arguments. In practice, implementations detect the options object by checking for the absence of a `transform` property (see the sketch after these notes).
- Object Sources: Sources are typed as `object` in WebIDL because they are structurally typed (must be iterable). The implementation checks for `Symbol.iterator` or `Symbol.asyncIterator`.
- Protocol Symbols: The symbols are defined using `Symbol.for()` so that third-party code can implement the protocols without importing the symbols directly.
- Chunk Batching: All iterables yield `sequence<Uint8Array>` (`ChunkBatch`) rather than individual chunks. This is a deliberate design choice to amortize async iteration overhead.
- Non-Negative desiredSize: Unlike Web Streams, where `desiredSize` can be negative, this API guarantees `desiredSize >= 0` or `null` (closed/failed).
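A minimal sketch of that detection (a hypothetical helper, not part of the specification):

```ts
// Hypothetical helper: split variadic args into transforms and a trailing options object.
// Functions are stateless transforms; objects with a `transform` property are stateful
// transforms; any other trailing object is treated as the options bag.
function splitArgs(args: unknown[]) {
  const last = args[args.length - 1];
  const isOptions = typeof last === "object" && last !== null && !("transform" in last);
  return {
    transforms: (isOptions ? args.slice(0, -1) : args) as Transform[],
    options: (isOptions ? last : {}) as object,
  };
}
```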
| Document | Description |
|---|---|
| API.md | Complete API reference for the reference implementation |
| README.md | Motivation and criticism of Web Streams API |
| MIGRATION.md | Guide for migrating from Web Streams API |
| REQUIREMENTS.md | Detailed requirements and test specifications |
| REFACTOR-TODO.md | Implementation notes and session history |