Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #739

@flaky-bot

Description

@flaky-bot

Note: #538 was also for this test, but it was closed more than 10 days ago. So, I didn't mark it flaky.


commit: dc264e9
buildURL: Build Status, Sponge
status: failed

Test output
self = 
publisher = 
topic_path = 'projects/precise-truck-742/topics/t-1657830416650'
subscriber = 
subscription_path = 'projects/precise-truck-742/subscriptions/s-1657830416652'
cleanup = [(>, ()...erClient object at 0x7f3a35e98af0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1657830416652'})]
def test_streaming_pull_blocking_shutdown(
    self, publisher, topic_path, subscriber, subscription_path, cleanup
):
    """Verify graceful streaming-pull shutdown semantics.

    With ``await_callbacks_on_shutdown=True``, cancelling the streaming pull
    must wait for callbacks that are already executing to finish (those
    messages get ACK-ed), while messages still waiting for a scheduler thread
    are NACK-ed and re-delivered to a subsequent streaming pull.

    Args:
        publisher: Publisher client fixture.
        topic_path: Fully qualified topic name fixture.
        subscriber: Subscriber client fixture.
        subscription_path: Fully qualified subscription name fixture.
        cleanup: List of (callable, args, kwargs) teardown actions.
    """
    # Make sure the topic and subscription get deleted.
    cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
    cleanup.append(
        (subscriber.delete_subscription, (), {"subscription": subscription_path})
    )

    # The ACK-s are only persisted if *all* messages published in the same batch
    # are ACK-ed. We thus publish each message in its own batch so that the backend
    # treats all messages' ACKs independently of each other.
    publisher.create_topic(name=topic_path)
    subscriber.create_subscription(name=subscription_path, topic=topic_path)
    _publish_messages(publisher, topic_path, batch_sizes=[1] * 10)

    # Artificially delay message processing, gracefully shutdown the streaming pull
    # in the meantime, then verify that those messages were nevertheless processed.
    processed_messages = []

    def callback(message):
        time.sleep(15)
        processed_messages.append(message.data)
        message.ack()

    # Flow control limits should exceed the number of worker threads, so that some
    # of the messages will be blocked on waiting for free scheduler threads.
    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
    subscription_future = subscriber.subscribe(
        subscription_path,
        callback=callback,
        flow_control=flow_control,
        scheduler=scheduler,
        await_callbacks_on_shutdown=True,
    )

    try:
        subscription_future.result(timeout=10)  # less than the sleep in callback
    except exceptions.TimeoutError:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # Blocking on shutdown should have waited for the already executing
    # callbacks to finish.
    # NOTE(review): this exact count assumes all 3 worker threads picked up a
    # message before cancellation and no ACK was lost by the backend — this is
    # timing/service dependent and is the likely source of the reported flake.
    assert len(processed_messages) == 3

    # The messages that were not processed should have been NACK-ed and we should
    # receive them again quite soon.
    all_done = threading.Barrier(7 + 1, timeout=5)  # +1 because of the main thread
    remaining = []

    def callback2(message):
        remaining.append(message.data)
        message.ack()
        all_done.wait()

    subscription_future = subscriber.subscribe(
        subscription_path, callback=callback2, await_callbacks_on_shutdown=False
    )

    try:
        all_done.wait()
    except threading.BrokenBarrierError:  # pragma: no cover
        pytest.fail("The remaining messages have not been re-delivered in time.")
    finally:
        subscription_future.cancel()
        subscription_future.result()  # block until shutdown completes

    # There should be 7 messages left that were not yet processed and none of them
    # should be a message that should have already been successfully processed in
    # the first streaming pull.
    assert len(remaining) == 7
    assert not (set(processed_messages) & set(remaining))  # no re-delivery

E AssertionError: assert not ({b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'} & {b'message 1/1 of batch 1', b'message 1/1 of batch 10', b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 4', b'message 1/1 of batch 7', ...})
E + where {b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'} = set([b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'])
E + and {b'message 1/1 of batch 1', b'message 1/1 of batch 10', b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 4', b'message 1/1 of batch 7', ...} = set([b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 1', b'message 1/1 of batch 4', b'message 1/1 of batch 9', b'message 1/1 of batch 10', ...])

tests/system.py:690: AssertionError

Metadata

Metadata

Assignees

No one assigned

    Labels

    api: pubsubIssues related to the googleapis/python-pubsub API.flakybot: issueAn issue filed by the Flaky Bot. Should not be added manually.priority: p1Important issue which blocks shipping the next release. Will be fixed prior to next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions