-
Notifications
You must be signed in to change notification settings - Fork 306
Closed
Description
I want to pipe from a single readable to multiple writables. Let's assume writables process the data at various speeds, both being slower than the readable source.
Do I need to implement some extra logic to ensure I don't lose any data? Some old SO threads suggest so: https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets
On the other side, in this snippet, the faster Writable just waits for the slower one, and both consume all of produced data.
import stream from "stream";
class ReadableStream extends stream.Readable {
constructor() {
super({ objectMode: true });
}
i = 0;
_read(size: number) {
while (true) {
this.i++;
if (this.i > 150) {
return this.push(null);
}
const resp = this.push({ key: this.i });
if (!resp) {
for (let i = 0; i < 10; i++) {
this.i++;
console.log(`Pushing over limit: ${this.i}`);
this.push({ key: this.i });
}
return;
}
}
}
}
class FastWritable extends stream.Writable {
constructor() {
super({ objectMode: true });
}
_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void
) {
setTimeout(async () => {
console.log("FAST WRITE CONSUMED", chunk.key);
callback();
}, 20);
}
}
class SlowWritable extends stream.Writable {
constructor() {
super({ objectMode: true });
}
_write(
chunk: any,
encoding: string,
callback: (error?: Error | null) => void
) {
setTimeout(async () => {
console.log("SLOW WRITE CONSUMED", chunk.key);
callback();
}, 1500);
}
}
const read = new ReadableStream();
read.pipe(new FastWritable());
read.pipe(new SlowWritable());
// everything works fine, both streams consume all dataReactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels