Skip to content

Commit 9e324ca

Browse files
dobosevych and Omer Katz
authored
Integration test fix (#7460)
* Integration debugging
* Commented tasks that aren't working
* Fixed test_inspect.py
* Fixed serialization in test_canvas.py
* Request fixes
* Set up full pipeline
* Set up python-package.yml
* Added 3.10 to integration tests
* test_task.py fixed
* test_generator fixed
* Added parametrization to test_generator
* Reverted encoding in test_canvas.py
* Rolled back codecov
* Retries now respect additional options. Previously, expires and other options were not merged with the current task's options. This commit fixes the issue.

Co-authored-by: Omer Katz <omer.katz@kcg.tech>
1 parent 55c8ca1 commit 9e324ca

9 files changed

Lines changed: 79 additions & 24 deletions

File tree

.github/workflows/python-package.yml

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ jobs:
6262
- name: >
6363
Run tox for
6464
"${{ matrix.python-version }}-unit"
65-
timeout-minutes: 20
65+
timeout-minutes: 25
6666
run: |
6767
tox --verbose --verbose
6868
@@ -72,3 +72,47 @@ jobs:
7272
fail_ci_if_error: true # optional (default = false)
7373
verbose: true # optional (default = false)
7474

75+
Integration:
76+
needs:
77+
- Unit
78+
if: needs.Unit.result == 'success'
79+
timeout-minutes: 240
80+
81+
runs-on: ubuntu-20.04
82+
strategy:
83+
fail-fast: false
84+
matrix:
85+
python-version: ['3.7', '3.8', '3.9', '3.10']
86+
toxenv: ['redis']
87+
services:
88+
redis:
89+
image: redis
90+
ports:
91+
- 6379:6379
92+
env:
93+
REDIS_HOST: localhost
94+
REDIS_PORT: 6379
95+
96+
steps:
97+
- name: Install apt packages
98+
run: |
99+
sudo apt-get install -f libcurl4-openssl-dev libssl-dev gnutls-dev httping expect libmemcached-dev
100+
- uses: actions/checkout@v2
101+
- name: Set up Python ${{ matrix.python-version }}
102+
uses: actions/setup-python@v2
103+
with:
104+
python-version: ${{ matrix.python-version }}
105+
106+
- name: Get pip cache dir
107+
id: pip-cache
108+
run: |
109+
echo "::set-output name=dir::$(pip cache dir)"
110+
- name: Install tox
111+
run: python -m pip install tox
112+
- name: >
113+
Run tox for
114+
"${{ matrix.python-version }}-integration-${{ matrix.toxenv }}"
115+
timeout-minutes: 25
116+
run: >
117+
tox --verbose --verbose -e
118+
"${{ matrix.python-version }}-integration-${{ matrix.toxenv }}" -vv

celery/app/task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ def signature_from_request(self, request=None, args=None, kwargs=None,
604604
request = self.request if request is None else request
605605
args = request.args if args is None else args
606606
kwargs = request.kwargs if kwargs is None else kwargs
607-
options = request.as_execution_options()
607+
options = {**request.as_execution_options(), **extra_options}
608608
delivery_info = request.delivery_info or {}
609609
priority = delivery_info.get('priority')
610610
if priority is not None:

celery/canvas.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from celery.utils.collections import ChainMap
2727
from celery.utils.functional import _regen
2828
from celery.utils.functional import chunks as _chunks
29-
from celery.utils.functional import is_list, lookahead, maybe_list, regen, seq_concat_item, seq_concat_seq
29+
from celery.utils.functional import is_list, maybe_list, regen, seq_concat_item, seq_concat_seq
3030
from celery.utils.objects import getitem_property
3131
from celery.utils.text import remove_repeating_from_task, truncate
3232

@@ -1183,9 +1183,11 @@ def _apply_tasks(self, tasks, producer=None, app=None, p=None,
11831183
# next_task is None. This enables us to set the chord size
11841184
# without burning through the entire generator. See #3021.
11851185
chord_size = 0
1186-
for task_index, (current_task, next_task) in enumerate(
1187-
lookahead(tasks)
1188-
):
1186+
tasks_shifted, tasks = itertools.tee(tasks)
1187+
next(tasks_shifted, None)
1188+
next_task = next(tasks_shifted, None)
1189+
1190+
for task_index, current_task in enumerate(tasks):
11891191
# We expect that each task must be part of the same group which
11901192
# seems sensible enough. If that's somehow not the case we'll
11911193
# end up messing up chord counts and there are all sorts of
@@ -1211,6 +1213,7 @@ def _apply_tasks(self, tasks, producer=None, app=None, p=None,
12111213
if p and not p.cancelled and not p.ready:
12121214
p.size += 1
12131215
res.then(p, weak=True)
1216+
next_task = next(tasks_shifted, None)
12141217
yield res # <-- r.parent, etc set in the frozen result.
12151218

12161219
def _freeze_gid(self, options):
@@ -1248,7 +1251,7 @@ def _freeze_group_tasks(self, _id=None, group_id=None, chord=None,
12481251
# we freeze all tasks in the clone tasks1, and then zip the results
12491252
# with the IDs of tasks in the second clone, tasks2. and then, we build
12501253
# a generator that takes only the task IDs from tasks2.
1251-
self.tasks = regen(x[0] for x in zip(tasks2, results))
1254+
self.tasks = regen(tasks2)
12521255
else:
12531256
new_tasks = []
12541257
# Need to unroll subgroups early so that chord gets the

celery/contrib/pytest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def celery_session_worker(
9898
for module in celery_includes:
9999
celery_session_app.loader.import_task_module(module)
100100
for class_task in celery_class_tasks:
101-
celery_session_app.tasks.register(class_task)
101+
celery_session_app.register_task(class_task)
102102
with worker.start_worker(celery_session_app,
103103
pool=celery_worker_pool,
104104
**celery_worker_parameters) as w:

celery/worker/request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ def __init__(self, message, on_ack=noop,
154154
'exchange': delivery_info.get('exchange'),
155155
'routing_key': delivery_info.get('routing_key'),
156156
'priority': properties.get('priority'),
157-
'redelivered': delivery_info.get('redelivered'),
157+
'redelivered': delivery_info.get('redelivered', False),
158158
}
159159
self._request_dict.update({
160160
'properties': properties,

requirements/test-integration.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
-r extras/auth.txt
44
-r extras/memcache.txt
55
pytest-rerunfailures>=6.0
6+
git+https://github.com/celery/kombu.git

t/integration/tasks.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,16 +197,17 @@ def retry(self, return_value=None):
197197
raise self.retry(exc=ExpectedException(), countdown=5)
198198

199199

200-
@shared_task(bind=True, expires=60.0, max_retries=1)
201-
def retry_once(self, *args, expires=60.0, max_retries=1, countdown=0.1):
200+
@shared_task(bind=True, expires=120.0, max_retries=1)
201+
def retry_once(self, *args, expires=None, max_retries=1, countdown=0.1):
202202
"""Task that fails and is retried. Returns the number of retries."""
203203
if self.request.retries:
204204
return self.request.retries
205205
raise self.retry(countdown=countdown,
206+
expires=expires,
206207
max_retries=max_retries)
207208

208209

209-
@shared_task(bind=True, expires=60.0, max_retries=1)
210+
@shared_task(bind=True, max_retries=1)
210211
def retry_once_priority(self, *args, expires=60.0, max_retries=1,
211212
countdown=0.1):
212213
"""Task that fails and is retried. Returns the priority."""

t/integration/test_canvas.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def test_link_error_callback_error_callback_retries_eager(self):
120120
)
121121
assert result.get(timeout=TIMEOUT, propagate=False) == exception
122122

123-
@flaky
123+
@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
124124
def test_link_error_callback_retries(self):
125125
exception = ExpectedException("Task expected to fail", "test")
126126
result = fail.apply_async(
@@ -140,7 +140,7 @@ def test_link_error_using_signature_eager(self):
140140
assert (fail.apply().get(timeout=TIMEOUT, propagate=False), True) == (
141141
exception, True)
142142

143-
@flaky
143+
@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout instead of returning exception")
144144
def test_link_error_using_signature(self):
145145
fail = signature('t.integration.tasks.fail', args=("test",))
146146
retrun_exception = signature('t.integration.tasks.return_exception')
@@ -175,7 +175,7 @@ def test_complex_chain(self, manager):
175175
res = c()
176176
assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
177177

178-
@flaky
178+
@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
179179
def test_group_results_in_chain(self, manager):
180180
# This adds in an explicit test for the special case added in commit
181181
# 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
@@ -473,7 +473,7 @@ def test_chain_of_a_chord_and_three_tasks_and_a_group(self, manager):
473473
res = c()
474474
assert res.get(timeout=TIMEOUT) == [8, 8]
475475

476-
@flaky
476+
@pytest.mark.xfail(raises=TimeoutError, reason="Task is timeout")
477477
def test_nested_chain_group_lone(self, manager):
478478
"""
479479
Test that a lone group in a chain completes.
@@ -1229,7 +1229,7 @@ def apply_chord_incr_with_sleep(self, *args, **kwargs):
12291229
result = c()
12301230
assert result.get(timeout=TIMEOUT) == 4
12311231

1232-
@flaky
1232+
@pytest.mark.xfail(reason="async_results aren't performed in async way")
12331233
def test_redis_subscribed_channels_leak(self, manager):
12341234
if not manager.app.conf.result_backend.startswith('redis'):
12351235
raise pytest.skip('Requires redis result backend.')
@@ -1562,11 +1562,12 @@ def test_chord_on_error(self, manager):
15621562
) == 1
15631563

15641564
@flaky
1565-
def test_generator(self, manager):
1565+
@pytest.mark.parametrize('size', [3, 4, 5, 6, 7, 8, 9])
1566+
def test_generator(self, manager, size):
15661567
def assert_generator(file_name):
1567-
for i in range(3):
1568+
for i in range(size):
15681569
sleep(1)
1569-
if i == 2:
1570+
if i == size - 1:
15701571
with open(file_name) as file_handle:
15711572
# ensures chord header generators tasks are processed incrementally #3021
15721573
assert file_handle.readline() == '0\n', "Chord header was unrolled too early"
@@ -1575,7 +1576,7 @@ def assert_generator(file_name):
15751576
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_file:
15761577
file_name = tmp_file.name
15771578
c = chord(assert_generator(file_name), tsum.s())
1578-
assert c().get(timeout=TIMEOUT) == 3
1579+
assert c().get(timeout=TIMEOUT) == size * (size - 1) // 2
15791580

15801581
@flaky
15811582
def test_parallel_chords(self, manager):

t/integration/test_tasks.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ class test_class_based_tasks:
2727
def test_class_based_task_retried(self, celery_session_app,
2828
celery_session_worker):
2929
task = ClassBasedAutoRetryTask()
30-
celery_session_app.tasks.register(task)
30+
celery_session_app.register_task(task)
3131
res = task.delay()
3232
assert res.get(timeout=TIMEOUT) == 1
3333

@@ -253,12 +253,17 @@ def test_task_accepted(self, manager, sleep=1):
253253
manager.assert_accepted([r1.id])
254254

255255
@flaky
256-
def test_task_retried(self):
256+
def test_task_retried_once(self, manager):
257257
res = retry_once.delay()
258258
assert res.get(timeout=TIMEOUT) == 1 # retried once
259259

260260
@flaky
261-
def test_task_retried_priority(self):
261+
def test_task_retried_once_with_expires(self, manager):
262+
res = retry_once.delay(expires=60)
263+
assert res.get(timeout=TIMEOUT) == 1 # retried once
264+
265+
@flaky
266+
def test_task_retried_priority(self, manager):
262267
res = retry_once_priority.apply_async(priority=7)
263268
assert res.get(timeout=TIMEOUT) == 7 # retried once with priority 7
264269

0 commit comments

Comments (0)