
Unable to use mysql SSL parameters in create_engine() #6019

@alexagriffith

Description


PR for proposed fix to this issue: #6020

Checklist

  • I have verified that the issue exists against the master branch of Celery.
  • This has already been asked to the discussion group first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Commit 94dae1b was made to address the following issue, which has led to the issue I am reporting now: #1930

Related Issues

#1930

Possible Duplicates

  • None

Environment & Settings

Celery version: celery>=4.0.0 (using it in Airflow)

Steps to Reproduce

(See Minimally Reproducible Test Case for step-by-step commands. This section contains background leading to the issue and a proposed fix.)

In Airflow, you can set Celery configs. I was setting up Cloud SQL to use a private IP instead of a proxy. Currently, we use MySQL as the result_backend. Changing the host address from localhost to the private IP caused some errors, as expected.

OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.x.x.xxx' (using password: YES)")

In order to use the private IP, I need to use the SSL cert, key, and CA. I confirmed, by logging into the Airflow worker and scheduler pods, that my URL and engine-arg params worked:

from airflow.models import DagRun 
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
e = create_engine(
    {AIRFLOW__CELERY__SQL_ALCHEMY_CONN},
    connect_args={'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                          'cert': '/path-to-mysql-sslcert/client-cert',
                          'key': '/path-to-mysql-sslcert/client-key'}})
s = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=e))
s.query(DagRun).all()

This worked fine, so I know that my SSL certs are accessible, the engine can be created, and a session can be used. Non-Celery MySQL connections no longer gave an error.

The Celery documentation (https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-database-result-backend) outlines how to add engine args via database_engine_options. Therefore, I added

'database_engine_options': {
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                             'cert': '/path-to-mysql-sslcert/client-cert',
                             'key': '/path-to-mysql-sslcert/client-key'}}}

However, I still get the same error.

OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.x.x.xxx' (using password: YES)")

Additionally, I get logs in the scheduler like the following:

{{__init__.py:56}} WARNING - Failed operation _get_task_meta_for.  Retrying 1 more times.
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 51, in _inner
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 154, in _get_task_meta_for
    session = self.ResultSession()
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/__init__.py", line 113, in ResultSession
    **self.engine_options)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/session.py", line 59, in session_factory
    self.prepare_models(engine)
  File "/usr/local/lib/python2.7/dist-packages/celery/backends/database/session.py", line 54, in prepare_models
    ResultModelBase.metadata.create_all(engine)

After digging through the code with @dangermike, we noticed that get_engine will not use the kwargs passed to it unless the process has been forked (https://github.com/celery/celery/blob/master/celery/backends/database/session.py#L34), so the SSL params are never passed in our case. The only place self.forked is set to True is in the after-fork session cleanup. This was not always the case (94dae1b), but after an issue was filed about pool_size being passed through (#1930), **kwargs were removed from the create_engine() call entirely.
Something like the following would allow kwargs to be passed in while still addressing the pool-params issue:

class SessionManager(object):
    # ...
    def get_engine(self, dburi, **kwargs):
        if self.forked:
            try:
                return self._engines[dburi]
            except KeyError:
                engine = self._engines[dburi] = create_engine(dburi, **kwargs)
                return engine
        else:
            # drop pool-related args, keep everything else (e.g. connect_args)
            kwargs = {k: v for k, v in kwargs.items()
                      if not k.startswith('pool')}
            return create_engine(dburi, poolclass=NullPool, **kwargs)

where the filtering expression omits any pool* args while keeping the rest.
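As a quick sanity check of that filtering expression (the option values here are hypothetical):

```python
engine_options = {
    'pool_size': 5,
    'pool_recycle': 1800,
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca'}},
}

# pool* keys are stripped; everything else (including connect_args) survives
filtered = {k: v for k, v in engine_options.items()
            if not k.startswith('pool')}
print(filtered)
# → {'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca'}}}
```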

Required Dependencies

  • Minimal Python Version: >=2.7
  • Minimal Celery Version: >=4.0.0
  • Minimal Kombu Version: N/A or Unknown
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

Used Airflow

Other Dependencies

N/A

Minimally Reproducible Test Case

In a Python shell, get the URL with a private MySQL IP to use as the result_backend, giving something like db+mysql://airflow:***@10.x.xx.xx/airflow, and the Celery config:

celery_configuration = {
    'broker_transport_options': {'visibility_timeout': 21600},
    'result_serializer': 'pickle',
    'task_acks_late': True,
    'database_engine_options': {
        'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                                 'cert': '/path-to-mysql-sslcert/client-cert',
                                 'key': '/path-to-mysql-sslcert/client-key'}}},
    'task_default_queue': 'default',
    'worker_concurrency': 32,
    'worker_prefetch_multiplier': 1,
    'event_serializer': 'json',
    'accept_content': ['json', 'pickle'],
    'broker_url': 'redis://{URL}/1',
    'result_backend': 'db+mysql://airflow:***@10.x.xx.xx/airflow',
    'task_default_exchange': 'default'}

The most important line here is:

'database_engine_options': {
    'connect_args': {'ssl': {'ca': '/path-to-mysql-sslcert/server-ca',
                             'cert': '/path-to-mysql-sslcert/client-cert',
                             'key': '/path-to-mysql-sslcert/client-key'}}}

Then try to connect to the result_backend by creating the app:

app = Celery(celery_app_name=airflow.executors.celery_executor,
    config_source=celery_configuration)

Create a database backend:

dbbe = database.DatabaseBackend(
    url={results_backend url without the 'db+' in the beginning},
    engine_options=celery_configuration['database_engine_options'],
    app=app)

and you will get the error again:

sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.xx.xx.xxx' (using password: YES)")
(Background on this error at: http://sqlalche.me/e/e3q8)

Expected Behavior

The expected behavior here is for the connection to succeed, using the SSL certs in the **kwargs passed into get_engine.
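One way to convince yourself that forwarding kwargs would work is the sketch below. It redefines a standalone get_engine with the proposed behavior and tests it against SQLite instead of MySQL (no server needed); it relies on the fact that sqlite3.connect() accepts a timeout connect arg.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def get_engine(dburi, **kwargs):
    # proposed unforked path: strip pool* args, forward the rest
    kwargs = {k: v for k, v in kwargs.items() if not k.startswith('pool')}
    return create_engine(dburi, poolclass=NullPool, **kwargs)

# pool_size is dropped (NullPool takes no size), while connect_args
# reaches the DBAPI -- sqlite3.connect(..., timeout=5) -- so the
# connection succeeds with the extra argument applied.
engine = get_engine('sqlite://', pool_size=5, connect_args={'timeout': 5})
conn = engine.connect()
conn.close()
```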

Actual Behavior

Since self.forked is not True, and will not become True, the engine is created by:

            return create_engine(dburi, poolclass=NullPool)

Since the SSL certs are not included, an error is returned and the connection is not successful.

sqlalchemy.exc.OperationalError: (_mysql_exceptions.OperationalError) (1045, "Access denied for user 'airflow'@'10.xx.xx.xxx' (using password: YES)")
(Background on this error at: http://sqlalche.me/e/e3q8)
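The effect can be illustrated outside MySQL with a simplified sketch of the current unforked path (using SQLite so it runs without a server; the real code is in celery/backends/database/session.py):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

def get_engine(dburi, **kwargs):
    # simplified sketch of the current unforked path:
    # kwargs are accepted but never forwarded
    return create_engine(dburi, poolclass=NullPool)

# 'ssl' is not a valid sqlite3 connect arg, so if connect_args were
# forwarded, connect() would raise a TypeError; it succeeds only
# because the kwargs are silently dropped.
engine = get_engine('sqlite://', connect_args={'ssl': {'ca': '/tmp/ca'}})
conn = engine.connect()   # succeeds -- proof the SSL args never arrived
conn.close()
```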
