Skip to content

Commit 123dadd

Browse files
sunank200uranusjr
andauthored
Rename execution_date to logical_date across codebase (#43902)
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
1 parent 36e267d commit 123dadd

File tree

329 files changed

+3722
-3400
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

329 files changed

+3722
-3400
lines changed

airflow/api/client/local_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ def __init__(self, auth=None, session: httpx.Client | None = None):
3636
self._session.auth = auth
3737

3838
def trigger_dag(
39-
self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True
39+
self, dag_id, run_id=None, conf=None, logical_date=None, replace_microseconds=True
4040
) -> dict | None:
4141
dag_run = trigger_dag.trigger_dag(
4242
dag_id=dag_id,
4343
triggered_by=DagRunTriggeredByType.CLI,
4444
run_id=run_id,
4545
conf=conf,
46-
execution_date=execution_date,
46+
logical_date=logical_date,
4747
replace_microseconds=replace_microseconds,
4848
)
4949
if dag_run:

airflow/api/common/mark_tasks.py

Lines changed: 61 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,19 @@ def _create_dagruns(
5959
:param dag: The DAG to create runs for.
6060
:param infos: List of logical dates and data intervals to evaluate.
6161
:param state: The state to set the dag run to
62-
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{execution_date}``.
63-
:return: Newly created and existing dag runs for the execution dates supplied.
62+
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date}``.
63+
:return: Newly created and existing dag runs for the logical dates supplied.
6464
"""
6565
# Find out existing DAG runs that we don't need to create.
6666
dag_runs = {
6767
run.logical_date: run
68-
for run in DagRun.find(dag_id=dag.dag_id, execution_date=[info.logical_date for info in infos])
68+
for run in DagRun.find(dag_id=dag.dag_id, logical_date=[info.logical_date for info in infos])
6969
}
7070

7171
for info in infos:
7272
if info.logical_date not in dag_runs:
7373
dag_runs[info.logical_date] = dag.create_dagrun(
74-
execution_date=info.logical_date,
74+
logical_date=info.logical_date,
7575
data_interval=info.data_interval,
7676
start_date=timezone.utcnow(),
7777
external_trigger=False,
@@ -87,7 +87,7 @@ def set_state(
8787
*,
8888
tasks: Collection[Operator | tuple[Operator, int]],
8989
run_id: str | None = None,
90-
execution_date: datetime | None = None,
90+
logical_date: datetime | None = None,
9191
upstream: bool = False,
9292
downstream: bool = False,
9393
future: bool = False,
@@ -107,11 +107,11 @@ def set_state(
107107
:param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
108108
``task.dag`` needs to be set
109109
:param run_id: the run_id of the dagrun to start looking from
110-
:param execution_date: the execution date from which to start looking (deprecated)
110+
:param logical_date: the logical date from which to start looking (deprecated)
111111
:param upstream: Mark all parents (upstream tasks)
112112
:param downstream: Mark all siblings (downstream tasks) of task_id
113113
:param future: Mark all future tasks on the interval of the dag up until
114-
last execution date.
114+
last logical date.
115115
:param past: Retroactively mark all tasks starting from start_date of the DAG
116116
:param state: State to which the tasks need to be set
117117
:param commit: Commit tasks to be altered to the database
@@ -121,11 +121,11 @@ def set_state(
121121
if not tasks:
122122
return []
123123

124-
if not exactly_one(execution_date, run_id):
125-
raise ValueError("Exactly one of dag_run_id and execution_date must be set")
124+
if not exactly_one(logical_date, run_id):
125+
raise ValueError("Exactly one of dag_run_id and logical_date must be set")
126126

127-
if execution_date and not timezone.is_localized(execution_date):
128-
raise ValueError(f"Received non-localized date {execution_date}")
127+
if logical_date and not timezone.is_localized(logical_date):
128+
raise ValueError(f"Received non-localized date {logical_date}")
129129

130130
task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
131131
if len(task_dags) > 1:
@@ -134,8 +134,8 @@ def set_state(
134134
if dag is None:
135135
raise ValueError("Received tasks with no DAG")
136136

137-
if execution_date:
138-
run_id = dag.get_dagrun(execution_date=execution_date, session=session).run_id
137+
if logical_date:
138+
run_id = dag.get_dagrun(logical_date=logical_date, session=session).run_id
139139
if not run_id:
140140
raise ValueError("Received tasks with no run_id")
141141

@@ -200,26 +200,26 @@ def find_task_relatives(tasks, downstream, upstream):
200200

201201

202202
@provide_session
203-
def get_execution_dates(
204-
dag: DAG, execution_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
203+
def get_logical_dates(
204+
dag: DAG, logical_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
205205
) -> list[datetime]:
206-
"""Return DAG execution dates."""
207-
latest_execution_date = dag.get_latest_execution_date(session=session)
208-
if latest_execution_date is None:
209-
raise ValueError(f"Received non-localized date {execution_date}")
210-
execution_date = timezone.coerce_datetime(execution_date)
206+
"""Return DAG logical dates."""
207+
latest_logical_date = dag.get_latest_logical_date(session=session)
208+
if latest_logical_date is None:
209+
raise ValueError(f"Received non-localized date {logical_date}")
210+
logical_date = timezone.coerce_datetime(logical_date)
211211
# determine date range of dag runs and tasks to consider
212-
end_date = latest_execution_date if future else execution_date
212+
end_date = latest_logical_date if future else logical_date
213213
if dag.start_date:
214214
start_date = dag.start_date
215215
else:
216-
start_date = execution_date
217-
start_date = execution_date if not past else start_date
216+
start_date = logical_date
217+
start_date = logical_date if not past else start_date
218218
if not dag.timetable.can_be_scheduled:
219219
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
220220
# past runs.
221221
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
222-
dates = sorted({d.execution_date for d in dag_runs})
222+
dates = sorted({d.logical_date for d in dag_runs})
223223
elif not dag.timetable.periodic:
224224
dates = [start_date]
225225
else:
@@ -235,7 +235,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
235235
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
236236
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
237237
first_dagrun = session.scalar(
238-
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date.asc()).limit(1)
238+
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
239239
)
240240

241241
if last_dagrun is None:
@@ -255,7 +255,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
255255
dates = [
256256
info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
257257
]
258-
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, execution_date=dates, session=session)]
258+
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
259259
return run_ids
260260

261261

@@ -279,37 +279,37 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
279279
def set_dag_run_state_to_success(
280280
*,
281281
dag: DAG,
282-
execution_date: datetime | None = None,
282+
logical_date: datetime | None = None,
283283
run_id: str | None = None,
284284
commit: bool = False,
285285
session: SASession = NEW_SESSION,
286286
) -> list[TaskInstance]:
287287
"""
288288
Set the dag run's state to success.
289289
290-
Set for a specific execution date and its task instances to success.
290+
Set for a specific logical date and its task instances to success.
291291
292292
:param dag: the DAG of which to alter state
293-
:param execution_date: the execution date from which to start looking(deprecated)
293+
:param logical_date: the logical date from which to start looking(deprecated)
294294
:param run_id: the run_id to start looking from
295295
:param commit: commit DAG and tasks to be altered to the database
296296
:param session: database session
297297
:return: If commit is true, list of tasks that have been updated,
298298
otherwise list of tasks that will be updated
299-
:raises: ValueError if dag or execution_date is invalid
299+
:raises: ValueError if dag or logical_date is invalid
300300
"""
301-
if not exactly_one(execution_date, run_id):
301+
if not exactly_one(logical_date, run_id):
302302
return []
303303

304304
if not dag:
305305
return []
306306

307-
if execution_date:
308-
if not timezone.is_localized(execution_date):
309-
raise ValueError(f"Received non-localized date {execution_date}")
310-
dag_run = dag.get_dagrun(execution_date=execution_date)
307+
if logical_date:
308+
if not timezone.is_localized(logical_date):
309+
raise ValueError(f"Received non-localized date {logical_date}")
310+
dag_run = dag.get_dagrun(logical_date=logical_date)
311311
if not dag_run:
312-
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
312+
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
313313
run_id = dag_run.run_id
314314
if not run_id:
315315
raise ValueError(f"Invalid dag_run_id: {run_id}")
@@ -333,36 +333,36 @@ def set_dag_run_state_to_success(
333333
def set_dag_run_state_to_failed(
334334
*,
335335
dag: DAG,
336-
execution_date: datetime | None = None,
336+
logical_date: datetime | None = None,
337337
run_id: str | None = None,
338338
commit: bool = False,
339339
session: SASession = NEW_SESSION,
340340
) -> list[TaskInstance]:
341341
"""
342342
Set the dag run's state to failed.
343343
344-
Set for a specific execution date and its task instances to failed.
344+
Set for a specific logical date and its task instances to failed.
345345
346346
:param dag: the DAG of which to alter state
347-
:param execution_date: the execution date from which to start looking(deprecated)
347+
:param logical_date: the logical date from which to start looking(deprecated)
348348
:param run_id: the DAG run_id to start looking from
349349
:param commit: commit DAG and tasks to be altered to the database
350350
:param session: database session
351351
:return: If commit is true, list of tasks that have been updated,
352352
otherwise list of tasks that will be updated
353-
:raises: AssertionError if dag or execution_date is invalid
353+
:raises: AssertionError if dag or logical_date is invalid
354354
"""
355-
if not exactly_one(execution_date, run_id):
355+
if not exactly_one(logical_date, run_id):
356356
return []
357357
if not dag:
358358
return []
359359

360-
if execution_date:
361-
if not timezone.is_localized(execution_date):
362-
raise ValueError(f"Received non-localized date {execution_date}")
363-
dag_run = dag.get_dagrun(execution_date=execution_date)
360+
if logical_date:
361+
if not timezone.is_localized(logical_date):
362+
raise ValueError(f"Received non-localized date {logical_date}")
363+
dag_run = dag.get_dagrun(logical_date=logical_date)
364364
if not dag_run:
365-
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
365+
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
366366
run_id = dag_run.run_id
367367

368368
if not run_id:
@@ -429,16 +429,16 @@ def __set_dag_run_state_to_running_or_queued(
429429
*,
430430
new_state: DagRunState,
431431
dag: DAG,
432-
execution_date: datetime | None = None,
432+
logical_date: datetime | None = None,
433433
run_id: str | None = None,
434434
commit: bool = False,
435435
session: SASession,
436436
) -> list[TaskInstance]:
437437
"""
438-
Set the dag run for a specific execution date to running.
438+
Set the dag run for a specific logical date to running.
439439
440440
:param dag: the DAG of which to alter state
441-
:param execution_date: the execution date from which to start looking
441+
:param logical_date: the logical date from which to start looking
442442
:param run_id: the id of the DagRun
443443
:param commit: commit DAG and tasks to be altered to the database
444444
:param session: database session
@@ -447,18 +447,18 @@ def __set_dag_run_state_to_running_or_queued(
447447
"""
448448
res: list[TaskInstance] = []
449449

450-
if not exactly_one(execution_date, run_id):
450+
if not exactly_one(logical_date, run_id):
451451
return res
452452

453453
if not dag:
454454
return res
455455

456-
if execution_date:
457-
if not timezone.is_localized(execution_date):
458-
raise ValueError(f"Received non-localized date {execution_date}")
459-
dag_run = dag.get_dagrun(execution_date=execution_date)
456+
if logical_date:
457+
if not timezone.is_localized(logical_date):
458+
raise ValueError(f"Received non-localized date {logical_date}")
459+
dag_run = dag.get_dagrun(logical_date=logical_date)
460460
if not dag_run:
461-
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
461+
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
462462
run_id = dag_run.run_id
463463
if not run_id:
464464
raise ValueError(f"DagRun with run_id: {run_id} not found")
@@ -474,20 +474,20 @@ def __set_dag_run_state_to_running_or_queued(
474474
def set_dag_run_state_to_running(
475475
*,
476476
dag: DAG,
477-
execution_date: datetime | None = None,
477+
logical_date: datetime | None = None,
478478
run_id: str | None = None,
479479
commit: bool = False,
480480
session: SASession = NEW_SESSION,
481481
) -> list[TaskInstance]:
482482
"""
483483
Set the dag run's state to running.
484484
485-
Set for a specific execution date and its task instances to running.
485+
Set for a specific logical date and its task instances to running.
486486
"""
487487
return __set_dag_run_state_to_running_or_queued(
488488
new_state=DagRunState.RUNNING,
489489
dag=dag,
490-
execution_date=execution_date,
490+
logical_date=logical_date,
491491
run_id=run_id,
492492
commit=commit,
493493
session=session,
@@ -498,20 +498,20 @@ def set_dag_run_state_to_running(
498498
def set_dag_run_state_to_queued(
499499
*,
500500
dag: DAG,
501-
execution_date: datetime | None = None,
501+
logical_date: datetime | None = None,
502502
run_id: str | None = None,
503503
commit: bool = False,
504504
session: SASession = NEW_SESSION,
505505
) -> list[TaskInstance]:
506506
"""
507507
Set the dag run's state to queued.
508508
509-
Set for a specific execution date and its task instances to queued.
509+
Set for a specific logical date and its task instances to queued.
510510
"""
511511
return __set_dag_run_state_to_running_or_queued(
512512
new_state=DagRunState.QUEUED,
513513
dag=dag,
514-
execution_date=execution_date,
514+
logical_date=logical_date,
515515
run_id=run_id,
516516
commit=commit,
517517
session=session,

0 commit comments

Comments
 (0)