This repository was archived by the owner on Mar 9, 2026. It is now read-only.

tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #766

@flaky-bot

Description

Note: #739 was also for this test, but it was closed more than 10 days ago. So, I didn't mark it flaky.


commit: f5f39a2
buildURL: Build Status, Sponge
status: failed
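
For context, the failing test exercises the subscriber's blocking-shutdown behaviour. The sketch below is a minimal approximation of the pattern under test, reconstructed from the test body shown in the traceback; it is not the repository's code. The topic and subscription paths are placeholders and are assumed to already exist, and the test suite's _publish_messages helper is replaced by a plain publish loop.

import concurrent.futures
import time

from google.cloud import pubsub_v1

# Placeholders -- the real test derives these from pytest fixtures.
TOPIC_PATH = "projects/my-project/topics/my-topic"
SUBSCRIPTION_PATH = "projects/my-project/subscriptions/my-subscription"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# Publish each message in its own batch so the backend treats every ACK independently.
for i in range(10):
    publisher.publish(TOPIC_PATH, f"message {i}".encode()).result()

processed = []

def callback(message):
    time.sleep(15)  # artificial delay so the shutdown overlaps with running callbacks
    processed.append(message.data)
    message.ack()

# Flow control admits more messages (5) than there are worker threads (3),
# so some leased messages wait for a free scheduler thread.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)

future = subscriber.subscribe(
    SUBSCRIPTION_PATH,
    callback=callback,
    flow_control=flow_control,
    scheduler=scheduler,
    await_callbacks_on_shutdown=True,  # block shutdown until running callbacks finish
)

try:
    future.result(timeout=10)  # shorter than the sleep inside the callback
except concurrent.futures.TimeoutError:
    future.cancel()
    future.result()  # blocks until the three in-flight callbacks have finished

# The test then asserts that exactly the three in-flight callbacks completed,
# and that the remaining seven messages are NACK-ed and redelivered shortly after.
assert len(processed) == 3
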

Test output
self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]
def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACK-s are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
>       all_done.wait()

tests/system.py:679:


self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5

def wait(self, timeout=None):
    """Wait for the barrier.

    When the specified number of threads have started waiting, they are all
    simultaneously awoken. If an 'action' was provided for the barrier, one
    of the threads will have executed that callback prior to returning.
    Returns an individual index number from 0 to 'parties-1'.

    """
    if timeout is None:
        timeout = self._timeout
    with self._cond:
        self._enter() # Block while the barrier drains.
        index = self._count
        self._count += 1
        try:
            if index + 1 == self._parties:
                # We release the barrier
                self._release()
            else:
                # We wait until someone releases us
>               self._wait(timeout)

/usr/local/lib/python3.10/threading.py:661:


self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5

def _wait(self, timeout):
    if not self._cond.wait_for(lambda : self._state != 0, timeout):
        #timed out.  Break the barrier
        self._break()
>       raise BrokenBarrierError

E threading.BrokenBarrierError

/usr/local/lib/python3.10/threading.py:699: BrokenBarrierError

During handling of the above exception, another exception occurred:

self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]

def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACK-s are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()
    except threading.BrokenBarrierError:  # PRAGMA: no cover
>       pytest.fail("The remaining messages have not been re-delivered in time.")

tests/system.py:681:


reason = 'The remaining messages have not been re-delivered in time.'
pytrace = True, msg = None

@_with_exception(Failed)
def fail(
    reason: str = "", pytrace: bool = True, msg: Optional[str] = None
) -> "NoReturn":
    """Explicitly fail an executing test with the given message.

    :param reason:
        The message to show the user as reason for the failure.

    :param pytrace:
        If False, msg represents the full failure information and no
        python traceback will be reported.

    :param msg:
        Same as ``reason``, but deprecated. Will be removed in a future version, use ``reason`` instead.
    """
    __tracebackhide__ = True
    reason = _resolve_msg_to_reason("fail", reason, msg)
>   raise Failed(msg=reason, pytrace=pytrace)

E Failed: The remaining messages have not been re-delivered in time.

.nox/system-3-10/lib/python3.10/site-packages/_pytest/outcomes.py:196: Failed

During handling of the above exception, another exception occurred:

self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]

def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACK-s are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()
    except threading.BrokenBarrierError:  # PRAGMA: no cover
        pytest.fail("The remaining messages have not been re-delivered in time.")
    finally:
        subscription_future.cancel()
>       subscription_future.result()  # block until shutdown completes

tests/system.py:684:


/usr/local/lib/python3.10/concurrent/futures/_base.py:445: in result
    return self.__get_result()
/usr/local/lib/python3.10/concurrent/futures/_base.py:390: in __get_result
    raise self._exception
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: in _wrap_callback_errors
    callback(message)
tests/system.py:672: in callback2
    all_done.wait()
/usr/local/lib/python3.10/threading.py:661: in wait
    self._wait(timeout)


self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5

def _wait(self, timeout):
    if not self._cond.wait_for(lambda : self._state != 0, timeout):
        #timed out.  Break the barrier
        self._break()
        raise BrokenBarrierError
    if self._state < 0:
>       raise BrokenBarrierError

E threading.BrokenBarrierError

/usr/local/lib/python3.10/threading.py:701: BrokenBarrierError
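
To summarize the failure mode: the test publishes 10 messages, expects exactly 3 of them (one per worker thread) to finish processing during the blocking shutdown, and then waits on a threading.Barrier for the remaining 7 to be redelivered within 5 seconds. The BrokenBarrierError above means not all of the NACK-ed messages came back inside that window. The barrier arithmetic, detached from Pub/Sub, is roughly this (an illustrative sketch, not the repository's code):

import threading

PUBLISHED = 10                        # one message per batch
IN_FLIGHT = 3                         # callbacks already running at shutdown
REDELIVERED = PUBLISHED - IN_FLIGHT   # 7 messages expected to come back

# One barrier party per expected redelivery, plus one for the main thread.
barrier = threading.Barrier(REDELIVERED + 1, timeout=5)

def on_redelivery():
    # In the real test this runs inside the second subscriber callback, after message.ack().
    barrier.wait()

try:
    barrier.wait()  # main thread: released only once all 7 redeliveries have arrived
except threading.BrokenBarrierError:
    # Fewer than 7 redeliveries within 5 seconds -- the condition reported in this issue.
    print("redelivery took longer than the 5-second window")

Since the 5-second window depends on how quickly the backend redelivers the NACK-ed messages, the check is inherently timing-sensitive, which is consistent with this showing up as an intermittent failure.
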

Metadata

Assignees

No one assigned

    Labels

    api: pubsub - Issues related to the googleapis/python-pubsub API.
    flakybot: flaky - Tells the Flaky Bot not to close or comment on this issue.
    flakybot: issue - An issue filed by the Flaky Bot. Should not be added manually.
    priority: p2 - Moderately-important priority. Fix may not be included in next release.
    type: bug - Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
