@nailo2c nailo2c commented Jul 6, 2025

Closes: #35012

Why

  1. Port the ssl_verify_cert option from pydruid to DruidDbApiHook.get_conn.
  2. get_pandas_df was already implemented in #35494 (Work around typing issue in examples and providers).

How

druid_broker_conn = connect(
    ...
    ssl_verify_cert=conn.extra_dejson.get("ssl_verify_cert", True),
)
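
As a rough illustration of the lookup above, the sketch below mimics how a flag read from the connection extras falls back to True when the key is absent. It uses only the standard library; resolve_ssl_verify_cert is a hypothetical helper written for this example and is not part of Airflow or pydruid.

```python
import json
from typing import Optional


def resolve_ssl_verify_cert(extra_json: Optional[str]) -> bool:
    """Hypothetical helper: mirror extra_dejson.get("ssl_verify_cert", True)."""
    # An empty or missing extra field is treated as an empty dict.
    extra = json.loads(extra_json) if extra_json else {}
    # Verification stays enabled unless the extras explicitly set the key.
    return extra.get("ssl_verify_cert", True)


# Key absent: certificate verification remains on.
print(resolve_ssl_verify_cert('{"endpoint": "/druid/v2/sql"}'))
# Key explicitly false: certificate verification is turned off.
print(resolve_ssl_verify_cert('{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'))
```

Defaulting to True keeps existing connections verifying certificates; only a connection that explicitly sets the key to false opts out.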

What

ssl_verify_cert

Create the connections with the following commands:

airflow connections add 'druid_ssl_true' \
  --conn-type 'druid' \
  --conn-host 'broker' \
  --conn-port '8082' \
  --conn-schema 'https' \
  --conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": true}'

airflow connections add 'druid_ssl_false' \
  --conn-type 'druid' \
  --conn-host 'broker' \
  --conn-port '8082' \
  --conn-schema 'https' \
  --conn-extra '{"endpoint": "/druid/v2/sql", "ssl_verify_cert": false}'
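
For local testing, the same connections could probably also be supplied through environment variables instead of the CLI. The sketch below assumes Airflow's JSON connection format for AIRFLOW_CONN_<CONN_ID> variables; connection_env_var is a hypothetical helper, and the host, port and endpoint values are simply copied from the commands above.

```python
import json
import os
from typing import Tuple


def connection_env_var(conn_id: str, verify: bool) -> Tuple[str, str]:
    """Hypothetical helper: build an AIRFLOW_CONN_* name and JSON value."""
    payload = {
        "conn_type": "druid",
        "host": "broker",
        "port": 8082,
        "schema": "https",
        "extra": {"endpoint": "/druid/v2/sql", "ssl_verify_cert": verify},
    }
    # Airflow looks up connections under AIRFLOW_CONN_ plus the upper-cased id.
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(payload)


# Export both test connections into the current process environment.
for conn_id, verify in (("druid_ssl_true", True), ("druid_ssl_false", False)):
    name, value = connection_env_var(conn_id, verify)
    os.environ[name] = value
```

Connections defined this way exist only in the environment of the process that sets them, so they are not stored in the metadata database.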

Create a DAG to test ssl_verify_cert:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
from datetime import datetime
from airflow.exceptions import AirflowException
import logging

def test_druid_ssl_verify(conn_id):
    hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
    # Stays None if get_conn() raises, so the finally block cannot hit an unbound name.
    conn = None
    try:
        conn = hook.get_conn()
        logging.info('=== conn.ssl_verify_cert ===')
        logging.info(conn.ssl_verify_cert)
        cur = conn.cursor()
        cur.execute("SELECT 1")
        result = cur.fetchall()
        logging.info("Druid query result: %s", result)
    except Exception as e:
        logging.error("Connection failed: %s", e)
        # Chain the original exception so the root cause stays in the traceback.
        raise AirflowException(f"SSL test failed for connection {conn_id}: {e}") from e
    finally:
        if conn is not None:
            conn.close()

with DAG(
    dag_id="test_druid_ssl_verify",
    start_date=datetime(2025, 7, 1),
    schedule=None,
    catchup=False,
    tags=["druid", "ssl-test"],
) as dag:

    test_conn_ssl_true = PythonOperator(
        task_id="test_ssl_verify_true",
        python_callable=test_druid_ssl_verify,
        op_args=["druid_ssl_true"],
    )

    test_conn_ssl_false = PythonOperator(
        task_id="test_ssl_verify_false",
        python_callable=test_druid_ssl_verify,
        op_args=["druid_ssl_false"],
    )

    test_conn_ssl_true >> test_conn_ssl_false

Result (matches expectation):

  • ssl_verify_cert=true: [screenshot: druid_ssl_true]
  • ssl_verify_cert=false: [screenshot: druid_ssl_false]

get_pandas_df

Example DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.druid.hooks.druid import DruidDbApiHook
from datetime import datetime
import logging

def test_druid_get_pandas_df(conn_id: str):
    hook = DruidDbApiHook(druid_broker_conn_id=conn_id)
    try:
        df = hook.get_pandas_df("SELECT 1")
        logging.info("Druid returned dataframe:\n%s", df)
    except Exception as e:
        logging.error("Druid get_pandas_df failed: %s", e)
        raise

with DAG(
    dag_id="test_druid_get_pandas_df",
    start_date=datetime(2025, 7, 1),
    schedule=None,
    catchup=False,
    tags=["druid", "ssl", "get_pandas_df"],
) as dag:

    run_query = PythonOperator(
        task_id="run_druid_query",
        python_callable=test_druid_get_pandas_df,
        op_args=["druid_ssl_false"],
    )

Result: [screenshot: druid_pd_df]

@potiuk potiuk merged commit 2f155a6 into apache:main Jul 13, 2025
71 checks passed