Piping from a readable to multiple writables. #2707

@vlopp

I want to pipe from a single readable to multiple writables. Let's assume the writables process data at different speeds, and that both are slower than the readable source.

Do I need to implement some extra logic to ensure I don't lose any data? Some old Stack Overflow threads suggest so: https://stackoverflow.com/questions/19553837/node-js-piping-the-same-readable-stream-into-multiple-writable-targets

On the other hand, in the snippet below, the faster Writable just waits for the slower one, and both consume all of the produced data.

import stream from "stream";

class ReadableStream extends stream.Readable {
  constructor() {
    super({ objectMode: true });
  }

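  i = 0;
  _read(size: number) {
    while (true) {
      this.i++;
      if (this.i > 150) {
        // Signal end of stream.
        return this.push(null);
      }

      const resp = this.push({ key: this.i });
      if (!resp) {
        // push() returned false, so the internal buffer is full.
        // Deliberately ignore that and push 10 more items to check
        // whether anything pushed over the limit gets lost.
        for (let i = 0; i < 10; i++) {
          this.i++;
          console.log(`Pushing over limit: ${this.i}`);
          this.push({ key: this.i });
        }
        return;
      }
    }
  }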
}

class FastWritable extends stream.Writable {
  constructor() {
    super({ objectMode: true });
  }

  _write(
    chunk: any,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    // Simulate a consumer that needs 20 ms per chunk.
    setTimeout(() => {
      console.log("FAST WRITE CONSUMED", chunk.key);
      callback();
    }, 20);
  }
}

class SlowWritable extends stream.Writable {
  constructor() {
    super({ objectMode: true });
  }

  _write(
    chunk: any,
    encoding: string,
    callback: (error?: Error | null) => void
  ) {
    // Simulate a consumer that needs 1500 ms per chunk.
    setTimeout(() => {
      console.log("SLOW WRITE CONSUMED", chunk.key);
      callback();
    }, 1500);
  }
}
const read = new ReadableStream();
read.pipe(new FastWritable());
read.pipe(new SlowWritable());
// everything works fine, both streams consume all data
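
The same check can be made without reading the console output, by recording what each destination receives. This is only a minimal sketch, assuming a recent Node.js version; `recordingWritable` and `fanOut` are hypothetical helper names, not part of the `stream` API. As far as I understand, each `pipe()` call attaches its own `'data'` listener, so every destination is handed every chunk, and `write()` still buffers a chunk even when it returns `false`.

```typescript
import { Readable, Writable } from "stream";

// Hypothetical helper: a writable that records every key it receives and
// acknowledges each chunk after `delayMs`, to simulate a slow consumer.
function recordingWritable(delayMs: number, seen: number[]): Writable {
  return new Writable({
    objectMode: true,
    write(chunk: { key: number }, _encoding, callback) {
      setTimeout(() => {
        seen.push(chunk.key);
        callback();
      }, delayMs);
    },
  });
}

// Hypothetical helper: pipes one object-mode readable into a fast and a slow
// writable, then resolves with the keys each one saw once both have finished.
async function fanOut(
  total: number,
  fastMs: number,
  slowMs: number
): Promise<{ fast: number[]; slow: number[] }> {
  const fast: number[] = [];
  const slow: number[] = [];

  // Readable.from() yields an object-mode stream over the array items.
  const source = Readable.from(
    Array.from({ length: total }, (_, i) => ({ key: i + 1 }))
  );
  const fastSink = recordingWritable(fastMs, fast);
  const slowSink = recordingWritable(slowMs, slow);

  // Resolve when a writable has flushed everything it was given.
  const done = (w: Writable) =>
    new Promise<void>((resolve, reject) => {
      w.on("finish", resolve);
      w.on("error", reject);
    });
  const bothDone = Promise.all([done(fastSink), done(slowSink)]);

  source.pipe(fastSink);
  source.pipe(slowSink);

  await bothDone;
  return { fast, slow };
}
```

Calling `fanOut(150, 20, 1500)` mirrors the timings above; smaller values such as `fanOut(40, 1, 5)` finish quickly, and the two returned arrays can then be compared directly.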
