@@ -59,19 +59,19 @@ def _create_dagruns(
5959 :param dag: The DAG to create runs for.
6060 :param infos: List of logical dates and data intervals to evaluate.
6161 :param state: The state to set the dag run to
62- :param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{execution_date }``.
63- :return: Newly created and existing dag runs for the execution dates supplied.
62+ :param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date }``.
63+ :return: Newly created and existing dag runs for the logical dates supplied.
6464 """
6565 # Find out existing DAG runs that we don't need to create.
6666 dag_runs = {
6767 run .logical_date : run
68- for run in DagRun .find (dag_id = dag .dag_id , execution_date = [info .logical_date for info in infos ])
68+ for run in DagRun .find (dag_id = dag .dag_id , logical_date = [info .logical_date for info in infos ])
6969 }
7070
7171 for info in infos :
7272 if info .logical_date not in dag_runs :
7373 dag_runs [info .logical_date ] = dag .create_dagrun (
74- execution_date = info .logical_date ,
74+ logical_date = info .logical_date ,
7575 data_interval = info .data_interval ,
7676 start_date = timezone .utcnow (),
7777 external_trigger = False ,
@@ -87,7 +87,7 @@ def set_state(
8787 * ,
8888 tasks : Collection [Operator | tuple [Operator , int ]],
8989 run_id : str | None = None ,
90- execution_date : datetime | None = None ,
90+ logical_date : datetime | None = None ,
9191 upstream : bool = False ,
9292 downstream : bool = False ,
9393 future : bool = False ,
@@ -107,11 +107,11 @@ def set_state(
107107 :param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
108108 ``task.dag`` needs to be set
109109 :param run_id: the run_id of the dagrun to start looking from
110- :param execution_date : the execution date from which to start looking (deprecated)
110+ :param logical_date : the logical date from which to start looking (deprecated)
111111 :param upstream: Mark all parents (upstream tasks)
112112 :param downstream: Mark all siblings (downstream tasks) of task_id
113113 :param future: Mark all future tasks on the interval of the dag up until
114- last execution date.
114+ last logical date.
115115 :param past: Retroactively mark all tasks starting from start_date of the DAG
116116 :param state: State to which the tasks need to be set
117117 :param commit: Commit tasks to be altered to the database
@@ -121,11 +121,11 @@ def set_state(
121121 if not tasks :
122122 return []
123123
124- if not exactly_one (execution_date , run_id ):
125- raise ValueError ("Exactly one of dag_run_id and execution_date must be set" )
124+ if not exactly_one (logical_date , run_id ):
125+ raise ValueError ("Exactly one of dag_run_id and logical_date must be set" )
126126
127- if execution_date and not timezone .is_localized (execution_date ):
128- raise ValueError (f"Received non-localized date { execution_date } " )
127+ if logical_date and not timezone .is_localized (logical_date ):
128+ raise ValueError (f"Received non-localized date { logical_date } " )
129129
130130 task_dags = {task [0 ].dag if isinstance (task , tuple ) else task .dag for task in tasks }
131131 if len (task_dags ) > 1 :
@@ -134,8 +134,8 @@ def set_state(
134134 if dag is None :
135135 raise ValueError ("Received tasks with no DAG" )
136136
137- if execution_date :
138- run_id = dag .get_dagrun (execution_date = execution_date , session = session ).run_id
137+ if logical_date :
138+ run_id = dag .get_dagrun (logical_date = logical_date , session = session ).run_id
139139 if not run_id :
140140 raise ValueError ("Received tasks with no run_id" )
141141
@@ -200,26 +200,26 @@ def find_task_relatives(tasks, downstream, upstream):
200200
201201
202202@provide_session
203- def get_execution_dates (
204- dag : DAG , execution_date : datetime , future : bool , past : bool , * , session : SASession = NEW_SESSION
203+ def get_logical_dates (
204+ dag : DAG , logical_date : datetime , future : bool , past : bool , * , session : SASession = NEW_SESSION
205205) -> list [datetime ]:
206- """Return DAG execution dates."""
207- latest_execution_date = dag .get_latest_execution_date (session = session )
208- if latest_execution_date is None :
209- raise ValueError (f"Received non-localized date { execution_date } " )
210- execution_date = timezone .coerce_datetime (execution_date )
206+ """Return DAG logical dates."""
207+ latest_logical_date = dag .get_latest_logical_date (session = session )
208+ if latest_logical_date is None :
209+ raise ValueError (f"Received non-localized date { logical_date } " )
210+ logical_date = timezone .coerce_datetime (logical_date )
211211 # determine date range of dag runs and tasks to consider
212- end_date = latest_execution_date if future else execution_date
212+ end_date = latest_logical_date if future else logical_date
213213 if dag .start_date :
214214 start_date = dag .start_date
215215 else :
216- start_date = execution_date
217- start_date = execution_date if not past else start_date
216+ start_date = logical_date
217+ start_date = logical_date if not past else start_date
218218 if not dag .timetable .can_be_scheduled :
219219 # If the DAG never schedules, need to look at existing DagRun if the user wants future or
220220 # past runs.
221221 dag_runs = dag .get_dagruns_between (start_date = start_date , end_date = end_date )
222- dates = sorted ({d .execution_date for d in dag_runs })
222+ dates = sorted ({d .logical_date for d in dag_runs })
223223 elif not dag .timetable .periodic :
224224 dates = [start_date ]
225225 else :
@@ -235,7 +235,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
235235 last_dagrun = dag .get_last_dagrun (include_externally_triggered = True , session = session )
236236 current_dagrun = dag .get_dagrun (run_id = run_id , session = session )
237237 first_dagrun = session .scalar (
238- select (DagRun ).filter (DagRun .dag_id == dag .dag_id ).order_by (DagRun .execution_date .asc ()).limit (1 )
238+ select (DagRun ).filter (DagRun .dag_id == dag .dag_id ).order_by (DagRun .logical_date .asc ()).limit (1 )
239239 )
240240
241241 if last_dagrun is None :
@@ -255,7 +255,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
255255 dates = [
256256 info .logical_date for info in dag .iter_dagrun_infos_between (start_date , end_date , align = False )
257257 ]
258- run_ids = [dr .run_id for dr in DagRun .find (dag_id = dag .dag_id , execution_date = dates , session = session )]
258+ run_ids = [dr .run_id for dr in DagRun .find (dag_id = dag .dag_id , logical_date = dates , session = session )]
259259 return run_ids
260260
261261
@@ -279,37 +279,37 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
279279def set_dag_run_state_to_success (
280280 * ,
281281 dag : DAG ,
282- execution_date : datetime | None = None ,
282+ logical_date : datetime | None = None ,
283283 run_id : str | None = None ,
284284 commit : bool = False ,
285285 session : SASession = NEW_SESSION ,
286286) -> list [TaskInstance ]:
287287 """
288288 Set the dag run's state to success.
289289
290- Set for a specific execution date and its task instances to success.
290+ Set for a specific logical date and its task instances to success.
291291
292292 :param dag: the DAG of which to alter state
293- :param execution_date : the execution date from which to start looking(deprecated)
293+ :param logical_date : the logical date from which to start looking(deprecated)
294294 :param run_id: the run_id to start looking from
295295 :param commit: commit DAG and tasks to be altered to the database
296296 :param session: database session
297297 :return: If commit is true, list of tasks that have been updated,
298298 otherwise list of tasks that will be updated
299- :raises: ValueError if dag or execution_date is invalid
299+ :raises: ValueError if dag or logical_date is invalid
300300 """
301- if not exactly_one (execution_date , run_id ):
301+ if not exactly_one (logical_date , run_id ):
302302 return []
303303
304304 if not dag :
305305 return []
306306
307- if execution_date :
308- if not timezone .is_localized (execution_date ):
309- raise ValueError (f"Received non-localized date { execution_date } " )
310- dag_run = dag .get_dagrun (execution_date = execution_date )
307+ if logical_date :
308+ if not timezone .is_localized (logical_date ):
309+ raise ValueError (f"Received non-localized date { logical_date } " )
310+ dag_run = dag .get_dagrun (logical_date = logical_date )
311311 if not dag_run :
312- raise ValueError (f"DagRun with execution_date : { execution_date } not found" )
312+ raise ValueError (f"DagRun with logical_date : { logical_date } not found" )
313313 run_id = dag_run .run_id
314314 if not run_id :
315315 raise ValueError (f"Invalid dag_run_id: { run_id } " )
@@ -333,36 +333,36 @@ def set_dag_run_state_to_success(
333333def set_dag_run_state_to_failed (
334334 * ,
335335 dag : DAG ,
336- execution_date : datetime | None = None ,
336+ logical_date : datetime | None = None ,
337337 run_id : str | None = None ,
338338 commit : bool = False ,
339339 session : SASession = NEW_SESSION ,
340340) -> list [TaskInstance ]:
341341 """
342342 Set the dag run's state to failed.
343343
344- Set for a specific execution date and its task instances to failed.
344+ Set for a specific logical date and its task instances to failed.
345345
346346 :param dag: the DAG of which to alter state
347- :param execution_date : the execution date from which to start looking(deprecated)
347+ :param logical_date : the logical date from which to start looking(deprecated)
348348 :param run_id: the DAG run_id to start looking from
349349 :param commit: commit DAG and tasks to be altered to the database
350350 :param session: database session
351351 :return: If commit is true, list of tasks that have been updated,
352352 otherwise list of tasks that will be updated
353- :raises: AssertionError if dag or execution_date is invalid
353+ :raises: AssertionError if dag or logical_date is invalid
354354 """
355- if not exactly_one (execution_date , run_id ):
355+ if not exactly_one (logical_date , run_id ):
356356 return []
357357 if not dag :
358358 return []
359359
360- if execution_date :
361- if not timezone .is_localized (execution_date ):
362- raise ValueError (f"Received non-localized date { execution_date } " )
363- dag_run = dag .get_dagrun (execution_date = execution_date )
360+ if logical_date :
361+ if not timezone .is_localized (logical_date ):
362+ raise ValueError (f"Received non-localized date { logical_date } " )
363+ dag_run = dag .get_dagrun (logical_date = logical_date )
364364 if not dag_run :
365- raise ValueError (f"DagRun with execution_date : { execution_date } not found" )
365+ raise ValueError (f"DagRun with logical_date : { logical_date } not found" )
366366 run_id = dag_run .run_id
367367
368368 if not run_id :
@@ -429,16 +429,16 @@ def __set_dag_run_state_to_running_or_queued(
429429 * ,
430430 new_state : DagRunState ,
431431 dag : DAG ,
432- execution_date : datetime | None = None ,
432+ logical_date : datetime | None = None ,
433433 run_id : str | None = None ,
434434 commit : bool = False ,
435435 session : SASession ,
436436) -> list [TaskInstance ]:
437437 """
438- Set the dag run for a specific execution date to running.
438+ Set the dag run for a specific logical date to running.
439439
440440 :param dag: the DAG of which to alter state
441- :param execution_date : the execution date from which to start looking
441+ :param logical_date : the logical date from which to start looking
442442 :param run_id: the id of the DagRun
443443 :param commit: commit DAG and tasks to be altered to the database
444444 :param session: database session
@@ -447,18 +447,18 @@ def __set_dag_run_state_to_running_or_queued(
447447 """
448448 res : list [TaskInstance ] = []
449449
450- if not exactly_one (execution_date , run_id ):
450+ if not exactly_one (logical_date , run_id ):
451451 return res
452452
453453 if not dag :
454454 return res
455455
456- if execution_date :
457- if not timezone .is_localized (execution_date ):
458- raise ValueError (f"Received non-localized date { execution_date } " )
459- dag_run = dag .get_dagrun (execution_date = execution_date )
456+ if logical_date :
457+ if not timezone .is_localized (logical_date ):
458+ raise ValueError (f"Received non-localized date { logical_date } " )
459+ dag_run = dag .get_dagrun (logical_date = logical_date )
460460 if not dag_run :
461- raise ValueError (f"DagRun with execution_date : { execution_date } not found" )
461+ raise ValueError (f"DagRun with logical_date : { logical_date } not found" )
462462 run_id = dag_run .run_id
463463 if not run_id :
464464 raise ValueError (f"DagRun with run_id: { run_id } not found" )
@@ -474,20 +474,20 @@ def __set_dag_run_state_to_running_or_queued(
474474def set_dag_run_state_to_running (
475475 * ,
476476 dag : DAG ,
477- execution_date : datetime | None = None ,
477+ logical_date : datetime | None = None ,
478478 run_id : str | None = None ,
479479 commit : bool = False ,
480480 session : SASession = NEW_SESSION ,
481481) -> list [TaskInstance ]:
482482 """
483483 Set the dag run's state to running.
484484
485- Set for a specific execution date and its task instances to running.
485+ Set for a specific logical date and its task instances to running.
486486 """
487487 return __set_dag_run_state_to_running_or_queued (
488488 new_state = DagRunState .RUNNING ,
489489 dag = dag ,
490- execution_date = execution_date ,
490+ logical_date = logical_date ,
491491 run_id = run_id ,
492492 commit = commit ,
493493 session = session ,
@@ -498,20 +498,20 @@ def set_dag_run_state_to_running(
498498def set_dag_run_state_to_queued (
499499 * ,
500500 dag : DAG ,
501- execution_date : datetime | None = None ,
501+ logical_date : datetime | None = None ,
502502 run_id : str | None = None ,
503503 commit : bool = False ,
504504 session : SASession = NEW_SESSION ,
505505) -> list [TaskInstance ]:
506506 """
507507 Set the dag run's state to queued.
508508
509- Set for a specific execution date and its task instances to queued.
509+ Set for a specific logical date and its task instances to queued.
510510 """
511511 return __set_dag_run_state_to_running_or_queued (
512512 new_state = DagRunState .QUEUED ,
513513 dag = dag ,
514- execution_date = execution_date ,
514+ logical_date = logical_date ,
515515 run_id = run_id ,
516516 commit = commit ,
517517 session = session ,
0 commit comments