Skip to content

Commit ae20aa9

Browse files
thedrowOmer Katz
andauthored
Ensure task compression actually happens when setting task_compression (#7470)
* Ensure task compression actually happens when setting `task_compression`. Fixes #4838. Previously, we erroneously used `result_compression` as the configuration option for this behavior. It appears that compressing results was never supported in Celery or that the support for it was removed. This will be fixed later on. * Happify the linter. Co-authored-by: Omer Katz <omer.katz@kcg.tech>
1 parent 67c0dd0 commit ae20aa9

2 files changed

Lines changed: 26 additions & 5 deletions

File tree

celery/app/amqp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def _create_task_sender(self):
447447

448448
default_rkey = self.app.conf.task_default_routing_key
449449
default_serializer = self.app.conf.task_serializer
450-
default_compressor = self.app.conf.result_compression
450+
default_compressor = self.app.conf.task_compression
451451

452452
def send_task_message(producer, name, message,
453453
exchange=None, routing_key=None, queue=None,

t/unit/app/test_amqp.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,7 @@ def test_as_task_message_without_utc(self):
205205
self.app.amqp.as_task_v1(uuid(), 'foo', countdown=30, expires=40)
206206

207207

208-
class test_AMQP:
209-
208+
class test_AMQP_Base:
210209
def setup(self):
211210
self.simple_message = self.app.amqp.as_task_v2(
212211
uuid(), 'foo', create_sent_event=True,
@@ -215,6 +214,9 @@ def setup(self):
215214
uuid(), 'foo', create_sent_event=False,
216215
)
217216

217+
218+
class test_AMQP(test_AMQP_Base):
219+
218220
def test_kwargs_must_be_mapping(self):
219221
with pytest.raises(TypeError):
220222
self.app.amqp.as_task_v2(uuid(), 'foo', kwargs=[1, 2])
@@ -336,7 +338,7 @@ def update_conf_runtime_for_tasks_queues(self):
336338
assert router != router_was
337339

338340

339-
class test_as_task_v2:
341+
class test_as_task_v2(test_AMQP_Base):
340342

341343
def test_raises_if_args_is_not_tuple(self):
342344
with pytest.raises(TypeError):
@@ -368,8 +370,27 @@ def test_eta_to_datetime(self):
368370
)
369371
assert m.headers['eta'] == eta.isoformat()
370372

371-
def test_callbacks_errbacks_chord(self):
373+
def test_compression(self):
374+
self.app.conf.task_compression = 'gzip'
375+
376+
prod = Mock(name='producer')
377+
self.app.amqp.send_task_message(
378+
prod, 'foo', self.simple_message_no_sent_event,
379+
compression=None
380+
)
381+
assert prod.publish.call_args[1]['compression'] == 'gzip'
382+
383+
def test_compression_override(self):
384+
self.app.conf.task_compression = 'gzip'
385+
386+
prod = Mock(name='producer')
387+
self.app.amqp.send_task_message(
388+
prod, 'foo', self.simple_message_no_sent_event,
389+
compression='bz2'
390+
)
391+
assert prod.publish.call_args[1]['compression'] == 'bz2'
372392

393+
def test_callbacks_errbacks_chord(self):
373394
@self.app.task
374395
def t(i):
375396
pass

0 commit comments

Comments
 (0)