From e5e38c67e922b6b72c7c8762c93f513c36bdb0be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 11:14:50 +0100 Subject: [PATCH 01/11] Test clone timeout --- test/server.js | 12 ++++++++++++ test/test.js | 13 +++++++++++++ 2 files changed, 25 insertions(+) diff --git a/test/server.js b/test/server.js index 4028f0cc4..3fce0e1b5 100644 --- a/test/server.js +++ b/test/server.js @@ -2,6 +2,7 @@ import * as http from 'http'; import { parse } from 'url'; import * as zlib from 'zlib'; import * as stream from 'stream'; +import crypto from 'crypto'; import { multipart as Multipart } from 'parted'; let convert; @@ -154,6 +155,17 @@ export default class TestServer { res.end('cookie'); } + if (p === '/too-big-second-chunk') { + // Observed behavior of TCP packets splitting: + // - response body size <= 65438 → single packet sent + // - response body size > 65438 → multiple packets sent + // Max TCP packet size is 64kB (https://stackoverflow.com/a/2614188/5763764), + // but first packet probably transfers more than the response body. + const firstPacketMaxSize = 65438 + const secondPacketSize = 16 * 1024 // = defaultHighWaterMark + res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); + } + if (p === '/size/chunk') { res.statusCode = 200; res.setHeader('Content-Type', 'text/plain'); diff --git a/test/test.js b/test/test.js index 78301a4b7..794a0888d 100644 --- a/test/test.js +++ b/test/test.js @@ -1698,6 +1698,19 @@ describe('node-fetch', () => { ); }); + it('should timeout on cloning response without consuming one of the streams when the second packet is equal highWaterMark', function () { + const url = `${base}too-big-second-chunk`; + return new Promise((resolve, reject) => { + const timer = setTimeout(() => resolve(), 200) + fetch(url) + .then(res => res.clone().buffer()) + .then(chunk => { + clearTimeout(timer) + reject(new Error('Response should not have been resolved.')) + }); + }) + }); + it('should allow get all responses of a header', function() { const url = `${base}cookie`; return fetch(url).then(res => { From 4705019586fa0ced77bf37555f810efc6da99597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 13:18:06 +0100 Subject: [PATCH 02/11] Mock response instead test-specific URL --- test/server.js | 25 +++++++++++++------------ test/test.js | 17 ++++++++++++++--- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/test/server.js b/test/server.js index 3fce0e1b5..82b8c6a4e 100644 --- a/test/server.js +++ b/test/server.js @@ -2,7 +2,6 @@ import * as http from 'http'; import { parse } from 'url'; import * as zlib from 'zlib'; import * as stream from 'stream'; -import crypto from 'crypto'; import { multipart as Multipart } from 'parted'; let convert; @@ -32,9 +31,22 @@ export default class TestServer { this.server.close(cb); } + set mockResponse(responseHandler) { + this.server.nextResponseHandler = responseHandler; + } + router(req, res) { let p = parse(req.url).pathname; + if (p === '/mocked') { + if (this.nextResponseHandler) { + this.nextResponseHandler(res); + this.nextResponseHandler = undefined; + } else { + throw new Error('No mocked response. Set response handler to \'TestServer.mockResponse\'.'); + } + } + if (p === '/hello') { res.statusCode = 200; res.setHeader('Content-Type', 'text/plain'); @@ -155,17 +167,6 @@ export default class TestServer { res.end('cookie'); } - if (p === '/too-big-second-chunk') { - // Observed behavior of TCP packets splitting: - // - response body size <= 65438 → single packet sent - // - response body size > 65438 → multiple packets sent - // Max TCP packet size is 64kB (https://stackoverflow.com/a/2614188/5763764), - // but first packet probably transfers more than the response body. - const firstPacketMaxSize = 65438 - const secondPacketSize = 16 * 1024 // = defaultHighWaterMark - res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); - } - if (p === '/size/chunk') { res.statusCode = 200; res.setHeader('Content-Type', 'text/plain'); diff --git a/test/test.js b/test/test.js index 794a0888d..06ee8297b 100644 --- a/test/test.js +++ b/test/test.js @@ -12,6 +12,7 @@ import URLSearchParams_Polyfill from 'url-search-params'; import { URL } from 'whatwg-url'; import { AbortController } from 'abortcontroller-polyfill/dist/abortcontroller'; import AbortController2 from 'abort-controller'; +import crypto from 'crypto'; const { spawn } = require('child_process'); const http = require('http'); @@ -1698,11 +1699,21 @@ describe('node-fetch', () => { ); }); - it('should timeout on cloning response without consuming one of the streams when the second packet is equal highWaterMark', function () { - const url = `${base}too-big-second-chunk`; + it('should timeout on cloning response without consuming one of the streams when the second packet size is equal highWaterMark', function () { + this.timeout(500) + local.mockResponse = res => { + // Observed behavior of TCP packets splitting: + // - response body size <= 65438 → single packet sent + // - response body size > 65438 → multiple packets sent + // Max TCP packet size is 64kB (https://stackoverflow.com/a/2614188/5763764), + // but first packet probably transfers more than the response body. + const firstPacketMaxSize = 65438 + const secondPacketSize = 16 * 1024 // = defaultHighWaterMark + res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); + } return new Promise((resolve, reject) => { const timer = setTimeout(() => resolve(), 200) - fetch(url) + fetch(`${base}mocked`) .then(res => res.clone().buffer()) .then(chunk => { clearTimeout(timer) From e2dc5db9417ef860e7a542e4faffc081aec24961 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 13:26:32 +0100 Subject: [PATCH 03/11] Test resolved clone --- test/test.js | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/test.js b/test/test.js index 06ee8297b..4a8aa1515 100644 --- a/test/test.js +++ b/test/test.js @@ -1722,6 +1722,24 @@ describe('node-fetch', () => { }) }); + it('should resolve on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { + this.timeout(500) + local.mockResponse = res => { + const firstPacketMaxSize = 65438 + const secondPacketSize = 16 * 1024 // = defaultHighWaterMark + res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize - 1)); + } + return new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error('Response should have been resolved.')), 200) + fetch(`${base}mocked`) + .then(res => res.clone().buffer()) + .then(chunk => { + clearTimeout(timer) + resolve() + }); + }) + }); + it('should allow get all responses of a header', function() { const url = `${base}cookie`; return fetch(url).then(res => { From 0a2b1bdc24eda043d55ebaecd747fa279cb37f5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 15:38:03 +0100 Subject: [PATCH 04/11] Create chai helper for testing promise timeout --- test/test.js | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/test/test.js b/test/test.js index 4a8aa1515..7ef741b81 100644 --- a/test/test.js +++ b/test/test.js @@ -34,6 +34,23 @@ try { convert = require('encoding').convert; } catch(e) { } chai.use(chaiPromised); chai.use(chaiIterator); chai.use(chaiString); +chai.use((_, utils) => { + utils.addProperty(chai.Assertion.prototype, 'timeout', function () { + return new Promise(resolve => { + const timer = setTimeout(() => resolve(true), 200); + this._obj.then(() => { + clearTimeout(timer); + resolve(false); + }); + }).then(timeouted => { + this.assert( + timeouted, + 'expected promise to timeout but it was resolved', + 'expected promise not to timeout but it timed out' + ); + }) + }); +}); const expect = chai.expect; import TestServer from './server'; @@ -1700,44 +1717,34 @@ describe('node-fetch', () => { }); it('should timeout on cloning response without consuming one of the streams when the second packet size is equal highWaterMark', function () { - this.timeout(500) + this.timeout(500); local.mockResponse = res => { // Observed behavior of TCP packets splitting: // - response body size <= 65438 → single packet sent // - response body size > 65438 → multiple packets sent // Max TCP packet size is 64kB (https://stackoverflow.com/a/2614188/5763764), // but first packet probably transfers more than the response body. - const firstPacketMaxSize = 65438 - const secondPacketSize = 16 * 1024 // = defaultHighWaterMark + const firstPacketMaxSize = 65438; + const secondPacketSize = 16 * 1024; // = defaultHighWaterMark res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); } - return new Promise((resolve, reject) => { - const timer = setTimeout(() => resolve(), 200) + return expect( fetch(`${base}mocked`) .then(res => res.clone().buffer()) - .then(chunk => { - clearTimeout(timer) - reject(new Error('Response should not have been resolved.')) - }); - }) + ).to.timeout; }); - it('should resolve on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { - this.timeout(500) + it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { + this.timeout(500); local.mockResponse = res => { - const firstPacketMaxSize = 65438 - const secondPacketSize = 16 * 1024 // = defaultHighWaterMark + const firstPacketMaxSize = 65438; + const secondPacketSize = 16 * 1024; // = defaultHighWaterMark res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize - 1)); } - return new Promise((resolve, reject) => { - const timer = setTimeout(() => reject(new Error('Response should have been resolved.')), 200) + return expect( fetch(`${base}mocked`) .then(res => res.clone().buffer()) - .then(chunk => { - clearTimeout(timer) - resolve() - }); - }) + ).not.to.timeout; }); it('should allow get all responses of a header', function() { From a5a79333c6f1722be9953594a416849481555802 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 15:40:31 +0100 Subject: [PATCH 05/11] Move chai timeout helper to separate file --- test/chai-timeout.js | 17 +++++++++++++++++ test/test.js | 20 +++----------------- 2 files changed, 20 insertions(+), 17 deletions(-) create mode 100644 test/chai-timeout.js diff --git a/test/chai-timeout.js b/test/chai-timeout.js new file mode 100644 index 000000000..558ef314a --- /dev/null +++ b/test/chai-timeout.js @@ -0,0 +1,17 @@ +module.exports = (chai, utils) => { + utils.addProperty(chai.Assertion.prototype, 'timeout', function () { + return new Promise(resolve => { + const timer = setTimeout(() => resolve(true), 200); + this._obj.then(() => { + clearTimeout(timer); + resolve(false); + }); + }).then(timeouted => { + this.assert( + timeouted, + 'expected promise to timeout but it was resolved', + 'expected promise not to timeout but it timed out' + ); + }) + }); +}; diff --git a/test/test.js b/test/test.js index 7ef741b81..8977da992 100644 --- a/test/test.js +++ b/test/test.js @@ -31,26 +31,12 @@ const { let convert; try { convert = require('encoding').convert; } catch(e) { } +import chaiTimeout from './chai-timeout'; + chai.use(chaiPromised); chai.use(chaiIterator); chai.use(chaiString); -chai.use((_, utils) => { - utils.addProperty(chai.Assertion.prototype, 'timeout', function () { - return new Promise(resolve => { - const timer = setTimeout(() => resolve(true), 200); - this._obj.then(() => { - clearTimeout(timer); - resolve(false); - }); - }).then(timeouted => { - this.assert( - timeouted, - 'expected promise to timeout but it was resolved', - 'expected promise not to timeout but it timed out' - ); - }) - }); -}); +chai.use(chaiTimeout); const expect = chai.expect; import TestServer from './server'; From cbfacaea79bef1c355ff24b24fdbb451d7384094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 15:42:06 +0100 Subject: [PATCH 06/11] Lower timeout value to speed up test --- test/chai-timeout.js | 2 +- test/test.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/chai-timeout.js b/test/chai-timeout.js index 558ef314a..29a4eca7c 100644 --- a/test/chai-timeout.js +++ b/test/chai-timeout.js @@ -1,7 +1,7 @@ module.exports = (chai, utils) => { utils.addProperty(chai.Assertion.prototype, 'timeout', function () { return new Promise(resolve => { - const timer = setTimeout(() => resolve(true), 200); + const timer = setTimeout(() => resolve(true), 50); this._obj.then(() => { clearTimeout(timer); resolve(false); diff --git a/test/test.js b/test/test.js index 8977da992..7fb10e33a 100644 --- a/test/test.js +++ b/test/test.js @@ -1703,7 +1703,7 @@ describe('node-fetch', () => { }); it('should timeout on cloning response without consuming one of the streams when the second packet size is equal highWaterMark', function () { - this.timeout(500); + this.timeout(200); local.mockResponse = res => { // Observed behavior of TCP packets splitting: // - response body size <= 65438 → single packet sent @@ -1721,7 +1721,7 @@ describe('node-fetch', () => { }); it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { - this.timeout(500); + this.timeout(200); local.mockResponse = res => { const firstPacketMaxSize = 65438; const secondPacketSize = 16 * 1024; // = defaultHighWaterMark From e03982bc98e13a800c6d9e482082f1eaa006183b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 15:50:26 +0100 Subject: [PATCH 07/11] Return mock response URL from mockResponse --- test/server.js | 5 +++-- test/test.js | 14 ++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/test/server.js b/test/server.js index 82b8c6a4e..7a0efcc5c 100644 --- a/test/server.js +++ b/test/server.js @@ -31,8 +31,9 @@ export default class TestServer { this.server.close(cb); } - set mockResponse(responseHandler) { + mockResponse(responseHandler) { this.server.nextResponseHandler = responseHandler; + return `http://${this.hostname}:${this.port}/mocked` } router(req, res) { @@ -43,7 +44,7 @@ export default class TestServer { this.nextResponseHandler(res); this.nextResponseHandler = undefined; } else { - throw new Error('No mocked response. Set response handler to \'TestServer.mockResponse\'.'); + throw new Error('No mocked response. Use \'TestServer.mockResponse()\'.'); } } diff --git a/test/test.js b/test/test.js index 7fb10e33a..c48dc78b9 100644 --- a/test/test.js +++ b/test/test.js @@ -1704,7 +1704,7 @@ describe('node-fetch', () => { it('should timeout on cloning response without consuming one of the streams when the second packet size is equal highWaterMark', function () { this.timeout(200); - local.mockResponse = res => { + const url = local.mockResponse(res => { // Observed behavior of TCP packets splitting: // - response body size <= 65438 → single packet sent // - response body size > 65438 → multiple packets sent @@ -1713,23 +1713,21 @@ describe('node-fetch', () => { const firstPacketMaxSize = 65438; const secondPacketSize = 16 * 1024; // = defaultHighWaterMark res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); - } + }); return expect( - fetch(`${base}mocked`) - .then(res => res.clone().buffer()) + fetch(url).then(res => res.clone().buffer()) ).to.timeout; }); it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { this.timeout(200); - local.mockResponse = res => { + const url = local.mockResponse(res => { const firstPacketMaxSize = 65438; const secondPacketSize = 16 * 1024; // = defaultHighWaterMark res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize - 1)); - } + }); return expect( - fetch(`${base}mocked`) - .then(res => res.clone().buffer()) + fetch(url).then(res => res.clone().buffer()) ).not.to.timeout; }); From 4f10a63b5a9f582a3e7e2ffc234c47ddff5ff1d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 17:49:02 +0100 Subject: [PATCH 08/11] Add highWaterMark param to clone method --- src/body.js | 9 +++++---- src/response.js | 5 +++-- test/test.js | 38 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/body.js b/src/body.js index 90cbcabfa..17742d132 100644 --- a/src/body.js +++ b/src/body.js @@ -358,10 +358,11 @@ function isURLSearchParams(obj) { /** * Clone body given Res/Req instance * - * @param Mixed instance Response or Request instance + * @param Mixed instance Response or Request instance + * @param String highWaterMark highWaterMark for both PassThrough body streams * @return Mixed */ -export function clone(instance) { +export function clone(instance, highWaterMark) { let p1, p2; let body = instance.body; @@ -374,8 +375,8 @@ export function clone(instance) { // note: we can't clone the form-data object without having it as a dependency if ((body instanceof Stream) && (typeof body.getBoundary !== 'function')) { // tee instance body - p1 = new PassThrough(); - p2 = new PassThrough(); + p1 = new PassThrough({ highWaterMark }); + p2 = new PassThrough({ highWaterMark }); body.pipe(p1); body.pipe(p2); // set instance body to teed body and return the other teed body diff --git a/src/response.js b/src/response.js index f29bfe296..03789908f 100644 --- a/src/response.js +++ b/src/response.js @@ -70,10 +70,11 @@ export default class Response { /** * Clone this response * + * @param String highWaterMark highWaterMark for both PassThrough body streams * @return Response */ - clone() { - return new Response(clone(this), { + clone(highWaterMark) { + return new Response(clone(this, highWaterMark), { url: this.url, status: this.status, statusText: this.statusText, diff --git a/test/test.js b/test/test.js index c48dc78b9..b64140d98 100644 --- a/test/test.js +++ b/test/test.js @@ -1702,7 +1702,7 @@ describe('node-fetch', () => { ); }); - it('should timeout on cloning response without consuming one of the streams when the second packet size is equal highWaterMark', function () { + it('should timeout on cloning response without consuming one of the streams when the second packet size is equal default highWaterMark', function () { this.timeout(200); const url = local.mockResponse(res => { // Observed behavior of TCP packets splitting: @@ -1719,7 +1719,19 @@ describe('node-fetch', () => { ).to.timeout; }); - it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than highWaterMark', function () { + it('should timeout on cloning response without consuming one of the streams when the second packet size is equal custom highWaterMark', function () { + this.timeout(200); + const url = local.mockResponse(res => { + const firstPacketMaxSize = 65438; + const secondPacketSize = 10; + res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize)); + }); + return expect( + fetch(url).then(res => res.clone(10).buffer()) + ).to.timeout; + }); + + it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than default highWaterMark', function () { this.timeout(200); const url = local.mockResponse(res => { const firstPacketMaxSize = 65438; @@ -1731,6 +1743,28 @@ describe('node-fetch', () => { ).not.to.timeout; }); + it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than custom highWaterMark', function () { + this.timeout(200); + const url = local.mockResponse(res => { + const firstPacketMaxSize = 65438; + const secondPacketSize = 10; + res.end(crypto.randomBytes(firstPacketMaxSize + secondPacketSize - 1)); + }); + return expect( + fetch(url).then(res => res.clone(10).buffer()) + ).not.to.timeout; + }); + + it('should not timeout on cloning response without consuming one of the streams when the response size is smaller than custom large highWaterMark', function () { + this.timeout(200); + const url = local.mockResponse(res => { + res.end(crypto.randomBytes(1024 * 1024 - 1)); + }); + return expect( + fetch(url).then(res => res.clone(1024 * 1024).buffer()) + ).not.to.timeout; + }); + it('should allow get all responses of a header', function() { const url = `${base}cookie`; return fetch(url).then(res => { From c505c9a40eeecc65eeb553799b1dd0cb228322ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sat, 5 Jan 2019 18:04:59 +0100 Subject: [PATCH 09/11] Update information about clone parameter --- LIMITS.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/LIMITS.md b/LIMITS.md index 9c4b8c0c8..05e5bfd9f 100644 --- a/LIMITS.md +++ b/LIMITS.md @@ -12,7 +12,7 @@ Known differences - `res.url` contains the final url when following redirects. -- For convenience, `res.body` is a Node.js [Readable stream][readable-stream], so decoding can be handled independently. +- For convenience, `res.body` is a Node.js [Readable stream][], so decoding can be handled independently. - Similarly, `req.body` can either be `null`, a string, a buffer or a Readable stream. @@ -24,9 +24,11 @@ Known differences - Current implementation lacks server-side cookie store, you will need to extract `Set-Cookie` headers manually. -- If you are using `res.clone()` and writing an isomorphic app, note that stream on Node.js have a smaller internal buffer size (16Kb, aka `highWaterMark`) from client-side browsers (>1Mb, not consistent across browsers). +- If you are using `res.clone()` and writing an isomorphic app, note that stream on Node.js has a smaller default internal buffer size (16kB, aka [`highWaterMark`][]) from client-side browsers (>1MB, not consistent across browsers). You can override the default value by passing a custom `highWaterMark` value to `clone` method. This parameter is taken into account only by `node-fetch`. -- Because node.js stream doesn't expose a [*disturbed*](https://fetch.spec.whatwg.org/#concept-readablestream-disturbed) property like Stream spec, using a consumed stream for `new Response(body)` will not set `bodyUsed` flag correctly. +- Because node.js stream doesn't expose a [*disturbed*][] property like Stream spec, using a consumed stream for `new Response(body)` will not set `bodyUsed` flag correctly. -[readable-stream]: https://nodejs.org/api/stream.html#stream_readable_streams +[Readable stream]: https://nodejs.org/api/stream.html#stream_readable_streams [ERROR-HANDLING.md]: https://github.com/bitinn/node-fetch/blob/master/ERROR-HANDLING.md +[`highWaterMark`]: https://nodejs.org/api/stream.html#stream_buffering +[*disturbed*]: https://fetch.spec.whatwg.org/#concept-readablestream-disturbed From f923834f63918657c155cf8e5b55297db3e5d621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sun, 6 Jan 2019 18:27:03 +0100 Subject: [PATCH 10/11] Add more extensive highWaterMark documentation --- CLONE-HIGHWATERMARK.md | 109 +++++++++++++++++++++++++++++++++++++++++ LIMITS.md | 3 +- 2 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 CLONE-HIGHWATERMARK.md diff --git a/CLONE-HIGHWATERMARK.md b/CLONE-HIGHWATERMARK.md new file mode 100644 index 000000000..d3153390a --- /dev/null +++ b/CLONE-HIGHWATERMARK.md @@ -0,0 +1,109 @@ +# Increasing clone highWaterMark + +## The Problem + +When using `res.clone` method it might happen that you want process either the original body or the cloned body first: + +```js +fetch(url) + .then(res => cache.put(url, res.clone())) + .then(res => res.json()) + ... +``` + +The original response waits for the cloned response to be completed. That means the whole response is buffered in a memory during the first `than` statement. With big response sizes that might lead to consuming too much of the precious server resources. + +To keep app allocated memory low, Node.js provides [`highWaterMark` limit of stream internal buffer][hwm]. It defaults to 16kB for all streams but can be overridden explicitly. + +The problem is that the code above freezes and times out for larger response sizes. Nobody consumes the original response stream leading to accumulation of data in it's buffers. When `highWaterMark` limits are hit a mechanism called [backpressure] kicks in. As a result, data will stop flowing, endlessly waiting for the original response to signal it can take more. + +1. At the beginning 6 packets are ready to be transmitted. + + ``` + Data Original + +-------------+ +-----------+ + | O O O O O O +-----+------>+ | X + +-------------+ | +-----------+ + | + | Cloned + | +-----------+ + +------>+ +----> + +-----------+ + ``` + +2. 2 chunks passed to both streams. + + ``` + Data Original + +-------------+ +-----------+ + | O O O O +-----+------>+ O O | X + +-------------+ | +-----------+ + | + | Cloned + | +-----------+ + +------>+ O O +----> + +-----------+ + ``` + +3. 5 chunks passed to both streams. The original one triggers backpressure. Source of data stops until notification that it can send more. + + ``` + Data Original + +-------------+ +-----------+ + | O +-----+------>+ O O O O O | X + +-------------+ | +-----------+ + | + | Cloned + | +-----------+ + +------>+ O O O O O +----> + +-----------+ + ``` + +4. Chunks in the cloned stream reaches their destination. But the flow stopped. The last chunk won't be transmitted. + + ``` + Data Original + +-------------+ +-----------+ + | O +-----+------>+ O O O O O | X + +-------------+ | +-----------+ + | + | Cloned + | +-----------+ + +------>+ +----> + +-----------+ + ``` + +There are few inaccuracies in diagrams above for the sake of simplification. + +[hwm]: https://nodejs.org/api/stream.html#stream_buffering +[backpressure]: https://nodejs.org/en/docs/guides/backpressuring-in-streams/ + +## The Solution + +Set bigger `highWaterMark` limit by passing a value to `clone` method: + +```js +res.clone(40 * 1024) +``` + +Use `expected_maximal_request_size / 2 + 1` as the value. + +Don't forget that the whole response still goes into memory. Calculate carefully not to deplete all your server memory with few requests. + + +### Why + +The cloned body is in fact a Node.js [*PassThrough*][passthrough] stream. *PassThrough* streams have two buffers with the same `highWaterMark`, one for [*Writable*][writable] stream on the input and on for [*Readable*][readable] stream on output. It can contain **double the value of `highWaterMark`**. + +When `highWateMark` of the *Writeable* stream is reached, the stream writing data stops. Increasing the value by **a single byte** is sufficient to avoid that. + +In fact, streams can take much more data. `highWaterMark` is [not a limit][] really. It is rather just a mark as the name suggests. Backpressure kicks in when data written [reaches **or overflows**][highwatermark-check] `highWaterMark` value. With two buffers of *PassThrough*, the **first chunk can have any size**. Well, almost any size. A TCP packet maximum size is [64kB], so this is the most common chunk size when dealing with large HTTP responses. + +But to avoid the backpressure, when chunks fill in the second buffer without any overflow by chance, we need to make sure the first buffer won't get to the `highWaterMark`. Hence the ½ + 1 limit. + +[passthrough]: https://nodejs.org/api/stream.html#stream_class_stream_passthrough +[writable]: https://nodejs.org/api/stream.html#stream_writable_streams +[readable]: https://nodejs.org/api/stream.html#stream_readable_streams +[not a limit]: https://stackoverflow.com/a/45905930/5763764 +[highwatermark-check]: https://github.com/nodejs/node/blob/master/lib/_stream_writable.js#L378 +[64kB]: https://stackoverflow.com/a/2614188/5763764 diff --git a/LIMITS.md b/LIMITS.md index 05e5bfd9f..9345f775e 100644 --- a/LIMITS.md +++ b/LIMITS.md @@ -24,11 +24,12 @@ Known differences - Current implementation lacks server-side cookie store, you will need to extract `Set-Cookie` headers manually. -- If you are using `res.clone()` and writing an isomorphic app, note that stream on Node.js has a smaller default internal buffer size (16kB, aka [`highWaterMark`][]) from client-side browsers (>1MB, not consistent across browsers). You can override the default value by passing a custom `highWaterMark` value to `clone` method. This parameter is taken into account only by `node-fetch`. +- If you are using `res.clone()` and writing an isomorphic app, note that stream on Node.js has a smaller default internal buffer size (16kB, aka [`highWaterMark`][]) from client-side browsers (>1MB, not consistent across browsers). You can override the default value by passing a custom `highWaterMark` value to `clone` method. This parameter is taken into account only by `node-fetch`. See [CLONE-HIGHWATERMARK.md][] for more details. - Because node.js stream doesn't expose a [*disturbed*][] property like Stream spec, using a consumed stream for `new Response(body)` will not set `bodyUsed` flag correctly. [Readable stream]: https://nodejs.org/api/stream.html#stream_readable_streams [ERROR-HANDLING.md]: https://github.com/bitinn/node-fetch/blob/master/ERROR-HANDLING.md [`highWaterMark`]: https://nodejs.org/api/stream.html#stream_buffering +[CLONE-HIGHWATERMARK.md]: https://github.com/bitinn/node-fetch/blob/master/CLONE-HIGHWATERMARK.md [*disturbed*]: https://fetch.spec.whatwg.org/#concept-readablestream-disturbed From a93ec33c8f72be67c7130519cd13ff58bd91ae13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Mat=C4=9Bj?= Date: Sun, 6 Jan 2019 20:52:23 +0100 Subject: [PATCH 11/11] Test described highWaterMark formula --- test/chai-timeout.js | 2 +- test/test.js | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/test/chai-timeout.js b/test/chai-timeout.js index 29a4eca7c..cd02bcd17 100644 --- a/test/chai-timeout.js +++ b/test/chai-timeout.js @@ -1,7 +1,7 @@ module.exports = (chai, utils) => { utils.addProperty(chai.Assertion.prototype, 'timeout', function () { return new Promise(resolve => { - const timer = setTimeout(() => resolve(true), 50); + const timer = setTimeout(() => resolve(true), 150); this._obj.then(() => { clearTimeout(timer); resolve(false); diff --git a/test/test.js b/test/test.js index b64140d98..96306bbdf 100644 --- a/test/test.js +++ b/test/test.js @@ -1703,7 +1703,7 @@ describe('node-fetch', () => { }); it('should timeout on cloning response without consuming one of the streams when the second packet size is equal default highWaterMark', function () { - this.timeout(200); + this.timeout(300); const url = local.mockResponse(res => { // Observed behavior of TCP packets splitting: // - response body size <= 65438 → single packet sent @@ -1720,7 +1720,7 @@ describe('node-fetch', () => { }); it('should timeout on cloning response without consuming one of the streams when the second packet size is equal custom highWaterMark', function () { - this.timeout(200); + this.timeout(300); const url = local.mockResponse(res => { const firstPacketMaxSize = 65438; const secondPacketSize = 10; @@ -1732,7 +1732,7 @@ describe('node-fetch', () => { }); it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than default highWaterMark', function () { - this.timeout(200); + this.timeout(300); const url = local.mockResponse(res => { const firstPacketMaxSize = 65438; const secondPacketSize = 16 * 1024; // = defaultHighWaterMark @@ -1744,7 +1744,7 @@ describe('node-fetch', () => { }); it('should not timeout on cloning response without consuming one of the streams when the second packet size is less than custom highWaterMark', function () { - this.timeout(200); + this.timeout(300); const url = local.mockResponse(res => { const firstPacketMaxSize = 65438; const secondPacketSize = 10; @@ -1755,13 +1755,13 @@ describe('node-fetch', () => { ).not.to.timeout; }); - it('should not timeout on cloning response without consuming one of the streams when the response size is smaller than custom large highWaterMark', function () { - this.timeout(200); + it('should not timeout on cloning response without consuming one of the streams when the response size is double the custom large highWaterMark - 1', function () { + this.timeout(300); const url = local.mockResponse(res => { - res.end(crypto.randomBytes(1024 * 1024 - 1)); + res.end(crypto.randomBytes(2 * 512 * 1024 - 1)); }); return expect( - fetch(url).then(res => res.clone(1024 * 1024).buffer()) + fetch(url).then(res => res.clone(512 * 1024).buffer()) ).not.to.timeout; });