Skip to content

Commit 535a917

Browse files
authored
fix(deps)!: use grpc-js instead of grpc extension (#658)
1 parent a9972ea commit 535a917

File tree

13 files changed

+57
-34
lines changed

13 files changed

+57
-34
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
"extend": "^3.0.2",
6363
"google-auth-library": "^3.0.0",
6464
"google-gax": "^1.0.0",
65-
"grpc": "1.21.1",
6665
"is-stream-ended": "^0.1.4",
6766
"lodash.snakecase": "^4.1.1",
6867
"p-defer": "^3.0.0",

src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@
7777
const v1 = require('./v1');
7878
export {v1};
7979

80+
export {ServiceError} from '@grpc/grpc-js';
8081
export {CallOptions} from 'google-gax';
81-
export {ServiceError} from 'grpc';
8282
export {
8383
Policy,
8484
GetPolicyCallback,

src/message-queues.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import {CallOptions} from 'google-gax';
18-
import {Metadata, ServiceError, status} from 'grpc';
18+
import {Metadata, ServiceError, status} from '@grpc/grpc-js';
1919
import defer = require('p-defer');
2020

2121
import {Message, Subscriber} from './subscriber';
@@ -48,15 +48,17 @@ export interface BatchOptions {
4848
*/
4949
export class BatchError extends Error implements ServiceError {
5050
ackIds: string[];
51-
code?: status;
52-
metadata?: Metadata;
51+
code: status;
52+
details: string;
53+
metadata: Metadata;
5354
constructor(err: ServiceError, ackIds: string[], rpc: string) {
5455
super(
5556
`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${err.message}`
5657
);
5758

5859
this.ackIds = ackIds;
5960
this.code = err.code;
61+
this.details = err.details;
6062
this.metadata = err.metadata;
6163
}
6264
}

src/message-stream.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import {
2222
ServiceError,
2323
status,
2424
StatusObject,
25-
} from 'grpc';
25+
} from '@grpc/grpc-js';
2626
import * as isStreamEnded from 'is-stream-ended';
2727
import {PassThrough} from 'stream';
2828

@@ -69,11 +69,13 @@ type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse> & {
6969
* @param {object} status The gRPC status object.
7070
*/
7171
export class StatusError extends Error implements ServiceError {
72-
code?: status;
73-
metadata?: Metadata;
72+
code: status;
73+
details: string;
74+
metadata: Metadata;
7475
constructor(status: StatusObject) {
7576
super(status.details);
7677
this.code = status.code;
78+
this.details = status.details;
7779
this.metadata = status.metadata;
7880
}
7981
}
@@ -87,11 +89,15 @@ export class StatusError extends Error implements ServiceError {
8789
*/
8890
export class ChannelError extends Error implements ServiceError {
8991
code: status;
92+
details: string;
93+
metadata: Metadata;
9094
constructor(err: Error) {
9195
super(`Failed to connect to channel. Reason: ${err.message}`);
9296
this.code = err.message.includes('deadline')
9397
? status.DEADLINE_EXCEEDED
9498
: status.UNKNOWN;
99+
this.details = err.message;
100+
this.metadata = new Metadata();
95101
}
96102
}
97103

@@ -259,9 +265,14 @@ export class MessageStream extends PassThrough {
259265
* @private
260266
*/
261267
private _keepAlive(): void {
262-
for (const stream of this._streams.keys()) {
263-
stream.write({});
264-
}
268+
this._streams.forEach((receivedStatus, stream) => {
269+
// its possible that a status event fires off (signaling the rpc being
270+
// closed) but the stream hasn't drained yet, writing to this stream will
271+
// result in a `write after end` error
272+
if (!receivedStatus) {
273+
stream.write({});
274+
}
275+
});
265276
}
266277
/**
267278
* Once the stream has nothing left to read, we'll remove it and attempt to

src/pubsub.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import is from '@sindresorhus/is';
2121
import * as extend from 'extend';
2222
import {GoogleAuth} from 'google-auth-library';
2323
import * as gax from 'google-gax';
24-
import * as grpc from 'grpc';
24+
import * as grpc from '@grpc/grpc-js';
25+
import {ServiceError, ChannelCredentials} from '@grpc/grpc-js';
2526

2627
const PKG = require('../../package.json');
2728
const v1 = require('./v1');
@@ -46,7 +47,6 @@ import {PublishOptions} from './publisher';
4647
import {CallOptions} from 'google-gax';
4748
import {Transform} from 'stream';
4849
import {google} from '../proto/pubsub';
49-
import {ServiceError, ChannelCredentials} from 'grpc';
5050

5151
const opts = {} as gax.GrpcClientOptions;
5252

@@ -263,10 +263,6 @@ export class PubSub {
263263
}
264264
this.options = Object.assign(
265265
{
266-
grpc,
267-
'grpc.keepalive_time_ms': 300000,
268-
'grpc.max_send_message_length': -1,
269-
'grpc.max_receive_message_length': 20000001,
270266
libName: 'gccl',
271267
libVersion: PKG.version,
272268
scopes: Object.keys(allScopes),
@@ -932,7 +928,7 @@ export class PubSub {
932928
request<T, R = void>(config: RequestConfig, callback: RequestCallback<T, R>) {
933929
this.getClient_(config, (err, client) => {
934930
if (err) {
935-
callback(err);
931+
callback(err as ServiceError);
936932
return;
937933
}
938934
let reqOpts = extend(true, {}, config.reqOpts);

src/pull-retry.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
import {StatusObject, status} from 'grpc';
16+
import {StatusObject, status} from '@grpc/grpc-js';
1717

1818
/*!
1919
* retryable status codes

system-test/pubsub.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import * as assert from 'assert';
18+
import * as crypto from 'crypto';
1819
import defer = require('p-defer');
1920
import * as uuid from 'uuid';
2021

@@ -506,6 +507,24 @@ describe('pubsub', () => {
506507
}
507508
});
508509

510+
it('should send and receive large messages', done => {
511+
const subscription = topic.subscription(SUB_NAMES[0]);
512+
const buf = crypto.randomBytes(9000000); // 9mb
513+
514+
topic.publish(buf, (err, messageId) => {
515+
assert.ifError(err);
516+
517+
subscription.on('error', done).on('message', ({id, data}: Message) => {
518+
if (id !== messageId) {
519+
return;
520+
}
521+
522+
assert.deepStrictEqual(data, buf);
523+
subscription.close(done);
524+
});
525+
});
526+
});
527+
509528
// can be ran manually to test options/memory usage/etc.
510529
// tslint:disable-next-line ban
511530
it.skip('should handle a large volume of messages', async function() {

test/message-queues.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import * as assert from 'assert';
1818
import {EventEmitter} from 'events';
1919
import {CallOptions} from 'google-gax';
20-
import {Metadata, ServiceError} from 'grpc';
20+
import {Metadata, ServiceError} from '@grpc/grpc-js';
2121
import * as proxyquire from 'proxyquire';
2222
import * as sinon from 'sinon';
2323
import * as uuid from 'uuid';
@@ -322,7 +322,7 @@ describe('MessageQueues', () => {
322322

323323
const ackIds = messages.map(message => message.ackId);
324324

325-
const fakeError: ServiceError = new Error('Err.');
325+
const fakeError = new Error('Err.') as ServiceError;
326326
fakeError.code = 2;
327327
fakeError.metadata = new Metadata();
328328

@@ -446,7 +446,7 @@ describe('MessageQueues', () => {
446446

447447
const ackIds = messages.map(message => message.ackId);
448448

449-
const fakeError: ServiceError = new Error('Err.');
449+
const fakeError = new Error('Err.') as ServiceError;
450450
fakeError.code = 2;
451451
fakeError.metadata = new Metadata();
452452

test/message-stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616

1717
import * as assert from 'assert';
18-
import {Metadata, ServiceError} from 'grpc';
18+
import {Metadata, ServiceError} from '@grpc/grpc-js';
1919
import * as proxyquire from 'proxyquire';
2020
import * as sinon from 'sinon';
2121
import {Duplex, PassThrough} from 'stream';

test/pubsub.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ import * as promisify from '@google-cloud/promisify';
1919
import arrify = require('arrify');
2020
import * as assert from 'assert';
2121
import * as gax from 'google-gax';
22-
import * as grpc from 'grpc';
23-
import {CallOptions, ServiceError, ChannelCredentials} from 'grpc';
22+
import * as grpc from '@grpc/grpc-js';
23+
import {CallOptions, ChannelCredentials, ServiceError} from '@grpc/grpc-js';
2424
import * as proxyquire from 'proxyquire';
2525
import * as sinon from 'sinon';
2626

@@ -189,10 +189,6 @@ describe('PubSub', () => {
189189

190190
describe('instantiation', () => {
191191
const DEFAULT_OPTIONS = {
192-
grpc,
193-
'grpc.keepalive_time_ms': 300000,
194-
'grpc.max_send_message_length': -1,
195-
'grpc.max_receive_message_length': 20000001,
196192
libName: 'gccl',
197193
libVersion: PKG.version,
198194
scopes: [],

0 commit comments

Comments
 (0)