This repository was archived by the owner on Mar 9, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 215
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #538
Copy link
Copy link
Closed
Labels
api: pubsubIssues related to the googleapis/python-pubsub API.Issues related to the googleapis/python-pubsub API.flakybot: flakyTells the Flaky Bot not to close or comment on this issue.Tells the Flaky Bot not to close or comment on this issue.flakybot: issueAn issue filed by the Flaky Bot. Should not be added manually.An issue filed by the Flaky Bot. Should not be added manually.priority: p2Moderately-important priority. Fix may not be included in next release.Moderately-important priority. Fix may not be included in next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.Error or flaw in code with unintended results or allowing sub-optimal usage patterns.
Description
This test failed!
To configure my behavior, see the Flaky Bot documentation.
If I'm commenting on this issue too often, add the flakybot: quiet label and
I will stop commenting.
commit: c1b424e
buildURL: Build Status, Sponge
status: failed
Test output
self =
publisher =
topic_path = 'projects/precise-truck-742/topics/t-1637098106341'
subscriber =
subscription_path = 'projects/precise-truck-742/subscriptions/s-1637098106343'
cleanup = [(>, ()...erClient object at 0x7f421f1a3be0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1637098106343'})]
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
(subscriber.delete_subscription, (), {"subscription": subscription_path})
)
# The ACK-s are only persisted if *all* messages published in the same batch
# are ACK-ed. We thus publish each message in its own batch so that the backend
# treats all messages' ACKs independently of each other.
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)
_publish_messages(publisher, topic_path, batch_sizes=[1] * 10)
# Artificially delay message processing, gracefully shutdown the streaming pull
# in the meantime, then verify that those messages were nevertheless processed.
processed_messages = []
def callback(message):
time.sleep(15)
processed_messages.append(message.data)
message.ack()
# Flow control limits should exceed the number of worker threads, so that some
# of the messages will be blocked on waiting for free scheduler threads.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
subscription_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
scheduler=scheduler,
await_callbacks_on_shutdown=True,
)
try:
subscription_future.result(timeout=10) # less than the sleep in callback
except exceptions.TimeoutError:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# Blocking om shutdown should have waited for the already executing
# callbacks to finish.
assert len(processed_messages) == 3
# The messages that were not processed should have been NACK-ed and we should
# receive them again quite soon.
all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread
remaining = []
def callback2(message):
remaining.append(message.data)
message.ack()
all_done.wait()
subscription_future = subscriber.subscribe(
subscription_path, callback=callback2, await_callbacks_on_shutdown=False
)
try:
all_done.wait()
except threading.BrokenBarrierError: # PRAGMA: no cover
pytest.fail("The remaining messages have not been re-delivered in time.")
finally:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# There should be 7 messages left that were not yet processed and none of them
# should be a message that should have already been sucessfully processed in the
# first streaming pull.
assert len(remaining) == 7
E AssertionError: assert 8 == 7
E + where 8 = len([b'message 1/1 of batch 2', b'message 1/1 of batch 7', b'message 1/1 of batch 8', b'message 1/1 of batch 9', b'message 1/1 of batch 3', b'message 1/1 of batch 6', ...])
tests/system.py:689: AssertionError
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
api: pubsubIssues related to the googleapis/python-pubsub API.Issues related to the googleapis/python-pubsub API.flakybot: flakyTells the Flaky Bot not to close or comment on this issue.Tells the Flaky Bot not to close or comment on this issue.flakybot: issueAn issue filed by the Flaky Bot. Should not be added manually.An issue filed by the Flaky Bot. Should not be added manually.priority: p2Moderately-important priority. Fix may not be included in next release.Moderately-important priority. Fix may not be included in next release.type: bugError or flaw in code with unintended results or allowing sub-optimal usage patterns.Error or flaw in code with unintended results or allowing sub-optimal usage patterns.