Description
Checklist
- I have verified that the issue exists against the master branch of Celery.
- This has already been asked to the discussion group first.
- I have read the relevant section in the contribution guide on reporting bugs.
- I have checked the issues list for similar or identical bug reports.
- I have checked the pull requests list for existing proposed fixes.
- I have checked the commit log to find out if the bug was already fixed in the master branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
Mandatory Debugging Information
- I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected).
- I have verified that the issue exists against the master branch of Celery.
- I have included the contents of pip freeze in the issue. N/A
- I have included all the versions of all the external dependencies required to reproduce this bug. N/A
Optional Debugging Information
- I have tried reproducing the issue on more than one Python version and/or implementation.
- I have tried reproducing the issue on more than one message broker and/or result backend.
- I have tried reproducing the issue on more than one version of the message broker and/or result backend.
- I have tried reproducing the issue on more than one operating system.
- I have tried reproducing the issue on more than one workers pool.
- I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
- I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
Possible Duplicates
- None
Environment & Settings
Celery version: 4.4.7 (cliffs)
celery report Output:
software -> celery:4.4.7 (cliffs) kombu:4.6.11 py:3.8.2
billiard:3.6.3.0 py-amqp:2.6.1
platform -> system:Darwin arch:64bit
kernel version:16.7.0 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:amqp results:db+postgresql://<redacted>
include: ['redacted']
accept_content: ['redacted-custom']
database_table_names: {
'group': 'celery_group', 'task': 'celery_task'}
result_serializer: 'redacted-custom'
task_serializer: 'redacted-custom'
task_track_started: True
broker_url: 'amqp://<redacted>'
result_backend: 'db+postgresql://<redacted>'
Steps to Reproduce
When celery uses a database result backend, the following line can be called multiple times from different processes:
ResultModelBase.metadata.create_all(engine)
This is a race condition because SQLAlchemy first checks if the tables/sequences exist and then tries to create them. It causes errors like this (at least on PostgreSQL):
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/redacted.py", line 168, in _redacted
result = async_result.get()
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 226, in get
self.maybe_throw(callback=callback)
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 342, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/usr/local/lib/python3.7/site-packages/celery/result.py", line 335, in throw
self.on_ready.throw(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/vine/promises.py", line 244, in throw
reraise(type(exc), exc, tb)
File "/usr/local/lib/python3.7/site-packages/vine/five.py", line 195, in reraise
raise value
Exception: <class 'sqlalchemy.exc.IntegrityError'>(('(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "pg_type_typname_nsp_index"\nDETAIL: Key (typname, typnamespace)=(taskset_id_sequence, 2200) already exists.\n',))
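The check-then-create ordering described above can be illustrated without a database. The sketch below is a toy model only: FakeCatalog, DuplicateObject and check_then_create_interleaved are hypothetical names invented for this illustration, not SQLAlchemy or PostgreSQL APIs. It replays, step by step, the interleaving in which two workers both run the existence check before either one creates the object.

```python
class DuplicateObject(Exception):
    """Hypothetical stand-in for the database's unique-constraint error."""


class FakeCatalog:
    """Toy stand-in for a database catalog that enforces unique names."""

    def __init__(self):
        self._names = set()

    def exists(self, name):
        return name in self._names

    def create(self, name):
        if name in self._names:
            raise DuplicateObject(name)
        self._names.add(name)


def check_then_create_interleaved(catalog, name):
    """Replay the problematic ordering of two workers, step by step."""
    # Both workers run the existence check before either one creates.
    worker_a_sees_missing = not catalog.exists(name)
    worker_b_sees_missing = not catalog.exists(name)

    outcomes = []
    for sees_missing in (worker_a_sees_missing, worker_b_sees_missing):
        if not sees_missing:
            outcomes.append("skipped")
            continue
        try:
            catalog.create(name)
            outcomes.append("created")
        except DuplicateObject:
            outcomes.append("duplicate error")
    return outcomes


# prints ['created', 'duplicate error']
print(check_then_create_interleaved(FakeCatalog(), "taskset_id_sequence"))
```

The second worker fails even though its own check said the object was missing, which is the same shape as the UniqueViolation in the traceback.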
One workaround is to force the table creation ahead of time as was proposed by a user in the issue I linked: #4653 (comment).
I think Celery should handle this itself. A possible solution would be to catch IntegrityError and retry until create_all succeeds. The snippet below retries immediately and without limit; a real fix should probably cap the number of retries and sleep between attempts:
def prepare_models(self, engine):
    from sqlalchemy.exc import IntegrityError
    if not self.prepared:
        while True:
            try:
                ResultModelBase.metadata.create_all(engine)
            except IntegrityError:
                continue
            else:
                break
        self.prepared = True
Minimally Reproducible Test Case
This example doesn't use celery at all, but shows that calling create_all in multiple processes can cause the error. It's a race condition, so you might need to try it multiple times or play around with the number of processes:
Requires a local postgres, and this database must be created:
createdb racetest
from concurrent.futures import ProcessPoolExecutor, as_completed
from sqlalchemy import Column, Integer, Table, MetaData, create_engine

metadata = MetaData()
tbl1 = Table('tbl1', metadata, Column('id', Integer, primary_key=True))

def create_all(url):
    engine = create_engine(url)
    metadata.create_all(bind=engine)

def main():
    url = 'postgresql:///racetest'
    engine = create_engine(url)
    # Make sure schema is empty before we start
    metadata.drop_all(bind=engine)
    with ProcessPoolExecutor(max_workers=50) as executor:
        futures = []
        for _ in range(50):
            future = executor.submit(create_all, url)
            futures.append(future)
        for fut in as_completed(futures):
            fut.result()

if __name__ == '__main__':
    main()
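Returning to the proposed fix: the idea of a limited number of retries with sleeps in between could look roughly like the sketch below. This is not Celery's implementation. The helper name retry_on and its parameters (max_attempts, delay, sleep) are hypothetical, and linear backoff is just one possible choice.

```python
import time


def retry_on(exc_type, func, max_attempts=5, delay=0.1, sleep=time.sleep):
    """Call func(), retrying on exc_type up to max_attempts times.

    Hypothetical helper for illustration; names and defaults are assumptions.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except exc_type:
            if attempt == max_attempts:
                raise  # Give up and surface the last error.
            sleep(delay * attempt)  # Linear backoff between attempts.
```

Inside prepare_models, the bare loop could then be replaced by something like retry_on(IntegrityError, lambda: ResultModelBase.metadata.create_all(engine)). Which exception types are worth retrying is an assumption here; the traceback above only shows IntegrityError.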