-
Notifications
You must be signed in to change notification settings - Fork 216
tests.system.TestStreamingPull: test_streaming_pull_blocking_shutdown failed #739
Description
Note: #538 also tracked this test, but it was closed more than 10 days ago, so I didn't mark this failure as flaky.
commit: dc264e9
buildURL: Build Status, Sponge
status: failed
Test output
self =
publisher =
topic_path = 'projects/precise-truck-742/topics/t-1657830416650'
subscriber =
subscription_path = 'projects/precise-truck-742/subscriptions/s-1657830416652'
cleanup = [(>, ()...erClient object at 0x7f3a35e98af0>>, (), {'subscription': 'projects/precise-truck-742/subscriptions/s-1657830416652'})]
def test_streaming_pull_blocking_shutdown(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, (), {"topic": topic_path}))
cleanup.append(
(subscriber.delete_subscription, (), {"subscription": subscription_path})
)
# The ACK-s are only persisted if *all* messages published in the same batch
# are ACK-ed. We thus publish each message in its own batch so that the backend
# treats all messages' ACKs independently of each other.
publisher.create_topic(name=topic_path)
subscriber.create_subscription(name=subscription_path, topic=topic_path)
_publish_messages(publisher, topic_path, batch_sizes=[1] * 10)
# Artificially delay message processing, gracefully shutdown the streaming pull
# in the meantime, then verify that those messages were nevertheless processed.
processed_messages = []
def callback(message):
time.sleep(15)
processed_messages.append(message.data)
message.ack()
# Flow control limits should exceed the number of worker threads, so that some
# of the messages will be blocked on waiting for free scheduler threads.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
subscription_future = subscriber.subscribe(
subscription_path,
callback=callback,
flow_control=flow_control,
scheduler=scheduler,
await_callbacks_on_shutdown=True,
)
try:
subscription_future.result(timeout=10) # less than the sleep in callback
except exceptions.TimeoutError:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# Blocking on shutdown should have waited for the already executing
# callbacks to finish.
assert len(processed_messages) == 3
# The messages that were not processed should have been NACK-ed and we should
# receive them again quite soon.
all_done = threading.Barrier(7 + 1, timeout=5) # +1 because of the main thread
remaining = []
def callback2(message):
remaining.append(message.data)
message.ack()
all_done.wait()
subscription_future = subscriber.subscribe(
subscription_path, callback=callback2, await_callbacks_on_shutdown=False
)
try:
all_done.wait()
except threading.BrokenBarrierError: # PRAGMA: no cover
pytest.fail("The remaining messages have not been re-delivered in time.")
finally:
subscription_future.cancel()
subscription_future.result() # block until shutdown completes
# There should be 7 messages left that were not yet processed and none of them
# should be a message that should have already been successfully processed in the
# first streaming pull.
assert len(remaining) == 7
assert not (set(processed_messages) & set(remaining)) # no re-delivery
E AssertionError: assert not ({b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'} & {b'message 1/1 of batch 1', b'message 1/1 of batch 10', b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 4', b'message 1/1 of batch 7', ...})
E + where {b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'} = set([b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 5'])
E + and {b'message 1/1 of batch 1', b'message 1/1 of batch 10', b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 4', b'message 1/1 of batch 7', ...} = set([b'message 1/1 of batch 2', b'message 1/1 of batch 3', b'message 1/1 of batch 1', b'message 1/1 of batch 4', b'message 1/1 of batch 9', b'message 1/1 of batch 10', ...])
tests/system.py:690: AssertionError