@@ -23,6 +23,19 @@ import * as uuid from 'uuid';
2323import * as messageTypes from '../src/message-stream' ;
2424import { Subscriber } from '../src/subscriber' ;
2525
26+ const FAKE_STREAMING_PULL_TIMEOUT = 123456789 ;
27+ const FAKE_CLIENT_CONFIG = {
28+ interfaces : {
29+ 'google.pubsub.v1.Subscriber' : {
30+ methods : {
31+ StreamingPull : {
32+ timeout_millis : FAKE_STREAMING_PULL_TIMEOUT ,
33+ }
34+ }
35+ }
36+ }
37+ } ;
38+
2639// just need this for unit tests.. we have a ponyfill for destroy on
2740// MessageStream and gax streams use Duplexify
2841function destroy ( stream : Duplex , err ?: Error ) : void {
@@ -43,6 +56,10 @@ interface StreamOptions {
4356 highWaterMark ?: number ;
4457}
4558
59+ interface StreamingPullOptions {
60+ deadline : number ;
61+ }
62+
4663class FakePassThrough extends PassThrough {
4764 options : StreamOptions ;
4865 constructor ( options : StreamOptions ) {
@@ -58,9 +75,11 @@ class FakePassThrough extends PassThrough {
5875}
5976
6077class FakeGrpcStream extends Duplex {
78+ options : StreamingPullOptions ;
6179 _readableState ! : StreamState ;
62- constructor ( ) {
80+ constructor ( options : StreamingPullOptions ) {
6381 super ( { objectMode : true } ) ;
82+ this . options = options ;
6483 }
6584 cancel ( ) : void {
6685 const status = {
@@ -99,8 +118,8 @@ class FakeGaxClient {
99118class FakeGrpcClient {
100119 deadline ?: number ;
101120 streams = ( [ ] as FakeGrpcStream [ ] ) ;
102- streamingPull ( ) : FakeGrpcStream {
103- const stream = new FakeGrpcStream ( ) ;
121+ streamingPull ( options : StreamingPullOptions ) : FakeGrpcStream {
122+ const stream = new FakeGrpcStream ( options ) ;
104123 this . streams . push ( stream ) ;
105124 return stream ;
106125 }
@@ -134,13 +153,19 @@ describe('MessageStream', () => {
134153 let MessageStream : typeof messageTypes . MessageStream ;
135154 let messageStream : messageTypes . MessageStream ;
136155
156+ let now : number ;
157+
137158 before ( ( ) => {
138159 MessageStream = proxyquire ( '../src/message-stream.js' , {
139- 'stream' : { PassThrough : FakePassThrough }
160+ 'stream' : { PassThrough : FakePassThrough } ,
161+ './v1/subscriber_client_config.json' : FAKE_CLIENT_CONFIG ,
140162 } ) . MessageStream ;
141163 } ) ;
142164
143165 beforeEach ( ( ) => {
166+ now = Date . now ( ) ;
167+ sandbox . stub ( global . Date , 'now' ) . returns ( now ) ;
168+
144169 const gaxClient = new FakeGaxClient ( ) ;
145170 client = gaxClient . client ; // we hit the grpc client directly
146171 subscriber = new FakeSubscriber ( gaxClient ) as { } as Subscriber ;
@@ -191,18 +216,19 @@ describe('MessageStream', () => {
191216 assert . strictEqual ( client . streams . length , 5 ) ;
192217 } ) ;
193218
194- it ( 'should default timeout to 5 minutes' , done => {
195- const timeout = 60000 * 5 ;
196- const now = Date . now ( ) ;
197-
198- sandbox . stub ( global . Date , 'now' ) . returns ( now ) ;
199- messageStream = new MessageStream ( subscriber ) ;
219+ it ( 'should pull pullTimeouts default from config file' , ( ) => {
220+ const expectedDeadline = now + FAKE_STREAMING_PULL_TIMEOUT ;
200221
201- setImmediate ( ( ) => {
202- assert . strictEqual ( client . deadline , now + timeout ) ;
203- done ( ) ;
222+ client . streams . forEach ( stream => {
223+ const deadline = stream . options . deadline ;
224+ assert . strictEqual ( deadline , expectedDeadline ) ;
204225 } ) ;
205226 } ) ;
227+
228+ it ( 'should default timeout to 5 minutes' , ( ) => {
229+ const expectedTimeout = now + 60000 * 5 ;
230+ assert . strictEqual ( client . deadline , expectedTimeout ) ;
231+ } ) ;
206232 } ) ;
207233
208234 describe ( 'user options' , ( ) => {
@@ -238,11 +264,25 @@ describe('MessageStream', () => {
238264 } ) ;
239265 } ) ;
240266
267+ it ( 'should respect the pullTimeout option' , done => {
268+ const pullTimeout = 1234 ;
269+ const expectedDeadline = now + pullTimeout ;
270+
271+ messageStream = new MessageStream ( subscriber , { pullTimeout} ) ;
272+
273+ setImmediate ( ( ) => {
274+ client . streams . forEach ( stream => {
275+ const deadline = stream . options . deadline ;
276+ assert . strictEqual ( deadline , expectedDeadline ) ;
277+ } ) ;
278+ done ( ) ;
279+ } ) ;
280+ } ) ;
281+
241282 it ( 'should respect the timeout option' , done => {
242283 const timeout = 12345 ;
243- const now = Date . now ( ) ;
284+ const expectedDeadline = now + timeout ;
244285
245- sandbox . stub ( global . Date , 'now' ) . returns ( now ) ;
246286 messageStream = new MessageStream ( subscriber , { timeout} ) ;
247287
248288 setImmediate ( ( ) => {
0 commit comments