-
Notifications
You must be signed in to change notification settings - Fork 235
Closed
Labels
api: pubsubIssues related to the googleapis/nodejs-pubsub API.Issues related to the googleapis/nodejs-pubsub API.priority: p2Moderately-important priority. Fix may not be included in next release.Moderately-important priority. Fix may not be included in next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Description
There seems to be a race condition in subscriber.close() (_waitFlush) when the check for pending requests is made while a batch is currently in-flight.
I think we should track the in-flight requests as well, and resolve the _waitFlush when in-flight requests have drained.
Environment details
- OS: macOS Catalina
- Node.js version: 13.8.0
- npm version: 6.14.3
@google-cloud/pubsubversion: 1.7.1
Steps to reproduce
- Add a topic and subscription.
- Subscribe and
acka message - Immediately after
acking (message.ack(): void),await subscription.close() - Delete the subscription.
- Depending on timing, you will see something like
Cannot ack messages: Subscription does not exist
I used this test to reproduce the issue, and also tested a potential fix (PR incoming):
import { getEnv } from '../../../env/env'
import * as pubsub from '@google-cloud/pubsub'
import { v4 } from 'uuid'
import { defer } from 'bluebird'
jest.setTimeout(1000000)
test('errors if messages are inflight while closing, then deleting subscription causing it to fail', async () => {
const client = createPubSubSdk(getEnv())
// with enough iterations we should be able to trigger it.
// NOTE: when the fix is applied, this succeeds.
for (let i = 0; i < 100; i++) {
const topicName = `bug-topic-${v4()}`
const subName = `bug-sub-${v4()}`
const [topic] = await client.createTopic(topicName)
const [sub] = await topic.createSubscription(subName, {
expirationPolicy: { ttl: { seconds: 60 } },
})
await topic.publish(Buffer.from(JSON.stringify({ hello: 'world' })))
const onHandlerEnter = defer()
const onError = defer()
sub.setOptions({ batching: { maxMilliseconds: 100 } })
sub.addListener('message', (message: pubsub.Message) => {
message.ack()
onHandlerEnter.resolve()
})
sub.addListener('error', (err) => {
console.log(err)
onError.reject(err)
})
sub.open()
await onHandlerEnter.promise
// Kicks off the flush
await new Promise((resolve) => setTimeout(resolve, 100))
// Close while requests are pending. Wait for it.
await sub.close()
// Delete the subscription. This will trigger an error in ack.
await sub.delete()
await topic.delete()
await Promise.race([
new Promise((resolve) => setTimeout(resolve, 1000)),
onError.promise,
])
}
})
function createPubSubSdk(env: any) {
return new pubsub.PubSub({
projectId: env.GCLOUD_PROJECT,
credentials: JSON.parse(env.GCLOUD_CREDENTIALS),
})
}Metadata
Metadata
Assignees
Labels
api: pubsubIssues related to the googleapis/nodejs-pubsub API.Issues related to the googleapis/nodejs-pubsub API.priority: p2Moderately-important priority. Fix may not be included in next release.Moderately-important priority. Fix may not be included in next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.Error or flaw in code with unintended results or allowing sub-optimal usage patterns.