1818
1919import logging
2020import sys
21+ import warnings
2122from collections import defaultdict
2223from datetime import datetime
2324from operator import attrgetter
3132from elasticsearch_dsl import Search
3233
3334from airflow .configuration import conf
34- from airflow .models import TaskInstance
35+ from airflow .models .dagrun import DagRun
36+ from airflow .models .taskinstance import TaskInstance
3537from airflow .utils import timezone
3638from airflow .utils .log .file_task_handler import FileTaskHandler
3739from airflow .utils .log .json_formatter import JSONFormatter
3840from airflow .utils .log .logging_mixin import ExternalLoggingMixin , LoggingMixin
41+ from airflow .utils .session import create_session
3942
4043# Elasticsearch hosted log type
4144EsLogMsgType = List [Tuple [str , str ]]
4245
46+ # Compatibility: Airflow 2.3.3 and up uses this method, which accesses the
47+ # LogTemplate model to record the log ID template used. If this function does
48+ # not exist, the task handler should use the log_id_template attribute instead.
49+ USE_PER_RUN_LOG_ID = hasattr (DagRun , "get_log_template" )
50+
4351
4452class ElasticsearchTaskHandler (FileTaskHandler , ExternalLoggingMixin , LoggingMixin ):
4553 """
@@ -65,8 +73,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
6573 def __init__ (
6674 self ,
6775 base_log_folder : str ,
68- filename_template : str ,
69- log_id_template : str ,
7076 end_of_log_mark : str ,
7177 write_stdout : bool ,
7278 json_format : bool ,
@@ -76,6 +82,9 @@ def __init__(
7682 host : str = "localhost:9200" ,
7783 frontend : str = "localhost:5601" ,
7884 es_kwargs : Optional [dict ] = conf .getsection ("elasticsearch_configs" ),
85+ * ,
86+ filename_template : Optional [str ] = None ,
87+ log_id_template : Optional [str ] = None ,
7988 ):
8089 """
8190 :param base_log_folder: base folder to store logs locally
@@ -88,7 +97,13 @@ def __init__(
8897
8998 self .client = elasticsearch .Elasticsearch ([host ], ** es_kwargs ) # type: ignore[attr-defined]
9099
91- self .log_id_template = log_id_template
100+ if USE_PER_RUN_LOG_ID and log_id_template is not None :
101+ warnings .warn (
102+ "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect" ,
103+ DeprecationWarning ,
104+ )
105+
106+ self .log_id_template = log_id_template # Only used on Airflow < 2.3.2.
92107 self .frontend = frontend
93108 self .mark_end_on_close = True
94109 self .end_of_log_mark = end_of_log_mark
@@ -103,7 +118,13 @@ def __init__(
103118 self .handler : Union [logging .FileHandler , logging .StreamHandler ] # type: ignore[assignment]
104119
105120 def _render_log_id (self , ti : TaskInstance , try_number : int ) -> str :
106- dag_run = ti .get_dagrun ()
121+ with create_session () as session :
122+ dag_run = ti .get_dagrun (session = session )
123+ if USE_PER_RUN_LOG_ID :
124+ log_id_template = dag_run .get_log_template (session = session ).elasticsearch_id
125+ else :
126+ log_id_template = self .log_id_template
127+
107128 dag = ti .task .dag
108129 assert dag is not None # For Mypy.
109130 try :
@@ -126,7 +147,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
126147 data_interval_end = ""
127148 execution_date = dag_run .execution_date .isoformat ()
128149
129- return self . log_id_template .format (
150+ return log_id_template .format (
130151 dag_id = ti .dag_id ,
131152 task_id = ti .task_id ,
132153 run_id = getattr (ti , "run_id" , "" ),
0 commit comments