Race condition when flushing message queues while a batch is in-flight #951

@jeffijoe

There appears to be a race condition in subscriber.close() (_waitFlush): if the check for pending requests runs while a batch is in flight, close() can resolve before that batch has completed.

I think we should track in-flight requests as well, and resolve _waitFlush only once they have drained.
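
To illustrate the idea, here is a minimal sketch of a queue that counts in-flight batches in addition to queued items. It is not the actual @google-cloud/pubsub code: the class name DrainableQueue, its methods, and the send callback are all hypothetical, chosen only to show the bookkeeping.

```typescript
// Hypothetical sketch, NOT the library's internals: all names are made up.
type SendBatch<T> = (batch: T[]) => Promise<void>;

class DrainableQueue<T> {
  private pending: T[] = [];
  private inFlight = 0; // batches handed to send() that have not settled yet
  private waiters: Array<() => void> = [];
  private send: SendBatch<T>;

  constructor(send: SendBatch<T>) {
    this.send = send;
  }

  add(item: T): void {
    this.pending.push(item);
  }

  // Sends everything queued so far as a single batch.
  async flush(): Promise<void> {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.inFlight++;
    try {
      await this.send(batch);
    } finally {
      this.inFlight--;
      this.notifyIfDrained();
    }
  }

  // Resolves once nothing is queued AND nothing is in flight.
  // Checking only `pending` would resolve too early while a batch is in flight.
  onDrained(): Promise<void> {
    if (this.isDrained()) return Promise.resolve();
    return new Promise<void>((resolve) => {
      this.waiters.push(() => resolve());
    });
  }

  private isDrained(): boolean {
    return this.pending.length === 0 && this.inFlight === 0;
  }

  private notifyIfDrained(): void {
    if (!this.isDrained()) return;
    const waiters = this.waiters;
    this.waiters = [];
    for (const wake of waiters) wake();
  }
}
```

In this sketch, a close() implementation would await onDrained() after triggering a final flush, so it could not resolve while a previously started batch is still being sent.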

Environment details

  • OS: macOS Catalina
  • Node.js version: 13.8.0
  • npm version: 6.14.3
  • @google-cloud/pubsub version: 1.7.1

Steps to reproduce

  1. Create a topic and a subscription.
  2. Subscribe and ack a message.
  3. Immediately after acking (message.ack(): void), await subscription.close().
  4. Delete the subscription.
  5. Depending on timing, you will see an error like: Cannot ack messages: Subscription does not exist

I used this test to reproduce the issue, and also tested a potential fix (PR incoming):

import { getEnv } from '../../../env/env'
import * as pubsub from '@google-cloud/pubsub'
import { v4 } from 'uuid'
import { defer } from 'bluebird'

jest.setTimeout(1000000)

test('errors if messages are inflight while closing, then deleting subscription causing it to fail', async () => {
  const client = createPubSubSdk(getEnv())
  // with enough iterations we should be able to trigger it.
  // NOTE: when the fix is applied, this succeeds.
  for (let i = 0; i < 100; i++) {
    const topicName = `bug-topic-${v4()}`
    const subName = `bug-sub-${v4()}`
    const [topic] = await client.createTopic(topicName)
    const [sub] = await topic.createSubscription(subName, {
      expirationPolicy: { ttl: { seconds: 60 } },
    })

    await topic.publish(Buffer.from(JSON.stringify({ hello: 'world' })))

    const onHandlerEnter = defer()
    const onError = defer()
    sub.setOptions({ batching: { maxMilliseconds: 100 } })
    sub.addListener('message', (message: pubsub.Message) => {
      message.ack()
      onHandlerEnter.resolve()
    })
    sub.addListener('error', (err) => {
      console.log(err)
      onError.reject(err)
    })
    sub.open()

    await onHandlerEnter.promise

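    // Wait out the batching window (maxMilliseconds: 100) so the ack flush kicks off.
    await new Promise((resolve) => setTimeout(resolve, 100))

    // Close while the ack batch is pending or in flight, and wait for close() to resolve.
    await sub.close()
    // Delete the subscription. If the ack batch has not completed yet, the ack fails.
    await sub.delete()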
    await topic.delete()
    await Promise.race([
      new Promise((resolve) => setTimeout(resolve, 1000)),
      onError.promise,
    ])
  }
})

function createPubSubSdk(env: any) {
  return new pubsub.PubSub({
    projectId: env.GCLOUD_PROJECT,
    credentials: JSON.parse(env.GCLOUD_CREDENTIALS),
  })
}

Labels

  • api: pubsub (Issues related to the googleapis/nodejs-pubsub API.)
  • priority: p2 (Moderately-important priority. Fix may not be included in next release.)
  • type: bug (Error or flaw in code with unintended results or allowing sub-optimal usage patterns.)
