Commit 391ad6b
feat: add OpenLineage support for BigQuery Create Table operators (#44783)
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
1 parent f92a845 commit 391ad6b

File tree (4 files changed: +204 -14 lines changed)

providers/src/airflow/providers/google/cloud/openlineage/utils.py
providers/src/airflow/providers/google/cloud/operators/bigquery.py
providers/tests/google/cloud/openlineage/test_utils.py
providers/tests/google/cloud/operators/test_bigquery.py

providers/src/airflow/providers/google/cloud/openlineage/utils.py

Lines changed: 17 additions & 0 deletions

@@ -33,12 +33,15 @@
     ColumnLineageDatasetFacet,
     DocumentationDatasetFacet,
     Fields,
+    Identifier,
     InputField,
     RunFacet,
     SchemaDatasetFacet,
     SchemaDatasetFacetFields,
+    SymlinksDatasetFacet,
 )
 from airflow.providers.google import __version__ as provider_version
+from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url

 BIGQUERY_NAMESPACE = "bigquery"
 BIGQUERY_URI = "bigquery"

@@ -113,6 +116,20 @@ def get_facets_from_bq_table(table: Table) -> dict[str, BaseFacet]:
     if table.description:
         facets["documentation"] = DocumentationDatasetFacet(description=table.description)

+    if table.external_data_configuration:
+        symlinks = set()
+        for uri in table.external_data_configuration.source_uris:
+            if uri.startswith("gs://"):
+                bucket, blob = _parse_gcs_url(uri)
+                blob = extract_ds_name_from_gcs_path(blob)
+                symlinks.add((f"gs://{bucket}", blob))
+
+        facets["symlink"] = SymlinksDatasetFacet(
+            identifiers=[
+                Identifier(namespace=namespace, name=name, type="file")
+                for namespace, name in sorted(symlinks)
+            ]
+        )
     return facets
providers/src/airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 52 additions & 13 deletions

@@ -1365,7 +1365,7 @@ def execute(self, context: Context) -> None:

         try:
             self.log.info("Creating table")
-            table = bq_hook.create_empty_table(
+            self._table = bq_hook.create_empty_table(
                 project_id=self.project_id,
                 dataset_id=self.dataset_id,
                 table_id=self.table_id,

@@ -1382,12 +1382,15 @@ def execute(self, context: Context) -> None:
             persist_kwargs = {
                 "context": context,
                 "task_instance": self,
-                "project_id": table.to_api_repr()["tableReference"]["projectId"],
-                "dataset_id": table.to_api_repr()["tableReference"]["datasetId"],
-                "table_id": table.to_api_repr()["tableReference"]["tableId"],
+                "project_id": self._table.to_api_repr()["tableReference"]["projectId"],
+                "dataset_id": self._table.to_api_repr()["tableReference"]["datasetId"],
+                "table_id": self._table.to_api_repr()["tableReference"]["tableId"],
             }
             self.log.info(
-                "Table %s.%s.%s created successfully", table.project, table.dataset_id, table.table_id
+                "Table %s.%s.%s created successfully",
+                self._table.project,
+                self._table.dataset_id,
+                self._table.table_id,
             )
         except Conflict:
             error_msg = f"Table {self.dataset_id}.{self.table_id} already exists."

@@ -1407,6 +1410,24 @@ def execute(self, context: Context) -> None:

         BigQueryTableLink.persist(**persist_kwargs)

+    def get_openlineage_facets_on_complete(self, task_instance):
+        from airflow.providers.common.compat.openlineage.facet import Dataset
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            get_facets_from_bq_table,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        table_info = self._table.to_api_repr()["tableReference"]
+        table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
+        output_dataset = Dataset(
+            namespace=BIGQUERY_NAMESPACE,
+            name=table_id,
+            facets=get_facets_from_bq_table(self._table),
+        )
+
+        return OperatorLineage(outputs=[output_dataset])


 class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
     """

@@ -1632,15 +1653,15 @@ def execute(self, context: Context) -> None:
             impersonation_chain=self.impersonation_chain,
         )
         if self.table_resource:
-            table = bq_hook.create_empty_table(
+            self._table = bq_hook.create_empty_table(
                 table_resource=self.table_resource,
             )
             BigQueryTableLink.persist(
                 context=context,
                 task_instance=self,
-                dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
-                project_id=table.to_api_repr()["tableReference"]["projectId"],
-                table_id=table.to_api_repr()["tableReference"]["tableId"],
+                dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
+                project_id=self._table.to_api_repr()["tableReference"]["projectId"],
+                table_id=self._table.to_api_repr()["tableReference"]["tableId"],
             )
             return

@@ -1691,18 +1712,36 @@ def execute(self, context: Context) -> None:
             "encryptionConfiguration": self.encryption_configuration,
         }

-        table = bq_hook.create_empty_table(
+        self._table = bq_hook.create_empty_table(
             table_resource=table_resource,
         )

         BigQueryTableLink.persist(
             context=context,
             task_instance=self,
-            dataset_id=table.to_api_repr()["tableReference"]["datasetId"],
-            project_id=table.to_api_repr()["tableReference"]["projectId"],
-            table_id=table.to_api_repr()["tableReference"]["tableId"],
+            dataset_id=self._table.to_api_repr()["tableReference"]["datasetId"],
+            project_id=self._table.to_api_repr()["tableReference"]["projectId"],
+            table_id=self._table.to_api_repr()["tableReference"]["tableId"],
         )

+    def get_openlineage_facets_on_complete(self, task_instance):
+        from airflow.providers.common.compat.openlineage.facet import Dataset
+        from airflow.providers.google.cloud.openlineage.utils import (
+            BIGQUERY_NAMESPACE,
+            get_facets_from_bq_table,
+        )
+        from airflow.providers.openlineage.extractors import OperatorLineage
+
+        table_info = self._table.to_api_repr()["tableReference"]
+        table_id = ".".join((table_info["projectId"], table_info["datasetId"], table_info["tableId"]))
+        output_dataset = Dataset(
+            namespace=BIGQUERY_NAMESPACE,
+            name=table_id,
+            facets=get_facets_from_bq_table(self._table),
+        )
+
+        return OperatorLineage(outputs=[output_dataset])


 class BigQueryDeleteDatasetOperator(GoogleCloudBaseOperator):
     """

providers/tests/google/cloud/openlineage/test_utils.py

Lines changed: 12 additions & 0 deletions

@@ -27,9 +27,11 @@
     Dataset,
     DocumentationDatasetFacet,
     Fields,
+    Identifier,
     InputField,
     SchemaDatasetFacet,
     SchemaDatasetFacetFields,
+    SymlinksDatasetFacet,
 )
 from airflow.providers.google.cloud.openlineage.utils import (
     extract_ds_name_from_gcs_path,

@@ -49,6 +51,10 @@
             {"name": "field2", "type": "INTEGER"},
         ]
     },
+    "externalDataConfiguration": {
+        "sourceFormat": "CSV",
+        "sourceUris": ["gs://bucket/path/to/files*", "gs://second_bucket/path/to/other/files*"],
+    },
 }
 TEST_TABLE: Table = Table.from_api_repr(TEST_TABLE_API_REPR)
 TEST_EMPTY_TABLE_API_REPR = {

@@ -84,6 +90,12 @@ def test_get_facets_from_bq_table():
             ]
         ),
         "documentation": DocumentationDatasetFacet(description="Table description."),
+        "symlink": SymlinksDatasetFacet(
+            identifiers=[
+                Identifier(namespace="gs://bucket", name="path/to", type="file"),
+                Identifier(namespace="gs://second_bucket", name="path/to/other", type="file"),
+            ]
+        ),
     }
     result = get_facets_from_bq_table(TEST_TABLE)
     assert result == expected_facets
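A condensed sketch of what this updated test exercises end to end: build a Table from an API representation carrying an externalDataConfiguration, then read the symlink facet back (assumes google-cloud-bigquery and the Google provider are installed; the toy project/dataset/table names are placeholders):

from google.cloud.bigquery import Table

from airflow.providers.google.cloud.openlineage.utils import get_facets_from_bq_table

table = Table.from_api_repr(
    {
        "tableReference": {"projectId": "p", "datasetId": "d", "tableId": "t"},
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "sourceUris": ["gs://bucket/path/to/files*"],
        },
    }
)
facets = get_facets_from_bq_table(table)
# Expect one identifier: namespace="gs://bucket", name="path/to", type="file"
print(facets["symlink"].identifiers)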

providers/tests/google/cloud/operators/test_bigquery.py

Lines changed: 123 additions & 1 deletion

@@ -26,7 +26,7 @@

 import pandas as pd
 import pytest
-from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter
+from google.cloud.bigquery import DEFAULT_RETRY, ScalarQueryParameter, Table
 from google.cloud.exceptions import Conflict

 from airflow.exceptions import (

@@ -36,11 +36,17 @@
     TaskDeferred,
 )
 from airflow.providers.common.compat.openlineage.facet import (
+    DocumentationDatasetFacet,
     ErrorMessageRunFacet,
     ExternalQueryRunFacet,
+    Identifier,
     InputDataset,
+    SchemaDatasetFacet,
+    SchemaDatasetFacetFields,
     SQLJobFacet,
+    SymlinksDatasetFacet,
 )
+from airflow.providers.google.cloud.openlineage.utils import BIGQUERY_NAMESPACE
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCheckOperator,
     BigQueryColumnCheckOperator,

@@ -259,6 +265,63 @@ def test_create_existing_table(self, mock_hook, caplog, if_exists, is_conflict,
         if log_msg is not None:
             assert log_msg in caplog.text

+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_get_openlineage_facets_on_complete(self, mock_hook):
+        schema_fields = [
+            {"name": "field1", "type": "STRING", "description": "field1 description"},
+            {"name": "field2", "type": "INTEGER"},
+        ]
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "description": "Table description.",
+            "schema": {"fields": schema_fields},
+        }
+        mock_hook.return_value.create_empty_table.return_value = Table.from_api_repr(table_resource)
+        operator = BigQueryCreateEmptyTableOperator(
+            task_id=TASK_ID,
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            table_id=TEST_TABLE_ID,
+            schema_fields=schema_fields,
+        )
+        operator.execute(context=MagicMock())
+
+        mock_hook.return_value.create_empty_table.assert_called_once_with(
+            dataset_id=TEST_DATASET,
+            project_id=TEST_GCP_PROJECT_ID,
+            table_id=TEST_TABLE_ID,
+            schema_fields=schema_fields,
+            time_partitioning={},
+            cluster_fields=None,
+            labels=None,
+            view=None,
+            materialized_view=None,
+            encryption_configuration=None,
+            table_resource=None,
+            exists_ok=False,
+        )
+
+        result = operator.get_openlineage_facets_on_complete(None)
+        assert not result.run_facets
+        assert not result.job_facets
+        assert not result.inputs
+        assert len(result.outputs) == 1
+        assert result.outputs[0].namespace == BIGQUERY_NAMESPACE
+        assert result.outputs[0].name == f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+        assert result.outputs[0].facets == {
+            "schema": SchemaDatasetFacet(
+                fields=[
+                    SchemaDatasetFacetFields(name="field1", type="STRING", description="field1 description"),
+                    SchemaDatasetFacetFields(name="field2", type="INTEGER"),
+                ]
+            ),
+            "documentation": DocumentationDatasetFacet(description="Table description."),
+        }


 class TestBigQueryCreateExternalTableOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")

@@ -344,6 +407,65 @@ def test_execute_with_parquet_format(self, mock_hook):
         operator.execute(context=MagicMock())
         mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)

+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_get_openlineage_facets_on_complete(self, mock_hook):
+        table_resource = {
+            "tableReference": {
+                "projectId": TEST_GCP_PROJECT_ID,
+                "datasetId": TEST_DATASET,
+                "tableId": TEST_TABLE_ID,
+            },
+            "description": "Table description.",
+            "schema": {
+                "fields": [
+                    {"name": "field1", "type": "STRING", "description": "field1 description"},
+                    {"name": "field2", "type": "INTEGER"},
+                ]
+            },
+            "externalDataConfiguration": {
+                "sourceUris": [
+                    f"gs://{TEST_GCS_BUCKET}/{source_object}" for source_object in TEST_GCS_CSV_DATA
+                ],
+                "sourceFormat": TEST_SOURCE_CSV_FORMAT,
+            },
+        }
+        mock_hook.return_value.create_empty_table.return_value = Table.from_api_repr(table_resource)
+        operator = BigQueryCreateExternalTableOperator(
+            task_id=TASK_ID,
+            bucket=TEST_GCS_BUCKET,
+            source_objects=TEST_GCS_CSV_DATA,
+            table_resource=table_resource,
+        )
+
+        mock_hook.return_value.split_tablename.return_value = (
+            TEST_GCP_PROJECT_ID,
+            TEST_DATASET,
+            TEST_TABLE_ID,
+        )
+
+        operator.execute(context=MagicMock())
+        mock_hook.return_value.create_empty_table.assert_called_once_with(table_resource=table_resource)
+
+        result = operator.get_openlineage_facets_on_complete(None)
+        assert not result.run_facets
+        assert not result.job_facets
+        assert not result.inputs
+        assert len(result.outputs) == 1
+        assert result.outputs[0].namespace == BIGQUERY_NAMESPACE
+        assert result.outputs[0].name == f"{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}"
+        assert result.outputs[0].facets == {
+            "schema": SchemaDatasetFacet(
+                fields=[
+                    SchemaDatasetFacetFields(name="field1", type="STRING", description="field1 description"),
+                    SchemaDatasetFacetFields(name="field2", type="INTEGER"),
+                ]
+            ),
+            "documentation": DocumentationDatasetFacet(description="Table description."),
+            "symlink": SymlinksDatasetFacet(
+                identifiers=[Identifier(namespace=f"gs://{TEST_GCS_BUCKET}", name="dir1", type="file")]
+            ),
+        }


 class TestBigQueryDeleteDatasetOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
