Description
PR for proposed fix to this issue: #6020
Checklist
- I have verified that the issue exists against the master branch of Celery.
- This has already been asked to the discussion group first.
- I have read the relevant section in the contribution guide on reporting bugs.
- I have checked the issues list for similar or identical bug reports.
- I have checked the pull requests list for existing proposed fixes.
- I have checked the commit log to find out if the bug was already fixed in the master branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
Mandatory Debugging Information
- I have included the output of celery -A proj report in the issue (if you are not able to do this, then at least specify the Celery version affected).
- I have verified that the issue exists against the master branch of Celery.
- I have included the contents of pip freeze in the issue.
- I have included all the versions of all the external dependencies required to reproduce this bug.
Optional Debugging Information
- I have tried reproducing the issue on more than one Python version and/or implementation.
- I have tried reproducing the issue on more than one message broker and/or result backend.
- I have tried reproducing the issue on more than one version of the message broker and/or result backend.
- I have tried reproducing the issue on more than one operating system.
- I have tried reproducing the issue on more than one workers pool.
- I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
- I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
- #1930: addressed by commit 94dae1b, the change that has led to the issue I am having now.
Possible Duplicates
- None
Environment & Settings
Celery version: celery>=4.0.0 (using it in Airflow)
Steps to Reproduce
(See the Minimally Reproducible Test Case section for step-by-step commands. This section contains the background leading to the issue and a proposed fix.)
In Airflow, you can set Celery configs. I was setting up Cloud SQL to use a private IP instead of a proxy. Currently, we use MySQL as the result_backend. Changing the host address from localhost to the private IP caused some errors, as expected:
```
OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.x.x.xxx' (using password: YES)")
```
In order to use the private IP, I need to use the SSL cert, key, and CA. By logging into the Airflow worker and scheduler pods, I confirmed that my URL and engine-arg params worked:
```python
from airflow.models import DagRun
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

e = create_engine(
    {AIRFLOW__CELERY__SQL_ALCHEMY_CONN},
    connect_args={'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                          'cert': '/path-to-mysql-sslcert/client-cert',
                          'key': '/path-to-mysql-sslcert/client-key'}})
s = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=e))
s.query(DagRun).all()
```
This worked fine, so I know that my SSL certs are accessible, the engine can be created, and a session can be used. Non-Celery MySQL connections no longer gave an error.
The Celery documentation (https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-database-result-backend) outlines how to add engine args via database_engine_options. Therefore, I added:
```python
'database_engine_options': {
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                             'cert': '/path-to-mysql-sslcert/client-cert',
                             'key': '/path-to-mysql-sslcert/client-key'}}}
```
However, I still get the same error.
```
OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.x.x.xxx' (using password: YES)")
```
Additionally, I get logs in the scheduler like the following:
```
{{__init__.py:56}} WARNING - Failed operation _get_task_meta_for. Retrying 1 more times.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 51, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 154, in _get_task_meta_for
    session = self.ResultSession()
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 113, in ResultSession
    **self.engine_options)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/session.py", line 59, in session_factory
    self.prepare_models(engine)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/session.py", line 54, in prepare_models
    ResultModelBase.metadata.create_all(engine)
```
After digging through the code with @dangermike, we noticed that get_engine will not use the kwargs passed to it unless self.forked is True (https://github.com/celery/celery/blob/master/celery/backends/database/session.py#L34). Therefore, the SSL params will not be passed in our case. The only place self.forked is set to True is after the fork cleanup session. This was not always the case (94dae1b), but after an issue was raised about passing pool_size (#1930), **kwargs were removed from the create_engine() call entirely.
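To make the control flow concrete, here is a minimal pure-Python sketch of the non-forked path, with create_engine stubbed out to record its arguments (the stub, the `calls` list, and the example URI/paths are illustrative, not Celery or SQLAlchemy code). It shows that the SSL connect_args never reach the engine before a fork:

```python
# Stub of sqlalchemy.create_engine that records what it was called with,
# so the SessionManager control flow can be shown without a real database.
calls = []

def create_engine(dburi, **kwargs):
    calls.append((dburi, kwargs))
    return object()

NullPool = 'NullPool'  # stand-in for sqlalchemy.pool.NullPool


class SessionManager(object):
    """Simplified sketch of celery.backends.database.session.SessionManager."""

    def __init__(self):
        self.forked = False  # only set True after a worker-process fork
        self._engines = {}

    def get_engine(self, dburi, **kwargs):
        if self.forked:
            try:
                return self._engines[dburi]
            except KeyError:
                engine = self._engines[dburi] = create_engine(dburi, **kwargs)
                return engine
        # Non-forked path: **kwargs (including our SSL connect_args) are dropped.
        return create_engine(dburi, poolclass=NullPool)


SessionManager().get_engine(
    'mysql://10.x.x.x/airflow',
    connect_args={'ssl': {'ca': '/path-to-mysql-sslcert/server-ca'}})
print(calls[-1][1])  # {'poolclass': 'NullPool'} -- connect_args is gone
```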
Possibly something like the following would allow kwargs to be passed in, while still addressing the pool-params issue:
```python
class SessionManager(object):
    # ...
    def get_engine(self, dburi, **kwargs):
        if self.forked:
            try:
                return self._engines[dburi]
            except KeyError:
                engine = self._engines[dburi] = create_engine(dburi, **kwargs)
                return engine
        else:
            # Drop pool-related options (NullPool is forced pre-fork),
            # but keep everything else, e.g. connect_args.
            kwargs = dict([(k, v) for k, v in kwargs.items()
                           if not k.startswith('pool')])
            return create_engine(dburi, poolclass=NullPool, **kwargs)
```
where `kwargs = dict([(k, v) for k, v in kwargs.items() if not k.startswith('pool')])` omits any pool args while keeping the rest.
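As a quick sanity check, the filtering expression behaves as intended on an options dict shaped like the one in this report (the example values are illustrative):

```python
engine_options = {
    'pool_size': 5,
    'pool_recycle': 3600,
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                             'cert': '/path-to-mysql-sslcert/client-cert',
                             'key': '/path-to-mysql-sslcert/client-key'}},
}

# Keys starting with 'pool' are dropped; everything else survives.
filtered = dict([(k, v) for k, v in engine_options.items()
                 if not k.startswith('pool')])
print(sorted(filtered))  # ['connect_args']
```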
Required Dependencies
- Minimal Python Version: >=2.7
- Minimal Celery Version: >=4.0.0
- Minimal Kombu Version: N/A or Unknown
- Minimal Broker Version: N/A or Unknown
- Minimal Result Backend Version: N/A or Unknown
- Minimal OS and/or Kernel Version: N/A or Unknown
- Minimal Broker Client Version: N/A or Unknown
- Minimal Result Backend Client Version: N/A or Unknown
Python Packages
Used Airflow
Other Dependencies
N/A
Minimally Reproducible Test Case
In a Python shell, get the URL with a private MySQL IP to use as the result_backend, giving something like `db+mysql://airflow:***@10.x.xx.xx/airflow`, and the Celery config:
```python
celery_configuration = {
    'broker_transport_options': {'visibility_timeout': 21600},
    'result_serializer': 'pickle',
    'task_acks_late': True,
    'database_engine_options': {
        'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                                 'cert': '/path-to-mysql-sslcert/client-cert',
                                 'key': '/path-to-mysql-sslcert/client-key'}}},
    'task_default_queue': 'default',
    'worker_concurrency': 32,
    'worker_prefetch_multiplier': 1,
    'event_serializer': 'json',
    'accept_content': ['json', 'pickle'],
    'broker_url': 'redis://{URL}/1',
    'result_backend': 'db+mysql://airflow:***@10.x.xx.xx/airflow',
    'task_default_exchange': 'default'}
```
The most important line here is:
```python
'database_engine_options': {
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                             'cert': '/path-to-mysql-sslcert/client-cert',
                             'key': '/path-to-mysql-sslcert/client-key'}}}
```
Then try to connect to the result_backend by creating the app:
```python
app = Celery(celery_app_name=airflow.executors.celery_executor,
             config_source=celery_configuration)
```
Create a database backend:
```python
dbbe = database.DatabaseBackend(
    url={results_backend url without the 'db+' in the beginning},
    engine_options=celery_configuration['database_engine_options'],
    app=app)
```
and you will get the error again:
```
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.xx.xx.xxx' (using password: YES)")
(Background on this error at: http://sqlalche.me/e/e3q8)
```
Expected Behavior
The expected behavior here would be for the connection to succeed, using the SSL certs in the **kwargs passed into get_engine.
Actual Behavior
Since self.forked is not True, and will never become True in this code path, the engine is created by:
```python
return create_engine(dburi, poolclass=NullPool)
```
Since the SSL certs are not included, an error is returned and the connection is not successful:
```
sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.xx.xx.xxx' (using password: YES)")
(Background on this error at: http://sqlalche.me/e/e3q8)
```