
Commit cdb3f25

All classes in backport providers are now importable in Airflow 1.10 (#8991)
* All classes in backport providers are now importable in Airflow 1.10
* fixup! All classes in backport providers are now importable in Airflow 1.10
* fixup! fixup! All classes in backport providers are now importable in Airflow 1.10
1 parent 14fb585 commit cdb3f25

22 files changed (+1090, -330 lines)

.pre-commit-config.yaml

Lines changed: 2 additions & 1 deletion

@@ -256,7 +256,8 @@ metastore_browser/templates/.*\\.html$|.*\\.jinja2"
         ^airflow/operators/.*$|
         ^airflow/sensors/.*$|
         ^airflow/providers/.*$|
-        ^airflow/contrib/.*$
+        ^airflow/contrib/.*$|
+        ^backport_packages/.*$
   - id: base-operator
     language: pygrep
     name: Make sure BaseOperator[Link] is imported from airflow.models outside of core

airflow/config_templates/config.yml

Lines changed: 1 addition & 1 deletion

@@ -672,7 +672,7 @@
       version_added: ~
       type: string
       example: ~
-      default: "Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}"
+      default: ~
 - name: webserver
   description: ~
   options:

airflow/config_templates/default_airflow.cfg

Lines changed: 1 addition & 1 deletion

@@ -337,7 +337,7 @@ default_hive_mapred_queue =

 # Template for mapred_job_name in HiveOperator, supports the following named parameters
 # hostname, dag_id, task_id, execution_date
-mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}
+# mapred_job_name_template =

 [webserver]
 # The base url of your website as airflow cannot guess what domain or

airflow/config_templates/default_test.cfg

Lines changed: 0 additions & 1 deletion

@@ -66,7 +66,6 @@ default_owner = airflow

 [hive]
 default_hive_mapred_queue = airflow
-mapred_job_name_template = Airflow HiveOperator task for {{hostname}}.{{dag_id}}.{{task_id}}.{{execution_date}}

 [webserver]
 base_url = http://localhost:8080

airflow/providers/apache/hive/operators/hive.py

Lines changed: 3 additions & 2 deletions

@@ -95,8 +95,9 @@ def __init__(
         self.mapred_queue = mapred_queue
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
-        self.mapred_job_name_template = conf.get('hive',
-                                                 'mapred_job_name_template')
+        self.mapred_job_name_template = conf.get(
+            'hive', 'mapred_job_name_template',
+            fallback="Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}")

         # assigned lazily - just for consistency we can create the attribute with a
         # `None` initial value, later it will be populated by the execute method.
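The effect of the new `fallback` argument: with `mapred_job_name_template` removed from the shipped config files, `conf.get` returns the hard-coded template instead of failing. A minimal sketch of this behaviour (assumes an Airflow installation; the values passed to `format` are illustrative):

```python
from airflow.configuration import conf

# With the option commented out in airflow.cfg, the fallback is returned.
template = conf.get(
    'hive', 'mapred_job_name_template',
    fallback="Airflow HiveOperator task for {hostname}.{dag_id}.{task_id}.{execution_date}")

# The operator later fills the named parameters via str.format:
print(template.format(hostname="worker-1", dag_id="example_dag",
                      task_id="hive_task", execution_date="2020-05-26T00:00:00"))
```

Note that the code-level fallback uses single braces, since no ConfigParser interpolation escaping applies outside the config file.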

airflow/providers/google/cloud/example_dags/example_datacatalog.py

Lines changed: 1 addition & 1 deletion

@@ -22,7 +22,6 @@
 from google.cloud.datacatalog_v1beta1.proto.tags_pb2 import FieldType, TagField, TagTemplateField

 from airflow import models
-from airflow.models.baseoperator import chain
 from airflow.operators.bash_operator import BashOperator
 from airflow.providers.google.cloud.operators.datacatalog import (
     CloudDataCatalogCreateEntryGroupOperator, CloudDataCatalogCreateEntryOperator,
@@ -38,6 +37,7 @@
     CloudDataCatalogUpdateTagTemplateOperator,
 )
 from airflow.utils.dates import days_ago
+from airflow.utils.helpers import chain

 default_args = {"start_date": days_ago(1)}
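The import moves because `airflow.models.baseoperator.chain` does not exist in Airflow 1.10, while `airflow.utils.helpers.chain` is importable on both 1.10 and master. A small usage sketch (the DAG and task names are illustrative):

```python
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.helpers import chain  # importable on 1.10 and master

with models.DAG("chain_example", start_date=days_ago(1), schedule_interval=None) as dag:
    first = BashOperator(task_id="first", bash_command="echo 1")
    second = BashOperator(task_id="second", bash_command="echo 2")
    third = BashOperator(task_id="third", bash_command="echo 3")

    # Equivalent to first >> second >> third.
    chain(first, second, third)
```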

Lines changed: 24 additions & 0 deletions (new file)

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

## Additional notes

The Papermill operator is the only one that works with AUTO inlets for now (for lineage support).
However, since AUTO inlets are a feature of Airflow 2 and are not backported,
they are not supported when back-porting to 1.10.
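For illustration, a hedged sketch of what AUTO inlets look like on master/Airflow 2. The `AUTO` constant and the provider import path below are the Airflow 2 ones and are assumptions here; neither is available in the 1.10 backport packages:

```python
# Hedged sketch, Airflow 2 / master only: AUTO inlets ask the lineage backend
# to resolve this task's inlets from the outlets of its upstream tasks.
from airflow.lineage import AUTO  # not backported to Airflow 1.10
from airflow.providers.papermill.operators.papermill import PapermillOperator

run_notebook = PapermillOperator(
    task_id="run_notebook",
    input_nb="/tmp/hello_world.ipynb",
    output_nb="/tmp/out-{{ execution_date }}.ipynb",
    inlets=AUTO,  # resolved at runtime; unsupported in the 1.10 backports
)
```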
Lines changed: 97 additions & 0 deletions (new file)

#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import importlib
import os
import sys
import traceback
from inspect import isclass
from typing import List, Optional


def import_all_provider_classes(source_path: str,
                                provider_ids: Optional[List[str]] = None,
                                print_imports: bool = False) -> List[str]:
    """
    Imports all classes in the providers packages. This method loads and imports
    all the classes found in providers, so that we can find all the subclasses
    of operators/sensors etc.

    :param source_path: path to look for sources
    :param provider_ids: provider ids that should be loaded; all providers are loaded when None
    :param print_imports: if True, imported modules and classes are printed as they are found
    :return: list of all imported classes
    """
    if provider_ids:
        prefixed_provider_paths = [source_path + "/airflow/providers/" + provider_id.replace(".", "/")
                                   for provider_id in provider_ids]
    else:
        prefixed_provider_paths = [source_path + "/airflow/providers/"]

    imported_classes = []
    tracebacks = []
    for root, _, files in os.walk(source_path):
        if all(not root.startswith(prefixed_provider_path)
               for prefixed_provider_path in prefixed_provider_paths) or root.endswith("__pycache__"):
            # Skip loading module if it is not in the list of providers that we are looking for
            continue
        package_name = root[len(source_path) + 1:].replace("/", ".")
        for file in files:
            if file.endswith(".py"):
                module_name = package_name + "." + file[:-3] if file != "__init__.py" else package_name
                if print_imports:
                    print(f"Importing module: {module_name}")
                # noinspection PyBroadException
                try:
                    _module = importlib.import_module(module_name)
                    for attribute_name in dir(_module):
                        class_name = module_name + "." + attribute_name
                        attribute = getattr(_module, attribute_name)
                        if isclass(attribute):
                            if print_imports:
                                print(f"Imported {class_name}")
                            imported_classes.append(class_name)
                except Exception:
                    exception_str = traceback.format_exc()
                    tracebacks.append(exception_str)
    if tracebacks:
        print("\nERROR: There were some import errors\n", file=sys.stderr)
        for trace in tracebacks:
            print("----------------------------------------", file=sys.stderr)
            print(trace, file=sys.stderr)
            print("----------------------------------------", file=sys.stderr)
        sys.exit(1)
    return imported_classes


if __name__ == '__main__':
    install_source_path = None
    for python_path_candidate in sys.path:
        providers_path_candidate = os.path.join(python_path_candidate, "airflow", "providers")
        if os.path.isdir(providers_path_candidate):
            install_source_path = python_path_candidate
    print()
    print(f"Walking all paths in {install_source_path}")
    print()
    import_all_provider_classes(print_imports=True, source_path=install_source_path)
    print()
    print("SUCCESS: All backport packages are importable!")
    print()
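Besides being run as a script, the function can be called directly. A hypothetical invocation that checks a single provider against an installed tree (the site-packages path and provider id below are illustrative):

```python
# Hypothetical usage: verify that one provider's classes import cleanly.
imported = import_all_provider_classes(
    source_path="/usr/local/lib/python3.6/site-packages",
    provider_ids=["apache.hive"],
    print_imports=False,
)
print(f"Found {len(imported)} importable classes")
```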
