Skip to content

Commit 641d82d

Browse files
authored
refactor!: rewrite streaming logic (#136)
1 parent 65ff625 commit 641d82d

File tree

5 files changed

+446
-507
lines changed

5 files changed

+446
-507
lines changed

package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@
5252
},
5353
"dependencies": {
5454
"arrify": "^2.0.0",
55-
"extend": "^3.0.1",
56-
"split-array-stream": "^2.0.0",
57-
"stream-events": "^1.0.4"
55+
"extend": "^3.0.1"
5856
},
5957
"engines": {
6058
"node": ">=8.10.0"

src/index.ts

Lines changed: 14 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -20,83 +20,8 @@
2020

2121
import arrify = require('arrify');
2222
import * as extend from 'extend';
23-
import {split} from 'split-array-stream';
24-
import {Transform, TransformOptions} from 'stream';
25-
import * as streamEvents from 'stream-events';
26-
27-
export interface CreateLimiterOptions {
28-
/**
29-
* The maximum number of API calls to make.
30-
*/
31-
maxApiCalls?: number;
32-
33-
/**
34-
* Options to pass to the Stream constructor.
35-
*/
36-
streamOptions?: TransformOptions;
37-
}
38-
39-
export interface Limiter {
40-
// tslint:disable-next-line no-any
41-
makeRequest(...args: any[]): Transform | undefined;
42-
stream: Transform;
43-
}
44-
45-
export type ResourceStream<T> = {
46-
addListener(event: 'data', listener: (data: T) => void): ResourceStream<T>;
47-
emit(event: 'data', data: T): boolean;
48-
on(event: 'data', listener: (data: T) => void): ResourceStream<T>;
49-
once(event: 'data', listener: (data: T) => void): ResourceStream<T>;
50-
prependListener(
51-
event: 'data',
52-
listener: (data: T) => void
53-
): ResourceStream<T>;
54-
prependOnceListener(
55-
event: 'data',
56-
listener: (data: T) => void
57-
): ResourceStream<T>;
58-
removeListener(event: 'data', listener: (data: T) => void): ResourceStream<T>;
59-
} & Transform;
60-
61-
/**
62-
* Limit requests according to a `maxApiCalls` limit.
63-
*
64-
* @param {function} makeRequestFn - The function that will be called.
65-
* @param {object=} options - Configuration object.
66-
* @param {number} options.maxApiCalls - The maximum number of API calls to make.
67-
* @param {object} options.streamOptions - Options to pass to the Stream constructor.
68-
*/
69-
export function createLimiter(
70-
makeRequestFn: Function,
71-
options?: CreateLimiterOptions
72-
): Limiter {
73-
options = options || {};
74-
75-
const streamOptions = options.streamOptions || {};
76-
streamOptions.objectMode = true;
77-
const stream = streamEvents(new Transform(streamOptions)) as Transform;
78-
79-
let requestsMade = 0;
80-
let requestsToMake = -1;
81-
82-
if (typeof options.maxApiCalls === 'number') {
83-
requestsToMake = options.maxApiCalls!;
84-
}
85-
86-
return {
87-
// tslint:disable-next-line:no-any
88-
makeRequest(...args: any[]) {
89-
requestsMade++;
90-
if (requestsToMake >= 0 && requestsMade > requestsToMake) {
91-
stream.push(null);
92-
return;
93-
}
94-
makeRequestFn.apply(null, args);
95-
return stream;
96-
},
97-
stream,
98-
};
99-
}
23+
import {TransformOptions} from 'stream';
24+
import {ResourceStream} from './resource-stream';
10025

10126
export interface ParsedArguments extends TransformOptions {
10227
/**
@@ -196,7 +121,10 @@ export class Paginator {
196121
): ResourceStream<T> {
197122
const parsedArguments = paginator.parseArguments_(args);
198123
const originalMethod = this[methodName + '_'] || this[methodName];
199-
return paginator.runAsStream_(parsedArguments, originalMethod.bind(this));
124+
return paginator.runAsStream_<T>(
125+
parsedArguments,
126+
originalMethod.bind(this)
127+
);
200128
};
201129
}
202130

@@ -327,52 +255,16 @@ export class Paginator {
327255
* and returns `nextQuery` to receive more results.
328256
* @return {stream} - Readable object stream.
329257
*/
330-
runAsStream_(parsedArguments: ParsedArguments, originalMethod: Function) {
331-
const query = parsedArguments.query;
332-
let resultsToSend = parsedArguments.maxResults!;
333-
334-
const limiter = exports.createLimiter(makeRequest, {
335-
maxApiCalls: parsedArguments.maxApiCalls,
336-
streamOptions: parsedArguments.streamOptions,
337-
});
338-
339-
const stream = limiter.stream;
340-
341-
stream.once('reading', () => {
342-
limiter.makeRequest(query);
343-
});
344-
345-
function makeRequest(query?: ParsedArguments | string) {
346-
originalMethod(query, onResultSet);
347-
}
348-
349-
// tslint:disable-next-line:no-any
350-
function onResultSet(err: Error | null, results?: any[], nextQuery?: any) {
351-
if (err) {
352-
stream.destroy(err);
353-
return;
354-
}
355-
356-
if (resultsToSend >= 0 && results!.length > resultsToSend) {
357-
results = results!.splice(0, resultsToSend);
358-
}
359-
360-
resultsToSend -= results!.length;
361-
362-
split(results!, stream).then(streamEnded => {
363-
if (streamEnded) {
364-
return;
365-
}
366-
if (nextQuery && resultsToSend !== 0) {
367-
limiter.makeRequest(nextQuery);
368-
return;
369-
}
370-
stream.push(null);
371-
});
372-
}
373-
return limiter.stream;
258+
// tslint:disable-next-line:no-any
259+
runAsStream_<T = any>(
260+
parsedArguments: ParsedArguments,
261+
originalMethod: Function
262+
): ResourceStream<T> {
263+
return new ResourceStream<T>(parsedArguments, originalMethod);
374264
}
375265
}
376266

377267
const paginator = new Paginator();
378268
export {paginator};
269+
270+
export {ResourceStream};

src/resource-stream.ts

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*!
2+
* Copyright 2019 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {Transform} from 'stream';
18+
import {ParsedArguments} from './';
19+
20+
interface ResourceEvents<T> {
21+
addListener(event: 'data', listener: (data: T) => void): this;
22+
emit(event: 'data', data: T): boolean;
23+
on(event: 'data', listener: (data: T) => void): this;
24+
once(event: 'data', listener: (data: T) => void): this;
25+
prependListener(event: 'data', listener: (data: T) => void): this;
26+
prependOnceListener(event: 'data', listener: (data: T) => void): this;
27+
removeListener(event: 'data', listener: (data: T) => void): this;
28+
}
29+
30+
export class ResourceStream<T> extends Transform implements ResourceEvents<T> {
31+
_ended: boolean;
32+
_maxApiCalls: number;
33+
_nextQuery: {} | null;
34+
_reading: boolean;
35+
_requestFn: Function;
36+
_requestsMade: number;
37+
_resultsToSend: number;
38+
constructor(args: ParsedArguments, requestFn: Function) {
39+
const options = Object.assign({objectMode: true}, args.streamOptions);
40+
super(options);
41+
42+
this._ended = false;
43+
this._maxApiCalls = args.maxApiCalls === -1 ? Infinity : args.maxApiCalls!;
44+
this._nextQuery = args.query!;
45+
this._reading = false;
46+
this._requestFn = requestFn;
47+
this._requestsMade = 0;
48+
this._resultsToSend = args.maxResults === -1 ? Infinity : args.maxResults!;
49+
}
50+
// tslint:disable-next-line:no-any
51+
end(...args: any[]) {
52+
this._ended = true;
53+
return super.end(...args);
54+
}
55+
_read() {
56+
if (this._reading) {
57+
return;
58+
}
59+
60+
this._reading = true;
61+
62+
this._requestFn(
63+
this._nextQuery,
64+
(err: Error | null, results: T[], nextQuery: {} | null) => {
65+
if (err) {
66+
this.destroy(err);
67+
return;
68+
}
69+
70+
this._nextQuery = nextQuery;
71+
72+
if (this._resultsToSend !== Infinity) {
73+
results = results.splice(0, this._resultsToSend);
74+
this._resultsToSend -= results.length;
75+
}
76+
77+
let more = true;
78+
79+
for (const result of results) {
80+
if (this._ended) {
81+
break;
82+
}
83+
more = this.push(result);
84+
}
85+
86+
const isFinished = !this._nextQuery || this._resultsToSend < 1;
87+
const madeMaxCalls = ++this._requestsMade >= this._maxApiCalls;
88+
89+
if (isFinished || madeMaxCalls) {
90+
this.end();
91+
}
92+
93+
if (more && !this._ended) {
94+
setImmediate(() => this._read());
95+
}
96+
97+
this._reading = false;
98+
}
99+
);
100+
}
101+
}

0 commit comments

Comments
 (0)