|
29 | 29 | from typing import TYPE_CHECKING, Any, Callable, Iterable |
30 | 30 | from urllib.parse import urljoin |
31 | 31 |
|
| 32 | +import httpx |
32 | 33 | import pendulum |
33 | 34 |
|
34 | 35 | from airflow.configuration import conf |
@@ -78,8 +79,6 @@ def _set_task_deferred_context_var(): |
78 | 79 |
|
79 | 80 |
|
80 | 81 | def _fetch_logs_from_service(url, log_relative_path): |
81 | | - import httpx |
82 | | - |
83 | 82 | from airflow.utils.jwt_signer import JWTSigner |
84 | 83 |
|
85 | 84 | timeout = conf.getint("webserver", "log_fetch_timeout_sec", fallback=None) |
@@ -170,6 +169,9 @@ class FileTaskHandler(logging.Handler): |
170 | 169 | """ |
171 | 170 |
|
172 | 171 | trigger_should_wrap = True |
| 172 | + inherits_from_empty_operator_log_message = ( |
| 173 | + "Operator inherits from empty operator and thus does not have logs" |
| 174 | + ) |
173 | 175 |
|
174 | 176 | def __init__(self, base_log_folder: str, filename_template: str | None = None): |
175 | 177 | super().__init__() |
@@ -555,8 +557,11 @@ def _read_from_logs_server(self, ti, worker_log_rel_path) -> tuple[list[str], li |
555 | 557 | messages.append(f"Found logs served from host {url}") |
556 | 558 | logs.append(response.text) |
557 | 559 | except Exception as e: |
558 | | - messages.append(f"Could not read served logs: {e}") |
559 | | - logger.exception("Could not read served logs") |
| 560 | + if isinstance(e, httpx.UnsupportedProtocol) and ti.task.inherits_from_empty_operator is True: |
| 561 | + messages.append(self.inherits_from_empty_operator_log_message) |
| 562 | + else: |
| 563 | + messages.append(f"Could not read served logs: {e}") |
| 564 | + logger.exception("Could not read served logs") |
560 | 565 | return messages, logs |
561 | 566 |
|
562 | 567 | def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[list[str], list[str]]: |
|
0 commit comments