11,912 questions
Tooling
1 vote · 0 replies · 66 views
Airflow use cases for different repos
I am new to Airflow. I currently use Task Scheduler on a remote desktop to automate our tasks, but it is tied to my credentials and I cannot share it with other team members, so I clone all projects ...
4 votes · 1 answer · 80 views
Conditionally running tasks in Airflow
I'm trying to write a DAG that conditionally executes another task. The simplified version of what I'm working with is this:
to_be_triggered = EmptyOperator(task_id="to_be_triggered")
@task....
0 votes · 0 answers · 51 views
How do you add GCP billing report labels onto Airflow's BatchPredictionJobHook.create_batch_prediction_job()?
According to the official Airflow documentation, the labels argument can be used to tag batch predictions so that they can be filtered by label in GCP Billing > Report. I tried it like this:
...
...
0 votes · 0 answers · 46 views
OSError when running containerized task in `airflow` image
I'm trying to run a simple task in the apache/airflow image with the following Python script:
from airflow.sdk import dag, task
@task.docker(
image="docker.1ms.run/apache/airflow:3.2.0-...
0 votes · 1 answer · 50 views
Why does a DAG created in /dags take time to appear in the UI?
In Apache Airflow, when a new DAG file is created in the /dags directory, it doesn't show up immediately in the Airflow UI. There is some delay before the DAG becomes visible and accessible.
Why does ...
0 votes · 1 answer · 59 views
How to automatically install latest Python package versions in AWS MWAA from S3 requirements.txt without manual updates?
I am using AWS Managed Workflows for Apache Airflow (MWAA) where the requirements.txt file is stored in an S3 bucket and MWAA syncs it during environment updates.
Current Setup:
requirements....
0 votes · 0 answers · 62 views
KubernetesPodOperator with deferrable=True returns Forbidden on Cloud Composer 3
Environment:
Cloud Composer 3 (composer-3-airflow-2.10.2-build.13)
Region: europe-west1
Environment name: composer3-npd
Project: XXXXXXXXXX
Private environment: enabled
Problem: When using ...
0 votes · 1 answer · 73 views
Airflow task_group conditional logic always enters else block even when env is "PROD" or "STAGE"
Problem
I have an Airflow DAG that uses a @task_group with conditional logic to set task dependencies based on an environment variable. The intent is:
In PROD or STAGE: run only table_task (no ...
0 votes · 0 answers · 39 views
Airflow: Dynamic sequential task groups where number of groups is unknown at parse time
I have an Airflow DAG where I need to:
Fetch a list of items from an Airflow Variable
Task A batches them into sublists
For each batch, create a task group with dynamic task mapping (one task ...
0 votes · 1 answer · 73 views
Making SQL queries from a task, not SqlExecuteQueryOperator
I am having trouble writing an Apache Airflow (v. 3.1.7) DAG for the following pipeline:
1. Fetch rows from an MS SQL database, based on data_interval
2. Output from (1) is sorted and processed in a Python ...
4 votes · 1 answer · 152 views
Airflow Task dies exactly 24h after starting
The Problem:
When a task's duration reaches 24h, it is immediately killed. The log message we get indicates token expiration.
System info
Airflow 3.1.6 (running in docker)
Celery executor (...
1 vote · 1 answer · 81 views
Python Multiprocessing isn't working anymore after changing from Airflow 2 to Airflow 3
I am currently migrating my processes and DAGs from Airflow 2 to Airflow 3. In doing so, I am encountering the problem that the DAG that executes an XML parser freezes after completing its work. As a ...
0 votes · 0 answers · 65 views
Airflow: Param with None as default causes ParamValidationError
Context
render_template_as_native_obj=True
params={
"date_boundary": Param(
default=None,
type=["null", "string"],
...
0 votes · 0 answers · 78 views
Reparse external dag from current dag in Airflow 3.0.6
We are planning an Airflow upgrade from 2.7 to 3.0.6. We trigger an external DAG once it has been refreshed and has the expected tasks.
With version 2.6, there was direct access to the database, but 3.0.7 ...
0 votes · 0 answers · 41 views
KubernetesExecutor, Airflow 3, SparkSubmitOperator with pod_override fails with JSON validation error
I'm trying to figure out how to successfully run a DAG with SparkSubmitOperator on Airflow 3.1.5.
I have a wrapper that sets the pod config:
self.executor_config = {
"pod_override": ...