Skip to content

Commit 468e1bf

Browse files
authored
feat(subscription): accept pull timeout option (#556)
1 parent bed7bfb commit 468e1bf

File tree

2 files changed

+69
-16
lines changed

2 files changed

+69
-16
lines changed

src/message-stream.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,20 @@ const RETRY_CODES: status[] = [
5252
15, // dataloss
5353
];
5454

55+
/*!
56+
* Deadline for the stream.
57+
*/
58+
const PULL_TIMEOUT = require('./v1/subscriber_client_config.json')
59+
.interfaces['google.pubsub.v1.Subscriber']
60+
.methods.StreamingPull.timeout_millis;
61+
5562
/*!
5663
* default stream options
5764
*/
5865
const DEFAULT_OPTIONS: MessageStreamOptions = {
5966
highWaterMark: 0,
6067
maxStreams: 5,
68+
pullTimeout: PULL_TIMEOUT,
6169
timeout: 300000,
6270
};
6371

@@ -115,11 +123,15 @@ export class ChannelError extends Error implements ServiceError {
115123
* {@link https://nodejs.org/en/docs/guides/backpressuring-in-streams/} for
116124
* more details.
117125
* @property {number} [maxStreams=5] Number of streaming connections to make.
126+
* @property {number} [pullTimeout=900000] Timeout to be applied to each
127+
* underlying stream. Essentially this just closes a `StreamingPull` request
128+
* after the specified time.
118129
* @property {number} [timeout=300000] Timeout for establishing a connection.
119130
*/
120131
export interface MessageStreamOptions {
121132
highWaterMark?: number;
122133
maxStreams?: number;
134+
pullTimeout?: number;
123135
timeout?: number;
124136
}
125137

@@ -222,13 +234,14 @@ export class MessageStream extends PassThrough {
222234
return;
223235
}
224236

237+
const deadline = Date.now() + this._options.pullTimeout!;
225238
const request: StreamingPullRequest = {
226239
subscription: this._subscriber.name,
227240
streamAckDeadlineSeconds: this._subscriber.ackDeadline,
228241
};
229242

230243
for (let i = this._streams.size; i < this._options.maxStreams!; i++) {
231-
const stream: PullStream = client.streamingPull();
244+
const stream: PullStream = client.streamingPull({deadline});
232245
this._addStream(stream);
233246
stream.write(request);
234247
}

test/message-stream.ts

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@ import * as uuid from 'uuid';
2323
import * as messageTypes from '../src/message-stream';
2424
import {Subscriber} from '../src/subscriber';
2525

26+
const FAKE_STREAMING_PULL_TIMEOUT = 123456789;
27+
const FAKE_CLIENT_CONFIG = {
28+
interfaces: {
29+
'google.pubsub.v1.Subscriber': {
30+
methods: {
31+
StreamingPull: {
32+
timeout_millis: FAKE_STREAMING_PULL_TIMEOUT,
33+
}
34+
}
35+
}
36+
}
37+
};
38+
2639
// just need this for unit tests.. we have a ponyfill for destroy on
2740
// MessageStream and gax streams use Duplexify
2841
function destroy(stream: Duplex, err?: Error): void {
@@ -43,6 +56,10 @@ interface StreamOptions {
4356
highWaterMark?: number;
4457
}
4558

59+
interface StreamingPullOptions {
60+
deadline: number;
61+
}
62+
4663
class FakePassThrough extends PassThrough {
4764
options: StreamOptions;
4865
constructor(options: StreamOptions) {
@@ -58,9 +75,11 @@ class FakePassThrough extends PassThrough {
5875
}
5976

6077
class FakeGrpcStream extends Duplex {
78+
options: StreamingPullOptions;
6179
_readableState!: StreamState;
62-
constructor() {
80+
constructor(options: StreamingPullOptions) {
6381
super({objectMode: true});
82+
this.options = options;
6483
}
6584
cancel(): void {
6685
const status = {
@@ -99,8 +118,8 @@ class FakeGaxClient {
99118
class FakeGrpcClient {
100119
deadline?: number;
101120
streams = ([] as FakeGrpcStream[]);
102-
streamingPull(): FakeGrpcStream {
103-
const stream = new FakeGrpcStream();
121+
streamingPull(options: StreamingPullOptions): FakeGrpcStream {
122+
const stream = new FakeGrpcStream(options);
104123
this.streams.push(stream);
105124
return stream;
106125
}
@@ -134,13 +153,19 @@ describe('MessageStream', () => {
134153
let MessageStream: typeof messageTypes.MessageStream;
135154
let messageStream: messageTypes.MessageStream;
136155

156+
let now: number;
157+
137158
before(() => {
138159
MessageStream = proxyquire('../src/message-stream.js', {
139-
'stream': {PassThrough: FakePassThrough}
160+
'stream': {PassThrough: FakePassThrough},
161+
'./v1/subscriber_client_config.json': FAKE_CLIENT_CONFIG,
140162
}).MessageStream;
141163
});
142164

143165
beforeEach(() => {
166+
now = Date.now();
167+
sandbox.stub(global.Date, 'now').returns(now);
168+
144169
const gaxClient = new FakeGaxClient();
145170
client = gaxClient.client; // we hit the grpc client directly
146171
subscriber = new FakeSubscriber(gaxClient) as {} as Subscriber;
@@ -191,18 +216,19 @@ describe('MessageStream', () => {
191216
assert.strictEqual(client.streams.length, 5);
192217
});
193218

194-
it('should default timeout to 5 minutes', done => {
195-
const timeout = 60000 * 5;
196-
const now = Date.now();
197-
198-
sandbox.stub(global.Date, 'now').returns(now);
199-
messageStream = new MessageStream(subscriber);
219+
it('should pull pullTimeouts default from config file', () => {
220+
const expectedDeadline = now + FAKE_STREAMING_PULL_TIMEOUT;
200221

201-
setImmediate(() => {
202-
assert.strictEqual(client.deadline, now + timeout);
203-
done();
222+
client.streams.forEach(stream => {
223+
const deadline = stream.options.deadline;
224+
assert.strictEqual(deadline, expectedDeadline);
204225
});
205226
});
227+
228+
it('should default timeout to 5 minutes', () => {
229+
const expectedTimeout = now + 60000 * 5;
230+
assert.strictEqual(client.deadline, expectedTimeout);
231+
});
206232
});
207233

208234
describe('user options', () => {
@@ -238,11 +264,25 @@ describe('MessageStream', () => {
238264
});
239265
});
240266

267+
it('should respect the pullTimeout option', done => {
268+
const pullTimeout = 1234;
269+
const expectedDeadline = now + pullTimeout;
270+
271+
messageStream = new MessageStream(subscriber, {pullTimeout});
272+
273+
setImmediate(() => {
274+
client.streams.forEach(stream => {
275+
const deadline = stream.options.deadline;
276+
assert.strictEqual(deadline, expectedDeadline);
277+
});
278+
done();
279+
});
280+
});
281+
241282
it('should respect the timeout option', done => {
242283
const timeout = 12345;
243-
const now = Date.now();
284+
const expectedDeadline = now + timeout;
244285

245-
sandbox.stub(global.Date, 'now').returns(now);
246286
messageStream = new MessageStream(subscriber, {timeout});
247287

248288
setImmediate(() => {

0 commit comments

Comments
 (0)