Skip to content

Commit 38502ad

Browse files
authored
feat(subscription): ordered messages (#560)
1 parent fe33e40 commit 38502ad

File tree

3 files changed

+46
-26
lines changed

3 files changed

+46
-26
lines changed

proto/pubsub.d.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,9 @@ export namespace google {
767767

768768
/** PubsubMessage publishTime */
769769
publishTime?: (google.protobuf.ITimestamp|null);
770+
771+
/** PubsubMessage orderingKey */
772+
orderingKey?: (string|null);
770773
}
771774

772775
/** Represents a PubsubMessage. */
@@ -790,6 +793,9 @@ export namespace google {
790793
/** PubsubMessage publishTime. */
791794
public publishTime?: (google.protobuf.ITimestamp|null);
792795

796+
/** PubsubMessage orderingKey. */
797+
public orderingKey: string;
798+
793799
/**
794800
* Creates a new PubsubMessage instance using the specified properties.
795801
* @param [properties] Properties to set
@@ -1941,6 +1947,9 @@ export namespace google {
19411947
/** Subscription labels */
19421948
labels?: ({ [k: string]: string }|null);
19431949

1950+
/** Subscription enableMessageOrdering */
1951+
enableMessageOrdering?: (boolean|null);
1952+
19441953
/** Subscription expirationPolicy */
19451954
expirationPolicy?: (google.pubsub.v1.IExpirationPolicy|null);
19461955
}
@@ -1975,6 +1984,9 @@ export namespace google {
19751984
/** Subscription labels. */
19761985
public labels: { [k: string]: string };
19771986

1987+
/** Subscription enableMessageOrdering. */
1988+
public enableMessageOrdering: boolean;
1989+
19781990
/** Subscription expirationPolicy. */
19791991
public expirationPolicy?: (google.pubsub.v1.IExpirationPolicy|null);
19801992

src/subscriber.ts

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,15 @@ import {EventEmitter} from 'events';
2121
import {ClientStub} from 'google-gax';
2222
import {common as protobuf} from 'protobufjs';
2323

24+
import {google} from '../proto/pubsub';
25+
2426
import {Histogram} from './histogram';
2527
import {FlowControlOptions, LeaseManager} from './lease-manager';
2628
import {AckQueue, BatchOptions, ModAckQueue} from './message-queues';
2729
import {MessageStream, MessageStreamOptions} from './message-stream';
2830
import {Subscription} from './subscription';
2931

30-
/**
31-
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#ReceivedMessage
32-
*/
33-
interface ReceivedMessage {
34-
ackId: string;
35-
message: {
36-
attributes: {},
37-
data: Buffer,
38-
messageId: string,
39-
publishTime: protobuf.ITimestamp
40-
};
41-
}
42-
43-
/**
44-
* @see https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull#body.PullResponse
45-
*/
46-
export interface PullResponse {
47-
receivedMessages: ReceivedMessage[];
48-
}
32+
export type PullResponse = google.pubsub.v1.IPullResponse;
4933

5034
/**
5135
* Date object with nanosecond precision. Supports all standard Date arguments
@@ -65,6 +49,7 @@ export interface PullResponse {
6549
* // attributes: {key: 'value'},
6650
* // data: Buffer.from('Hello, world!),
6751
* // id: '1551297743043',
52+
* // orderingKey: 'ordering-key',
6853
* // publishTime: new PreciseDate('2019-02-27T20:02:19.029534186Z'),
6954
* // received: 1551297743043,
7055
* // length: 13
@@ -76,6 +61,7 @@ export class Message {
7661
attributes: {};
7762
data: Buffer;
7863
id: string;
64+
orderingKey?: string;
7965
publishTime: PreciseDate;
8066
received: number;
8167
private _handled: boolean;
@@ -87,43 +73,59 @@ export class Message {
8773
* @param {Subscriber} sub The parent subscriber.
8874
* @param {object} message The raw message response.
8975
*/
90-
constructor(sub: Subscriber, {ackId, message}: ReceivedMessage) {
76+
constructor(sub: Subscriber, {ackId,
77+
message}: google.pubsub.v1.IReceivedMessage) {
9178
/**
9279
* This ID is used to acknowledge the message.
9380
*
9481
* @name Message#ackId
9582
* @type {string}
9683
*/
97-
this.ackId = ackId;
84+
this.ackId = ackId!;
9885
/**
9986
* Optional attributes for this message.
10087
*
10188
* @name Message#attributes
10289
* @type {object}
10390
*/
104-
this.attributes = message.attributes || {};
91+
this.attributes = message!.attributes || {};
10592
/**
10693
* The message data as a Buffer.
10794
*
10895
* @name Message#data
10996
* @type {Buffer}
11097
*/
111-
this.data = message.data;
98+
this.data = message!.data as Buffer;
11299
/**
113100
* ID of the message, assigned by the server when the message is published.
114101
* Guaranteed to be unique within the topic.
115102
*
116103
* @name Message#id
117104
* @type {string}
118105
*/
119-
this.id = message.messageId;
106+
this.id = message!.messageId!;
107+
/**
108+
* Identifies related messages for which publish order should be respected.
109+
* If a `Subscription` has `enableMessageOrdering` set to `true`, messages
110+
* published with the same `orderingKey` value will be delivered to
111+
* subscribers in the order in which they are received by the Pub/Sub
112+
* system.
113+
*
114+
* **EXPERIMENTAL:** This feature is part of a closed alpha release. This
115+
* API might be changed in backward-incompatible ways and is not recommended
116+
* for production use. It is not subject to any SLA or deprecation policy.
117+
*
118+
* @name Message#orderingKey
119+
* @type {string}
120+
*/
121+
this.orderingKey = message!.orderingKey!;
120122
/**
121123
* The time at which the message was published.
122124
*
123125
* @name Message#publishTime
124126
* @type {external:PreciseDate}
125127
*/
126-
this.publishTime = new PreciseDate(message.publishTime as DateStruct);
128+
this.publishTime = new PreciseDate(message!.publishTime as DateStruct);
127129
/**
128130
* The time at which the message was recieved by the subscription.
129131
*
@@ -424,7 +426,7 @@ export class Subscriber extends EventEmitter {
424426
* @private
425427
*/
426428
private _onData({receivedMessages}: PullResponse): void {
427-
for (const data of receivedMessages) {
429+
for (const data of receivedMessages!) {
428430
const message = new Message(this, data);
429431

430432
if (this.isOpen) {

test/subscriber.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ const RECEIVED_MESSAGE = {
130130
attributes: {},
131131
data: Buffer.from('Hello, world!'),
132132
messageId: uuid.v4(),
133+
orderingKey: 'ordering-key',
133134
publishTime: {seconds: 12, nanos: 32}
134135
}
135136
};
@@ -608,6 +609,11 @@ describe('Subscriber', () => {
608609
assert.strictEqual(message.id, RECEIVED_MESSAGE.message.messageId);
609610
});
610611

612+
it('should localize orderingKey', () => {
613+
assert.strictEqual(
614+
message.orderingKey, RECEIVED_MESSAGE.message.orderingKey);
615+
});
616+
611617
it('should localize publishTime', () => {
612618
const m = new Message(subscriber, RECEIVED_MESSAGE);
613619
const timestamp = m.publishTime as unknown as FakePreciseDate;

0 commit comments

Comments
 (0)