|
20 | 20 |
|
21 | 21 | import arrify = require('arrify'); |
22 | 22 | import * as extend from 'extend'; |
23 | | -import {split} from 'split-array-stream'; |
24 | | -import {Transform, TransformOptions} from 'stream'; |
25 | | -import * as streamEvents from 'stream-events'; |
26 | | - |
27 | | -export interface CreateLimiterOptions { |
28 | | - /** |
29 | | - * The maximum number of API calls to make. |
30 | | - */ |
31 | | - maxApiCalls?: number; |
32 | | - |
33 | | - /** |
34 | | - * Options to pass to the Stream constructor. |
35 | | - */ |
36 | | - streamOptions?: TransformOptions; |
37 | | -} |
38 | | - |
39 | | -export interface Limiter { |
40 | | - // tslint:disable-next-line no-any |
41 | | - makeRequest(...args: any[]): Transform | undefined; |
42 | | - stream: Transform; |
43 | | -} |
44 | | - |
45 | | -export type ResourceStream<T> = { |
46 | | - addListener(event: 'data', listener: (data: T) => void): ResourceStream<T>; |
47 | | - emit(event: 'data', data: T): boolean; |
48 | | - on(event: 'data', listener: (data: T) => void): ResourceStream<T>; |
49 | | - once(event: 'data', listener: (data: T) => void): ResourceStream<T>; |
50 | | - prependListener( |
51 | | - event: 'data', |
52 | | - listener: (data: T) => void |
53 | | - ): ResourceStream<T>; |
54 | | - prependOnceListener( |
55 | | - event: 'data', |
56 | | - listener: (data: T) => void |
57 | | - ): ResourceStream<T>; |
58 | | - removeListener(event: 'data', listener: (data: T) => void): ResourceStream<T>; |
59 | | -} & Transform; |
60 | | - |
61 | | -/** |
62 | | - * Limit requests according to a `maxApiCalls` limit. |
63 | | - * |
64 | | - * @param {function} makeRequestFn - The function that will be called. |
65 | | - * @param {object=} options - Configuration object. |
66 | | - * @param {number} options.maxApiCalls - The maximum number of API calls to make. |
67 | | - * @param {object} options.streamOptions - Options to pass to the Stream constructor. |
68 | | - */ |
69 | | -export function createLimiter( |
70 | | - makeRequestFn: Function, |
71 | | - options?: CreateLimiterOptions |
72 | | -): Limiter { |
73 | | - options = options || {}; |
74 | | - |
75 | | - const streamOptions = options.streamOptions || {}; |
76 | | - streamOptions.objectMode = true; |
77 | | - const stream = streamEvents(new Transform(streamOptions)) as Transform; |
78 | | - |
79 | | - let requestsMade = 0; |
80 | | - let requestsToMake = -1; |
81 | | - |
82 | | - if (typeof options.maxApiCalls === 'number') { |
83 | | - requestsToMake = options.maxApiCalls!; |
84 | | - } |
85 | | - |
86 | | - return { |
87 | | - // tslint:disable-next-line:no-any |
88 | | - makeRequest(...args: any[]) { |
89 | | - requestsMade++; |
90 | | - if (requestsToMake >= 0 && requestsMade > requestsToMake) { |
91 | | - stream.push(null); |
92 | | - return; |
93 | | - } |
94 | | - makeRequestFn.apply(null, args); |
95 | | - return stream; |
96 | | - }, |
97 | | - stream, |
98 | | - }; |
99 | | -} |
| 23 | +import {TransformOptions} from 'stream'; |
| 24 | +import {ResourceStream} from './resource-stream'; |
100 | 25 |
|
101 | 26 | export interface ParsedArguments extends TransformOptions { |
102 | 27 | /** |
@@ -196,7 +121,10 @@ export class Paginator { |
196 | 121 | ): ResourceStream<T> { |
197 | 122 | const parsedArguments = paginator.parseArguments_(args); |
198 | 123 | const originalMethod = this[methodName + '_'] || this[methodName]; |
199 | | - return paginator.runAsStream_(parsedArguments, originalMethod.bind(this)); |
| 124 | + return paginator.runAsStream_<T>( |
| 125 | + parsedArguments, |
| 126 | + originalMethod.bind(this) |
| 127 | + ); |
200 | 128 | }; |
201 | 129 | } |
202 | 130 |
|
@@ -327,52 +255,16 @@ export class Paginator { |
327 | 255 | * and returns `nextQuery` to receive more results. |
328 | 256 | * @return {stream} - Readable object stream. |
329 | 257 | */ |
330 | | - runAsStream_(parsedArguments: ParsedArguments, originalMethod: Function) { |
331 | | - const query = parsedArguments.query; |
332 | | - let resultsToSend = parsedArguments.maxResults!; |
333 | | - |
334 | | - const limiter = exports.createLimiter(makeRequest, { |
335 | | - maxApiCalls: parsedArguments.maxApiCalls, |
336 | | - streamOptions: parsedArguments.streamOptions, |
337 | | - }); |
338 | | - |
339 | | - const stream = limiter.stream; |
340 | | - |
341 | | - stream.once('reading', () => { |
342 | | - limiter.makeRequest(query); |
343 | | - }); |
344 | | - |
345 | | - function makeRequest(query?: ParsedArguments | string) { |
346 | | - originalMethod(query, onResultSet); |
347 | | - } |
348 | | - |
349 | | - // tslint:disable-next-line:no-any |
350 | | - function onResultSet(err: Error | null, results?: any[], nextQuery?: any) { |
351 | | - if (err) { |
352 | | - stream.destroy(err); |
353 | | - return; |
354 | | - } |
355 | | - |
356 | | - if (resultsToSend >= 0 && results!.length > resultsToSend) { |
357 | | - results = results!.splice(0, resultsToSend); |
358 | | - } |
359 | | - |
360 | | - resultsToSend -= results!.length; |
361 | | - |
362 | | - split(results!, stream).then(streamEnded => { |
363 | | - if (streamEnded) { |
364 | | - return; |
365 | | - } |
366 | | - if (nextQuery && resultsToSend !== 0) { |
367 | | - limiter.makeRequest(nextQuery); |
368 | | - return; |
369 | | - } |
370 | | - stream.push(null); |
371 | | - }); |
372 | | - } |
373 | | - return limiter.stream; |
| 258 | + // tslint:disable-next-line:no-any |
| 259 | + runAsStream_<T = any>( |
| 260 | + parsedArguments: ParsedArguments, |
| 261 | + originalMethod: Function |
| 262 | + ): ResourceStream<T> { |
| 263 | + return new ResourceStream<T>(parsedArguments, originalMethod); |
374 | 264 | } |
375 | 265 | } |
376 | 266 |
|
377 | 267 | const paginator = new Paginator(); |
378 | 268 | export {paginator}; |
| 269 | + |
| 270 | +export {ResourceStream}; |
0 commit comments