
Commit 04e2fbd

AIP-58: Add Airflow ObjectStore (AFS) (#34729)

This adds the ObjectStorage and ObjectStoragePath APIs per AIP-58. ObjectStoragePath is a pathlib.Path-like interface for objects residing on object storage.

1 parent 85f0ef3 · commit 04e2fbd
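
For orientation, a minimal sketch of the new API as it is used by the tutorial DAG added in this commit; the bucket name, object name and connection id below are placeholders rather than anything defined by the commit:

from airflow.io.store.path import ObjectStoragePath

# ObjectStoragePath behaves like pathlib.Path, but I/O is proxied to the
# fsspec filesystem registered for the URL scheme (s3 in this example).
base = ObjectStoragePath("s3://some-bucket/", conn_id="aws_default")

path = base / "example.csv"   # paths compose with the / operator
with path.open("wb") as f:    # open() returns a file-like object
    f.write(b"a,b\n1,2\n")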


57 files changed: +2876 / -68 lines

.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml
Lines changed: 1 addition & 0 deletions

@@ -48,6 +48,7 @@ body:
         - celery
         - cloudant
         - cncf-kubernetes
+        - common-io
         - common-sql
         - daskexecutor
         - databricks

.github/workflows/ci.yml
Lines changed: 1 addition & 0 deletions

@@ -830,6 +830,7 @@ jobs:
           # pip download --no-deps --dest dist apache-airflow-providers-<PROVIDER>==3.1.0
           #
           rm -vf dist/apache_airflow_providers_openlineage*.whl
+          rm -rf dist/apache_airflow_providers_common_io*.whl
       - name: "Get all provider extras as AIRFLOW_EXTRAS env variable"
         # Extras might be different on S3 so rather than relying on "all" we should get the list of
         # packages to be installed from the current provider_dependencies.json file

CONTRIBUTING.rst
Lines changed: 10 additions & 9 deletions

@@ -671,15 +671,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
 apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
 apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
 apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
-deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
-druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
-ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
-mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
-papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
-tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
+dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
+doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
+jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
+microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
+oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
+redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
+snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
+webhdfs, winrm, yandex, zendesk
 .. END EXTRAS HERE

 Provider packages

Dockerfile.ci
Lines changed: 2 additions & 2 deletions

@@ -1007,7 +1007,7 @@ if [[ ${UPGRADE_BOTO=} == "true" ]]; then
     echo
     echo "${COLOR_BLUE}Upgrading boto3, botocore to latest version to run Amazon tests with them${COLOR_RESET}"
     echo
-    pip uninstall --root-user-action ignore aiobotocore -y || true
+    pip uninstall --root-user-action ignore aiobotocore s3fs -y || true
     pip install --root-user-action ignore --upgrade boto3 botocore
     pip check
 fi
@@ -1468,7 +1468,7 @@ RUN echo "Airflow version: ${AIRFLOW_VERSION}"
 # Without grpcio-status limit, pip gets into very long backtracking
 # We should attempt to remove it in the future
 #
-ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0"
+ARG EAGER_UPGRADE_ADDITIONAL_REQUIREMENTS="grpcio-status>=1.55.0 aiobotocore>=2.7.0"
 ARG UPGRADE_TO_NEWER_DEPENDENCIES="false"
 ARG VERSION_SUFFIX_FOR_PYPI=""
INSTALL
Lines changed: 10 additions & 9 deletions

@@ -98,15 +98,16 @@ aiobotocore, airbyte, alibaba, all, all_dbs, amazon, apache.atlas, apache.beam,
 apache.drill, apache.druid, apache.flink, apache.hdfs, apache.hive, apache.impala, apache.kafka,
 apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop, apache.webhdfs,
 apprise, arangodb, asana, async, atlas, atlassian.jira, aws, azure, cassandra, celery, cgroups,
-cloudant, cncf.kubernetes, common.sql, crypto, dask, daskexecutor, databricks, datadog, dbt.cloud,
-deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc, doc_gen, docker,
-druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc, jenkins, kerberos, kubernetes,
-ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp, microsoft.winrm, mongo, mssql,
-mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie, oracle, otel, pagerduty, pandas,
-papermill, password, pinot, plexus, postgres, presto, rabbitmq, redis, s3, salesforce, samba,
-segment, sendgrid, sentry, sftp, singularity, slack, smtp, snowflake, spark, sqlite, ssh, statsd,
-tableau, tabular, telegram, trino, vertica, virtualenv, webhdfs, winrm, yandex, zendesk
+cloudant, cncf.kubernetes, common.io, common.sql, crypto, dask, daskexecutor, databricks, datadog,
+dbt.cloud, deprecated_api, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
+doc_gen, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, influxdb, jdbc,
+jenkins, kerberos, kubernetes, ldap, leveldb, microsoft.azure, microsoft.mssql, microsoft.psrp,
+microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas, openlineage, opensearch, opsgenie,
+oracle, otel, pagerduty, pandas, papermill, password, pinot, plexus, postgres, presto, rabbitmq,
+redis, s3, s3fs, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack, smtp,
+snowflake, spark, sqlite, ssh, statsd, tableau, tabular, telegram, trino, vertica, virtualenv,
+webhdfs, winrm, yandex, zendesk
 # END EXTRAS HERE

 # For installing Airflow in development environments - see CONTRIBUTING.rst
New file
Lines changed: 135 additions & 0 deletions

@@ -0,0 +1,135 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

# [START tutorial]
# [START import_module]
import pendulum
import requests

from airflow.decorators import dag, task
from airflow.io.store.path import ObjectStoragePath

# [END import_module]

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}

# [START create_object_storage_path]
base = ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")
# [END create_object_storage_path]


# [START instantiate_dag]
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    # [END instantiate_dag]
    import duckdb
    import pandas as pd

    # [START get_air_quality_data]
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        execution_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": execution_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exists_ok=True)

        formatted_date = execution_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path

    # [END get_air_quality_data]

    # [START analyze]
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())

    # [END analyze]

    # [START main_flow]
    obj_path = get_air_quality_data()
    analyze(obj_path)
    # [END main_flow]


# [START dag_invocation]
tutorial_objectstorage()
# [END dag_invocation]
# [END tutorial]
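
To exercise this DAG outside the scheduler, one option is Airflow's dag.test() entry point. The following snippet is a sketch and not part of the commit; it assumes the amazon provider, s3fs and an "aws_default" connection are available:

if __name__ == "__main__":
    # Runs the DAG in-process for a quick local smoke test (Airflow 2.5+).
    tutorial_objectstorage().test()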

airflow/io/__init__.py
Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from typing import (
    TYPE_CHECKING,
    Callable,
)

from fsspec.implementations.local import LocalFileSystem

from airflow.compat.functools import cache
from airflow.providers_manager import ProvidersManager
from airflow.stats import Stats
from airflow.utils.module_loading import import_string

if TYPE_CHECKING:
    from fsspec import AbstractFileSystem

log = logging.getLogger(__name__)


def _file(_: str | None) -> LocalFileSystem:
    return LocalFileSystem()


# builtin supported filesystems
_BUILTIN_SCHEME_TO_FS: dict[str, Callable[[str | None], AbstractFileSystem]] = {
    "file": _file,
}


@cache
def _register_filesystems() -> dict[str, Callable[[str | None], AbstractFileSystem]]:
    scheme_to_fs = _BUILTIN_SCHEME_TO_FS.copy()
    with Stats.timer("airflow.io.load_filesystems") as timer:
        manager = ProvidersManager()
        for fs_module_name in manager.filesystem_module_names:
            fs_module = import_string(fs_module_name)
            for scheme in getattr(fs_module, "schemes", []):
                if scheme in scheme_to_fs:
                    log.warning("Overriding scheme %s for %s", scheme, fs_module_name)

                method = getattr(fs_module, "get_fs", None)
                if method is None:
                    raise ImportError(f"Filesystem {fs_module_name} does not have a get_fs method")
                scheme_to_fs[scheme] = method

    log.debug("loading filesystems from providers took %.3f seconds", timer.duration)
    return scheme_to_fs


def get_fs(scheme: str, conn_id: str | None = None) -> AbstractFileSystem:
    """
    Get a filesystem by scheme.

    :param scheme: the scheme to get the filesystem for
    :return: the filesystem method
    :param conn_id: the airflow connection id to use
    """
    filesystems = _register_filesystems()
    try:
        return filesystems[scheme](conn_id)
    except KeyError:
        raise ValueError(f"No filesystem registered for scheme {scheme}")


def has_fs(scheme: str) -> bool:
    """
    Check if a filesystem is available for a scheme.

    :param scheme: the scheme to check
    :return: True if a filesystem is available for the scheme
    """
    return scheme in _register_filesystems()