Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #538

@flaky-bot

Description

@flaky-bot

This test failed!

To configure my behavior, see the Flaky Bot documentation.

If I'm commenting on this issue too often, add the flakybot: quiet label and
I will stop commenting.


commit: c1b424e
buildURL: Build Status, Sponge
status: failed

Test output
self = 
publisher = 
topic_path = 'projects/precise-truck-742/topics/t-1637098106341'
subscriber = 
subscription_path = 'projects/precise-truck-742/subscriptions/s-1637098106343'
cleanup = [(>, ()...erClient object at 0x7f421f1a3be0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1637098106343'})]
def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACK-s are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking om shutdown should have waited for the already executing
    # callbacks to finish.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()
    except threading.BrokenBarrierError:  # PRAGMA: no cover
        pytest.fail("The remaining messages have not been re-delivered in time.")
    finally:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # There should be 7 messages left that were not yet processed and none of them
    # should be a message that should have already been sucessfully processed in the
    # first streaming pull.
  assert len(remaining) == 7

E AssertionError: assert 8 == 7
E + where 8 = len([b'message 1/1 of batch 2', b'message 1/1 of batch 7', b'message 1/1 of batch 8', b'message 1/1 of batch 9', b'message 1/1 of batch 3', b'message 1/1 of batch 6', ...])

tests/system.py:689: AssertionError

Metadata

Metadata

Assignees

Labels

api: pubsubIssues related to the googleapis/python-pubsub API.flakybot: flakyTells the Flaky Bot not to close or comment on this issue.flakybot: issueAn issue filed by the Flaky Bot. Should not be added manually.priority: p2Moderately-important priority. Fix may not be included in next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions