Skip to content

Commit 3d1bb30

Browse files
committed
Add support for auto-acknowledging pulled messages.
Follows @jgeewax's suggested implementation in: #798 (comment)
1 parent 98edc64 commit 3d1bb30

File tree

3 files changed

+105
-0
lines changed

3 files changed

+105
-0
lines changed

docs/pubsub-usage.rst

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,3 +272,29 @@ Fetch messages for a pull subscription without blocking (none pending):
272272
>>> messages = [recv[1] for recv in received]
273273
>>> [message.message_id for message in messages]
274274
[]
275+
276+
277+
Fetch pending messages, acknowledging those whose processing doesn't raise an
278+
error:
279+
280+
.. doctest::
281+
282+
>>> from gcloud import pubsub
283+
>>> client = pubsub.Client()
284+
>>> topic = client.topic('topic_name')
285+
>>> subscription = topic.subscription('subscription_name')
286+
>>> with topic.batch() as batch:
287+
... batch.publish('this is the first message_payload')
288+
... batch.publish('this is the second message_payload',
289+
... attr1='value1', attr2='value2')
290+
>>> from gcloud.pubsub.subscription import AutoAck
291+
>>> for ack_id, message in subscription.pull(max_messages=10): # API request
292+
... with AutoAck(subscription, ack_id, message):
293+
... do_something_with(message)
294+
295+
.. note::
296+
297+
One ``acknowledge`` API request occurs at the end of each ``with`` block,
298+
passing only the ``ack_id`` of the message just processed. If
299+
``do_something_with`` raises an exception, the ``acknowledge`` API
300+
request is skipped.

gcloud/pubsub/subscription.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,3 +263,29 @@ def delete(self, client=None):
263263
"""
264264
client = self._require_client(client)
265265
client.connection.api_request(method='DELETE', path=self.path)
266+
267+
268+
class AutoAck(object):
269+
"""Automatically acknowlege a single message if processed without error.
270+
271+
:type subscription: :class:`Subscription`
272+
:param subscription: the subscription from which the message was pulled,
273+
and to which it must be acknowledged.
274+
275+
:type ack_id: string
276+
:param ack_id: the ID for acknowledging the message
277+
278+
:type message: :class:`gcloud.pubsub.message.Message`
279+
:param message: the message to be acknowleged
280+
"""
281+
def __init__(self, subscription, ack_id, message):
282+
self.subscription = subscription
283+
self.ack_id = ack_id
284+
self.message = message
285+
286+
def __enter__(self):
287+
return self
288+
289+
def __exit__(self, exc_type, exc_val, exc_tb):
290+
if exc_type is None:
291+
self.subscription.acknowledge([self.ack_id])

gcloud/pubsub/test_subscription.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,48 @@ def test_delete_w_alternate_client(self):
485485
self.assertEqual(req['path'], '/%s' % SUB_PATH)
486486

487487

488+
class TestAutoAck(unittest2.TestCase):
489+
490+
def _getTargetClass(self):
491+
from gcloud.pubsub.subscription import AutoAck
492+
return AutoAck
493+
494+
def _makeOne(self, *args, **kw):
495+
return self._getTargetClass()(*args, **kw)
496+
497+
def test_ctor(self):
498+
ACK_ID = 'ACK_ID'
499+
MESSAGE = object()
500+
subscription = _FauxSubscription()
501+
auto_ack = self._makeOne(subscription, ACK_ID, MESSAGE)
502+
self.assertTrue(auto_ack.subscription is subscription)
503+
self.assertEqual(auto_ack.ack_id, ACK_ID)
504+
self.assertTrue(auto_ack.message is MESSAGE)
505+
506+
def test_as_context_mgr_no_error(self):
507+
ACK_ID = 'ACK_ID'
508+
MESSAGE = object()
509+
subscription = _FauxSubscription()
510+
511+
with self._makeOne(subscription, ACK_ID, MESSAGE):
512+
pass
513+
514+
self.assertEqual(list(subscription._acknowledged), [ACK_ID])
515+
self.assertEqual(subscription._ack_client, None)
516+
517+
def test_as_context_mgr_w_error(self):
518+
ACK_ID = 'ACK_ID'
519+
MESSAGE = object()
520+
subscription = _FauxSubscription()
521+
522+
with self.assertRaises(ValueError):
523+
with self._makeOne(subscription, ACK_ID, MESSAGE):
524+
raise ValueError()
525+
526+
self.assertEqual(list(subscription._acknowledged), [])
527+
self.assertTrue(getattr(subscription, '_ack_client', self) is self)
528+
529+
488530
class _Connection(object):
489531

490532
def __init__(self, *responses):
@@ -522,3 +564,14 @@ def __init__(self, project, connection=None):
522564
def topic(self, name, timestamp_messages=False):
523565
from gcloud.pubsub.topic import Topic
524566
return Topic(name, client=self, timestamp_messages=timestamp_messages)
567+
568+
569+
class _FauxSubscription(object):
570+
571+
def __init__(self):
572+
self._acknowledged = set()
573+
574+
def acknowledge(self, ack_ids, client=None):
575+
self._ack_client = client
576+
for ack_id in ack_ids:
577+
self._acknowledged.add(ack_id)

0 commit comments

Comments
 (0)