Skip to content

use pipeline() instead of pipe()#1889

Closed
rosald wants to merge 1 commit intokoajs:masterfrom
rosald:feat/pipeline
Closed

use pipeline() instead of pipe()#1889
rosald wants to merge 1 commit intokoajs:masterfrom
rosald:feat/pipeline

Conversation

@rosald
Copy link
Contributor

@rosald rosald commented Jun 20, 2025

Checklist

  • I have ensured my pull request is not behind the main or master branch of the original repository.
  • I have rebased all commits where necessary so that reviewing this pull request can be done without having to merge it first.
  • I have written a commit message that passes commitlint linting.
  • I have ensured that my code changes pass linting tests.
  • I have ensured that my code changes pass unit tests.
  • I have described my pull request and the reasons for code changes along with context if necessary.

pipe(rs, res) will not destroy res, when an error occurs. Thus causing client "hang" infinite when it has received some part of data.

Following code example, a readable stream has been implemented. When client request, the server will send some data, but after the error occurs, the client hang, the server doesn't close the socket. Chrome will hang. curl and ab will end in timeout.

After replacing with pipeline, when error occur, koa immediately close the socket, Chrome will say "Check internet connection" in download page. curl and ab will finish request as soon as possible.

stream.pipeline() will call stream.destroy(err) on all streams except they have already been closed.

the behavior may be better than hanging the request. Instead of client waitting indefinitely for the remaining bytes, closing the connection will break the expectation, but it's the only way to signal that the response is incomplete. The client will know that the response is truncated. Headers are already sent, an error occurs mid-stream, the client could handle this predictably.

Here's another to consider:

It's koa duty to close connection, it's app writer's duty to close the readable stream, and it's (better) stream implementers duty to release corresponding resources(request socket, fd, etc) of the readable stream when error occurs(but if it didn't work well, app writer should handle it, not koa).

Last benifit is pipeline() added support for webstreams(v18.16.0), making it easier.

import { Readable } from 'node:stream';
import http from 'http';
import Koa from 'koa';

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.#count++;
    this.push(':-)');
    if (this.#count === 4) {
      try {
        // https://nodejs.org/api/stream.html#errors-while-reading
        throw new Error('Error in stream');
      } catch (e) {
        this.destroy(e);
        return;
      }
    }
    if (this.#count === 5) {
      this.push(null);
    }
  }
}

const httpPort = 8081;
const app = new Koa();

app.use((ctx) => {
  if (ctx.request.path === '/favicon.ico') {
    ctx.status = 404;
    return;
  }

  const stream = new MyStream();

  ctx.body = stream;
});

http.createServer(app.callback()).listen(httpPort);

@jonathanong
Copy link
Member

jonathanong commented Jun 28, 2025

@rosald thanks for taking a stab! This is what I was thinking about for #1864

However, the PR as-is wouldn't suffice as we are hiding errors. Most likely, the best approach is to have a new API, something like ctx.body = stream and ctx.transforms = [], which we then pipeline into Stream.pipeline(ctx.body, ...ctx.transforms, ctx.res) with proper error handling. Then, middleware like compress can just be a transform function added to ctx.transforms

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR replaces Node.js pipe() with pipeline() for stream handling to improve error handling and prevent client connections from hanging when stream errors occur.

  • Replaces pipe() with pipeline() for all stream types (Blob, ReadableStream, Response, and general streams)
  • Removes automatic error listeners from streams, delegating error handling to user code
  • Updates tests to reflect the new behavior where error handling is the user's responsibility

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.

File Description
lib/response.js Fixes import name and removes automatic error listener for streams
lib/application.js Replaces pipe() with pipeline() for all stream response types
tests/response/body.test.js Updates test to verify no automatic error listeners are added
tests/application/respond.test.js Updates test to add manual error handling for streams

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +307 to +310
if (body instanceof Blob) { return Stream.pipeline(body.stream(), res, (err) => {}) }
if (body instanceof ReadableStream) { return Stream.pipeline(body, res, (err) => {}) }
if (body instanceof Response) { return Stream.pipeline(body?.body || '', res, (err) => {}) }
if (isStream(body)) { return Stream.pipeline(body, res, (err) => {}) }
Copy link

Copilot AI Oct 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty error callback functions silently ignore errors. Consider logging errors or at minimum adding a comment explaining why errors are intentionally ignored.

Suggested change
if (body instanceof Blob) { return Stream.pipeline(body.stream(), res, (err) => {}) }
if (body instanceof ReadableStream) { return Stream.pipeline(body, res, (err) => {}) }
if (body instanceof Response) { return Stream.pipeline(body?.body || '', res, (err) => {}) }
if (isStream(body)) { return Stream.pipeline(body, res, (err) => {}) }
if (body instanceof Blob) { return Stream.pipeline(body.stream(), res, (err) => { if (err) debug('Stream.pipeline error:', err) }) }
if (body instanceof ReadableStream) { return Stream.pipeline(body, res, (err) => { if (err) debug('Stream.pipeline error:', err) }) }
if (body instanceof Response) { return Stream.pipeline(body?.body || '', res, (err) => { if (err) debug('Stream.pipeline error:', err) }) }
if (isStream(body)) { return Stream.pipeline(body, res, (err) => { if (err) debug('Stream.pipeline error:', err) }) }

Copilot uses AI. Check for mistakes.
if (isStream(body)) return body.pipe(res)
if (body instanceof Blob) { return Stream.pipeline(body.stream(), res, (err) => {}) }
if (body instanceof ReadableStream) { return Stream.pipeline(body, res, (err) => {}) }
if (body instanceof Response) { return Stream.pipeline(body?.body || '', res, (err) => {}) }
Copy link

Copilot AI Oct 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using an empty string as fallback for body?.body may cause issues when passed to Stream.pipeline, which expects a stream-like object. Consider using a proper stream or handling the null case differently.

Suggested change
if (body instanceof Response) { return Stream.pipeline(body?.body || '', res, (err) => {}) }
if (body instanceof Response) {
if (body.body) {
return Stream.pipeline(body.body, res, (err) => {})
} else {
return res.end()
}
}

Copilot uses AI. Check for mistakes.
@fengmk2
Copy link
Member

fengmk2 commented Oct 22, 2025

Duplicate to #1893

pull bot pushed a commit to fishman110/koa that referenced this pull request Oct 22, 2025
## Description

- closes koajs#1834 
- closes koajs#1882 
- closes koajs#1889

## Checklist

- [x] I have ensured my pull request is not behind the main or master
branch of the original repository.
- [x] I have rebased all commits where necessary so that reviewing this
pull request can be done without having to merge it first.
- [x] I have written a commit message that passes commitlint linting.
- [x] I have ensured that my code changes pass linting tests.
- [x] I have ensured that my code changes pass unit tests.
- [x] I have described my pull request and the reasons for code changes
along with context if necessary.

## Checklist for 1834 (before re-pinging team)

- [x] Update stackblitz - [stackblitz demostrating error and
fix](https://stackblitz.com/edit/koa-patch-starter-pwupm4kr?file=README.md,index.js,patch%2Fkoa%2Flib%2Fapplication.js,vendor%2Fkoa%2Flib%2Fapplication.js)
- [x] Address comments
- [x] Re-review after merging 2 issues together
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants