Description
When Request.on_timeout receives a soft timeout from billiard, it handles it exactly as if it had received a hard time limit exception. This runs in the controller.
But the task may catch this exception and, for example, return a value (this is what soft timeouts are for).
This causes:
1. the result to be saved twice: once as an exception by the controller (on_timeout) and once with the value returned by the task
2. the task status to be set to failure and then to success in the same way
3. if the task is part of a chord, the chord result counter (at least with redis) to be incremented twice instead of once, making the chord return prematurely and eventually lose tasks…
1, 2 and 3 can of course lead to strange race conditions…
Steps to reproduce (Illustration)
With the following program in test_timeout.py:
import time
import celery

app = celery.Celery('test_timeout')
app.conf.update(
    result_backend="redis://localhost/0",
    broker_url="amqp://celery:celery@localhost:5672/host",
)

@app.task(soft_time_limit=1)
def test():
    try:
        time.sleep(2)
    except Exception:
        return 1

@app.task()
def add(args):
    print("### adding", args)
    return sum(args)

@app.task()
def on_error(context, exception, traceback, **kwargs):
    print("### on_error: ", exception)

if __name__ == "__main__":
    result = celery.chord([test.s().set(link_error=on_error.s()), test.s().set(link_error=on_error.s())])(add.s())
    result.get()

Start a worker and the program:
$ celery -A test_timeout worker -l WARNING
$ python3 test_timeout.py
Expected behavior
The add task is called with [1, 1] as its argument and test_timeout.py returns normally.
Actual behavior
test_timeout.py fails with:
celery.backends.base.ChordError: Callback error: ChordError("Dependency 15109e05-da43-449f-9081-85d839ac0ef2 raised SoftTimeLimitExceeded('SoftTimeLimitExceeded(True,)',)",
On the worker side, on_error is called, but so is the add task:
[2017-11-29 23:07:25,538: WARNING/MainProcess] Soft time limit (1s) exceeded for test_timeout.test[15109e05-da43-449f-9081-85d839ac0ef2]
[2017-11-29 23:07:25,546: WARNING/MainProcess] ### on_error:
[2017-11-29 23:07:25,546: WARNING/MainProcess] SoftTimeLimitExceeded(True,)
[2017-11-29 23:07:25,547: WARNING/MainProcess] Soft time limit (1s) exceeded for test_timeout.test[38f3f7f2-4a89-4318-8ee9-36a987f73757]
[2017-11-29 23:07:25,553: ERROR/MainProcess] Chord callback for 'ef6d7a38-d1b4-40ad-b937-ffa84e40bb23' raised: ChordError("Dependency 15109e05-da43-449f-9081-85d839ac0ef2 raised SoftTimeLimitExceeded('SoftTimeLimitExceeded(True,)',)",)
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/celery/backends/redis.py", line 290, in on_chord_part_return
    callback.delay([unpack(tup, decode) for tup in resl])
  File "/usr/local/lib/python3.4/dist-packages/celery/backends/redis.py", line 290, in <listcomp>
    callback.delay([unpack(tup, decode) for tup in resl])
  File "/usr/local/lib/python3.4/dist-packages/celery/backends/redis.py", line 243, in _unpack_chord_result
    raise ChordError('Dependency {0} raised {1!r}'.format(tid, retval))
celery.exceptions.ChordError: Dependency 15109e05-da43-449f-9081-85d839ac0ef2 raised SoftTimeLimitExceeded('SoftTimeLimitExceeded(True,)',)
[2017-11-29 23:07:25,565: WARNING/MainProcess] ### on_error:
[2017-11-29 23:07:25,565: WARNING/MainProcess] SoftTimeLimitExceeded(True,)
[2017-11-29 23:07:27,262: WARNING/PoolWorker-2] ### adding
[2017-11-29 23:07:27,264: WARNING/PoolWorker-2] [1, 1]
Of course, I deliberately called test.s() twice to show that the chord count keeps going. In fact:
- the chord result is incremented twice by the soft time limit errors
- the chord result is then incremented twice more when the test tasks return correctly
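The counting described above can be modelled with a toy chord counter. This is only a sketch, not Celery's Redis backend code: `ToyChord` and `run` are hypothetical names, and the counter is assumed to reset after the callback fires, an assumption chosen so that the model reproduces the order seen in the worker log (the error first, then add with [1, 1]).

```python
class ToyChord:
    """Hypothetical stand-in for a backend-side chord counter."""

    def __init__(self, size):
        self.size = size      # number of header tasks in the chord
        self.pending = []     # reports received since the last callback
        self.fired = []       # arguments of every callback invocation

    def on_part_return(self, value):
        self.pending.append(value)
        # Fire the callback as soon as `size` reports have arrived,
        # then start counting again (assumed reset, see lead-in).
        if len(self.pending) == self.size:
            self.fired.append(self.pending)
            self.pending = []


def run(reports):
    chord = ToyChord(size=2)
    for value in reports:
        chord.on_part_return(value)
    return chord.fired


# Reported behaviour: each task is reported twice, once by the
# controller (timeout) and once by the task itself (return value 1).
buggy = run(["SoftTimeLimitExceeded", "SoftTimeLimitExceeded", 1, 1])

# Expected behaviour: each task is reported once.
fixed = run([1, 1])

print(buggy)  # [['SoftTimeLimitExceeded', 'SoftTimeLimitExceeded'], [1, 1]]
print(fixed)  # [[1, 1]]
```

With four reports for a chord of size two, the toy callback fires twice: first with the two errors, then with the two real results.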
Conclusion
Request.on_timeout should not process soft time limit exceptions.
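The intended behaviour can be checked without a running worker by applying a guard to a stand-in class. This is a minimal sketch under stated assumptions: `ToyRequest` and `ignore_soft_timeouts` are hypothetical names, and the toy `on_timeout` only mimics the reported behaviour of storing a failure for both soft and hard limits; it is not Celery's implementation.

```python
class ToyRequest:
    """Hypothetical stand-in; not celery.worker.request.Request."""

    def __init__(self):
        self.stored = []  # every result written to the (toy) backend

    def on_timeout(self, soft, timeout):
        # Mimics the reported behaviour: soft and hard limits
        # are both stored as failures.
        kind = "soft" if soft else "hard"
        self.stored.append(("FAILURE", kind))

    def on_success(self, retval):
        self.stored.append(("SUCCESS", retval))


def ignore_soft_timeouts(cls):
    """Wrap cls.on_timeout so only hard timeouts reach the original."""
    original = cls.on_timeout

    def guarded(self, soft, timeout):
        if not soft:
            original(self, soft, timeout)

    cls.on_timeout = guarded


# Before the guard: a soft timeout followed by the task's own
# return stores two results for a single task.
unpatched = ToyRequest()
unpatched.on_timeout(True, 1)
unpatched.on_success(1)

ignore_soft_timeouts(ToyRequest)

# After the guard: the soft timeout is ignored, the hard one is kept.
patched = ToyRequest()
patched.on_timeout(True, 1)
patched.on_success(1)

hard = ToyRequest()
hard.on_timeout(False, 2)

print(unpatched.stored)  # [('FAILURE', 'soft'), ('SUCCESS', 1)]
print(patched.stored)    # [('SUCCESS', 1)]
print(hard.stored)       # [('FAILURE', 'hard')]
```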
Here is a quick monkey patch (the correction in Celery itself is trivial):
def patch_celery_request_on_timeout():
    from celery.worker import request
    orig = request.Request.on_timeout

    def patched_on_timeout(self, soft, timeout):
        if not soft:
            orig(self, soft, timeout)

    request.Request.on_timeout = patched_on_timeout

patch_celery_request_on_timeout()

Version info
software -> celery:4.1.0 (latentcall) kombu:4.0.2 py:3.4.3
billiard:3.5.0.2 py-amqp:2.1.4
platform -> system:Linux arch:64bit, ELF imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:amqp results:redis://10.0.3.253/0