tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #766
Description
Note: #739 was also filed for this test, but it was closed more than 10 days ago, so this issue has not been marked as flaky.
commit: f5f39a2
buildURL: Build Status, Sponge
status: failed
Test output
self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
(subscriber.delete_subscription, (), {"subscription": subscription_path})
)
# The ACK-s are only persisted if *all* messages published in the same batch
# are ACK-ed. We thus publish each message in its own batch so that the backend
# treats all messages' ACKs independently of each other.
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)
_publish_messages(publisher, topic_path, batch_sizes=[1] * 10)
# Artificially delay message processing, gracefully shutdown the streaming pull
# in the meantime, then verify that those messages were nevertheless processed.
processed_messages = []
def callback(message):
time.sleep(15)
processed_messages.append(message.data)
message.ack()
# Flow control limits should exceed the number of worker threads, so that some
# of the messages will be blocked on waiting for free scheduler threads.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
subscription_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
scheduler=scheduler,
await_callbacks_on_shutdown=True,
)
try:
subscription_future.result(timeout=10) # less than the sleep in callback
except exceptions.TimeoutError:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# Blocking on shutdown should have waited for the already executing
# callbacks to finish.
assert len(processed_messages) == 3
# The messages that were not processed should have been NACK-ed and we should
# receive them again quite soon.
all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread
remaining = []
def callback2(message):
remaining.append(message.data)
message.ack()
all_done.wait()
subscription_future = subscriber.subscribe(
subscription_path, callback=callback2, await_callbacks_on_shutdown=False
)
try:
all_done.wait()
tests/system.py:679:
self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5
def wait(self, timeout=None):
"""Wait for the barrier.
When the specified number of threads have started waiting, they are all
simultaneously awoken. If an 'action' was provided for the barrier, one
of the threads will have executed that callback prior to returning.
Returns an individual index number from 0 to 'parties-1'.
"""
if timeout is None:
timeout = self._timeout
with self._cond:
self._enter() # Block while the barrier drains.
index = self._count
self._count += 1
try:
if index + 1 == self._parties:
# We release the barrier
self._release()
else:
# We wait until someone releases us
self._wait(timeout)
/usr/local/lib/python3.10/threading.py:661:
self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5
def _wait(self, timeout):
if not self._cond.wait_for(lambda : self._state != 0, timeout):
#timed out. Break the barrier
self._break()
raise BrokenBarrierError
E threading.BrokenBarrierError
/usr/local/lib/python3.10/threading.py:699: BrokenBarrierError
During handling of the above exception, another exception occurred:
self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
(subscriber.delete_subscription, (), {"subscription": subscription_path})
)
# The ACK-s are only persisted if *all* messages published in the same batch
# are ACK-ed. We thus publish each message in its own batch so that the backend
# treats all messages' ACKs independently of each other.
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)
_publish_messages(publisher, topic_path, batch_sizes=[1] * 10)
# Artificially delay message processing, gracefully shutdown the streaming pull
# in the meantime, then verify that those messages were nevertheless processed.
processed_messages = []
def callback(message):
time.sleep(15)
processed_messages.append(message.data)
message.ack()
# Flow control limits should exceed the number of worker threads, so that some
# of the messages will be blocked on waiting for free scheduler threads.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
subscription_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
scheduler=scheduler,
await_callbacks_on_shutdown=True,
)
try:
subscription_future.result(timeout=10) # less than the sleep in callback
except exceptions.TimeoutError:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# Blocking on shutdown should have waited for the already executing
# callbacks to finish.
assert len(processed_messages) == 3
# The messages that were not processed should have been NACK-ed and we should
# receive them again quite soon.
all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread
remaining = []
def callback2(message):
remaining.append(message.data)
message.ack()
all_done.wait()
subscription_future = subscriber.subscribe(
subscription_path, callback=callback2, await_callbacks_on_shutdown=False
)
try:
all_done.wait()
except threading.BrokenBarrierError: # PRAGMA: no cover
pytest.fail("The remaining messages have not been re-delivered in time.")
tests/system.py:681:
reason = 'The remaining messages have not been re-delivered in time.'
pytrace = True, msg = None
@_with_exception(Failed)
def fail(
reason: str = "", pytrace: bool = True, msg: Optional[str] = None
) -> "NoReturn":
"""Explicitly fail an executing test with the given message.
:param reason:
The message to show the user as reason for the failure.
:param pytrace:
If False, msg represents the full failure information and no
python traceback will be reported.
:param msg:
Same as ``reason``, but deprecated. Will be removed in a future version, use ``reason`` instead.
"""
__tracebackhide__ = True
reason = _resolve_msg_to_reason("fail", reason, msg)
raise Failed(msg=reason, pytrace=pytrace)
E Failed: The remaining messages have not been re-delivered in time.
.nox/system-3-10/lib/python3.10/site-packages/_pytest/outcomes.py:196: Failed
During handling of the above exception, another exception occurred:
self = <tests.system.TestStreamingPull object at 0x7fe4c45f9a80>
publisher = <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>
topic_path = 'projects/precise-truck-742/topics/t-1660940810948'
subscriber = <google.cloud.pubsub_v1.SubscriberClient object at 0x7fe4c2d3d960>
subscription_path = 'projects/precise-truck-742/subscriptions/s-1660940810950'
cleanup = [(<bound method PublisherClient.delete_topic of <google.cloud.pubsub_v1.PublisherClient object at 0x7fe4c2c9bb80>>, ()...erClient object at 0x7fe4c2d3d960>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1660940810950'})]
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
(subscriber.delete_subscription, (), {"subscription": subscription_path})
)
# The ACK-s are only persisted if *all* messages published in the same batch
# are ACK-ed. We thus publish each message in its own batch so that the backend
# treats all messages' ACKs independently of each other.
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)
_publish_messages(publisher, topic_path, batch_sizes=[1] * 10)
# Artificially delay message processing, gracefully shutdown the streaming pull
# in the meantime, then verify that those messages were nevertheless processed.
processed_messages = []
def callback(message):
time.sleep(15)
processed_messages.append(message.data)
message.ack()
# Flow control limits should exceed the number of worker threads, so that some
# of the messages will be blocked on waiting for free scheduler threads.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
subscription_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
scheduler=scheduler,
await_callbacks_on_shutdown=True,
)
try:
subscription_future.result(timeout=10) # less than the sleep in callback
except exceptions.TimeoutError:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# Blocking on shutdown should have waited for the already executing
# callbacks to finish.
assert len(processed_messages) == 3
# The messages that were not processed should have been NACK-ed and we should
# receive them again quite soon.
all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread
remaining = []
def callback2(message):
remaining.append(message.data)
message.ack()
all_done.wait()
subscription_future = subscriber.subscribe(
subscription_path, callback=callback2, await_callbacks_on_shutdown=False
)
try:
all_done.wait()
except threading.BrokenBarrierError: # PRAGMA: no cover
pytest.fail("The remaining messages have not been re-delivered in time.")
finally:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
tests/system.py:684:
/usr/local/lib/python3.10/concurrent/futures/_base.py:445: in result
return self.__get_result()
/usr/local/lib/python3.10/concurrent/futures/_base.py:390: in __get_result
raise self._exception
google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: in _wrap_callback_errors
callback(message)
tests/system.py:672: in callback2
all_done.wait()
/usr/local/lib/python3.10/threading.py:661: in wait
self._wait(timeout)
self = <threading.Barrier object at 0x7fe4c2d06b30>, timeout = 5
def _wait(self, timeout):
if not self._cond.wait_for(lambda : self._state != 0, timeout):
#timed out. Break the barrier
self._break()
raise BrokenBarrierError
if self._state < 0:
raise BrokenBarrierError
E threading.BrokenBarrierError
/usr/local/lib/python3.10/threading.py:701: BrokenBarrierError
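For reference, the failure is in the re-delivery check at the end of the test: after the blocking shutdown only 3 of the 10 messages have been ACK-ed, and the remaining 7 are expected to be NACK-ed and re-delivered within the 5-second barrier timeout. When re-delivery is slower than that, the `all_done.wait()` calls in both the main thread and `callback2` raise `threading.BrokenBarrierError`, which is exactly the exception chain shown above. Below is a minimal, self-contained sketch of that timing behaviour using only the standard library; the message names and delays are made up for illustration and it does not touch Pub/Sub at all.

```python
import threading
import time

# The test's barrier: 7 expected re-deliveries + 1 for the main thread,
# and every wait() breaks the barrier after 5 seconds.
EXPECTED_REDELIVERIES = 7
all_done = threading.Barrier(EXPECTED_REDELIVERIES + 1, timeout=5)


def callback2(message_data, delay):
    # Stand-in for the subscriber callback: a delayed re-delivery means this
    # thread only reaches the barrier after it has already broken.
    time.sleep(delay)
    try:
        all_done.wait()
    except threading.BrokenBarrierError:
        print(f"{message_data}: barrier already broken")


# Simulate only 5 of the 7 expected re-deliveries arriving promptly; the
# other two take longer than the 5-second barrier timeout.
delays = [0.1] * 5 + [8.0] * 2
threads = [
    threading.Thread(target=callback2, args=(f"message-{i}", d))
    for i, d in enumerate(delays)
]
for t in threads:
    t.start()

try:
    # The main thread is the "+ 1" party. With two callbacks still missing,
    # the barrier times out after 5 seconds and breaks for every waiter.
    all_done.wait()
except threading.BrokenBarrierError:
    print("BrokenBarrierError: not all messages were re-delivered in time")

for t in threads:
    t.join()
```

Whether the real barrier fills up in time depends entirely on how quickly the backend re-delivers the NACK-ed messages, so the 5-second window makes this assertion inherently timing-sensitive, which matches the intermittent failures reported here and previously in #739.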