Commit da485da
Add support for BeamGoPipelineOperator (#20386)
closes: #20283

In this PR:

- [x] Upgrade the minimum package requirement for apache-beam to 2.33.0 (the first stable release for the Beam Go SDK)
- [x] Refactor `operators/beam.py` with an abstract `BeamBasePipelineOperator` class to factor out initialization and common code; also fix the mypy hook on ``BeamDataflowMixin``
- [x] Add `BeamRunGoPipelineOperator` and `BeamHook.start_go_pipeline` (+ tests)
- [x] Add `utils/go_module.py` to handle initialization of a Go module and installation of its dependencies (+ tests; a hedged sketch of these helpers follows the file stats below)
- [x] Modify `process_util` (+ tests) to accept an extra optional `cwd` parameter, so we can change to the module directory to build it
- [x] Write docs
Parent: cca2f94

File tree: 17 files changed, +771 -163 lines

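The `go_module.py` helpers named in the checklist are not part of the two diffs shown below. As rough orientation, here is a minimal sketch of what `init_module` and `install_dependencies` could look like, assuming `execute_in_subprocess` from `airflow.utils.process_utils` gained the optional `cwd` parameter mentioned above; the bodies are illustrative, not the committed code:

import os

from airflow.utils.process_utils import execute_in_subprocess


def init_module(go_module_name: str, go_module_path: str) -> None:
    """Initialize a Go module (`go mod init`) in go_module_path, skipping if go.mod already exists."""
    if os.path.isfile(os.path.join(go_module_path, "go.mod")):
        return
    execute_in_subprocess(["go", "mod", "init", go_module_name], cwd=go_module_path)


def install_dependencies(go_module_path: str) -> None:
    """Resolve and download the module's dependencies (`go mod tidy`)."""
    execute_in_subprocess(["go", "mod", "tidy"], cwd=go_module_path)

Skipping `go mod init` when a `go.mod` already exists would keep the helper idempotent for pipelines checked out as part of an existing module.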

airflow/providers/apache/beam/example_dags/example_beam.py

Lines changed: 113 additions, 1 deletion
@@ -25,6 +25,7 @@
 
 from airflow import models
 from airflow.providers.apache.beam.operators.beam import (
+    BeamRunGoPipelineOperator,
     BeamRunJavaPipelineOperator,
     BeamRunPythonPipelineOperator,
 )
@@ -43,7 +44,10 @@
 GCS_PYTHON_DATAFLOW_ASYNC = os.environ.get(
     'APACHE_BEAM_PYTHON_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.py'
 )
-
+GCS_GO = os.environ.get('APACHE_BEAM_GO', 'gs://INVALID BUCKET NAME/wordcount_debugging.go')
+GCS_GO_DATAFLOW_ASYNC = os.environ.get(
+    'APACHE_BEAM_GO_DATAFLOW_ASYNC', 'gs://INVALID BUCKET NAME/wordcount_debugging.go'
+)
 GCS_JAR_DIRECT_RUNNER = os.environ.get(
     'APACHE_BEAM_DIRECT_RUNNER_JAR',
     'gs://INVALID BUCKET NAME/tests/dataflow-templates-bundled-java=11-beam-v2.25.0-DirectRunner.jar',
@@ -323,3 +327,111 @@
 
     start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
     # [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
+
+
+with models.DAG(
+    "example_beam_native_go",
+    start_date=START_DATE,
+    schedule_interval="@once",
+    catchup=False,
+    default_args=DEFAULT_ARGS,
+    tags=['example'],
+) as dag_native_go:
+
+    # [START howto_operator_start_go_direct_runner_pipeline_local_file]
+    start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_direct_runner",
+        go_file='files/apache_beam/examples/wordcount.go',
+    )
+    # [END howto_operator_start_go_direct_runner_pipeline_local_file]
+
+    # [START howto_operator_start_go_direct_runner_pipeline_gcs_file]
+    start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_direct_runner",
+        go_file=GCS_GO,
+        pipeline_options={"output": GCS_OUTPUT},
+    )
+    # [END howto_operator_start_go_direct_runner_pipeline_gcs_file]
+
+    # [START howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+    start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_dataflow_runner",
+        runner="DataflowRunner",
+        go_file=GCS_GO,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+        },
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}', project_id=GCP_PROJECT_ID, location="us-central1"
+        ),
+    )
+    # [END howto_operator_start_go_dataflow_runner_pipeline_gcs_file]
+
+    start_go_pipeline_local_spark_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_spark_runner",
+        go_file='/files/apache_beam/examples/wordcount.go',
+        runner="SparkRunner",
+        pipeline_options={
+            'endpoint': '/your/spark/endpoint',
+        },
+    )
+
+    start_go_pipeline_local_flink_runner = BeamRunGoPipelineOperator(
+        task_id="start_go_pipeline_local_flink_runner",
+        go_file='/files/apache_beam/examples/wordcount.go',
+        runner="FlinkRunner",
+        pipeline_options={
+            'output': '/tmp/start_go_pipeline_local_flink_runner',
+        },
+    )
+
+    (
+        [
+            start_go_pipeline_local_direct_runner,
+            start_go_pipeline_direct_runner,
+        ]
+        >> start_go_pipeline_local_flink_runner
+        >> start_go_pipeline_local_spark_runner
+    )
+
+
+with models.DAG(
+    "example_beam_native_go_dataflow_async",
+    default_args=DEFAULT_ARGS,
+    start_date=START_DATE,
+    schedule_interval="@once",
+    catchup=False,
+    tags=['example'],
+) as dag_native_go_dataflow_async:
+    # [START howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
+    start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
+        task_id="start_go_job_dataflow_runner_async",
+        runner="DataflowRunner",
+        go_file=GCS_GO_DATAFLOW_ASYNC,
+        pipeline_options={
+            'tempLocation': GCS_TMP,
+            'stagingLocation': GCS_STAGING,
+            'output': GCS_OUTPUT,
+            'WorkerHarnessContainerImage': "apache/beam_go_sdk:latest",
+        },
+        dataflow_config=DataflowConfiguration(
+            job_name='{{task.task_id}}',
+            project_id=GCP_PROJECT_ID,
+            location="us-central1",
+            wait_until_finished=False,
+        ),
+    )
+
+    wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
+        task_id="wait-for-go-job-async-done",
+        job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
+        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
+        project_id=GCP_PROJECT_ID,
+        location='us-central1',
+    )
+
+    start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
+    # [END howto_operator_start_go_dataflow_runner_pipeline_async_gcs_file]
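The glue between these example DAGs and the hook changes below lives in `operators/beam.py`, which this excerpt omits. As a hedged sketch, the expected flow for a `gs://` `go_file` is roughly the following; the helper function, its name, and the exact control flow are hypothetical, and only `GCSHook.provide_file` and the hook call below are taken from real APIs:

from airflow.providers.apache.beam.hooks.beam import BeamHook
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def run_go_pipeline(go_file: str, pipeline_options: dict, runner: str = "DirectRunner") -> None:
    """Hypothetical helper mirroring what BeamRunGoPipelineOperator.execute presumably does."""
    beam_hook = BeamHook(runner=runner)
    if go_file.lower().startswith("gs://"):
        # Stage the source from GCS into a temporary file; the fresh directory
        # has no go.mod, so the hook must init the module before `go run`.
        with GCSHook().provide_file(object_url=go_file) as tmp_file:
            beam_hook.start_go_pipeline(
                variables=pipeline_options,
                go_file=tmp_file.name,
                should_init_module=True,
            )
    else:
        # Local files are assumed to already live inside a Go module.
        beam_hook.start_go_pipeline(
            variables=pipeline_options,
            go_file=go_file,
        )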

airflow/providers/apache/beam/hooks/beam.py

Lines changed: 50 additions, 1 deletion
@@ -17,6 +17,7 @@
 # under the License.
 """This module contains a Apache Beam Hook."""
 import json
+import os
 import select
 import shlex
 import subprocess
@@ -26,6 +27,7 @@
 
 from airflow.exceptions import AirflowException
 from airflow.hooks.base import BaseHook
+from airflow.providers.google.go_module_utils import init_module, install_dependencies
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.python_virtualenv import prepare_virtualenv
 
@@ -80,12 +82,14 @@ class BeamCommandRunner(LoggingMixin):
     :param cmd: Parts of the command to be run in subprocess
     :param process_line_callback: Optional callback which can be used to process
         stdout and stderr to detect job id
+    :param working_directory: Working directory
     """
 
     def __init__(
         self,
         cmd: List[str],
         process_line_callback: Optional[Callable[[str], None]] = None,
+        working_directory: Optional[str] = None,
     ) -> None:
         super().__init__()
         self.log.info("Running command: %s", " ".join(shlex.quote(c) for c in cmd))
@@ -94,6 +98,7 @@ def __init__(
 
         self._proc = subprocess.Popen(
             cmd,
+            cwd=working_directory,
             shell=False,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
@@ -169,6 +174,7 @@ def _start_pipeline(
         variables: dict,
         command_prefix: List[str],
         process_line_callback: Optional[Callable[[str], None]] = None,
+        working_directory: Optional[str] = None,
     ) -> None:
         cmd = command_prefix + [
             f"--runner={self.runner}",
@@ -178,6 +184,7 @@
         cmd_runner = BeamCommandRunner(
             cmd=cmd,
             process_line_callback=process_line_callback,
+            working_directory=working_directory,
         )
         cmd_runner.wait_for_done()
 
@@ -195,6 +202,7 @@ def start_python_pipeline(
         Starts Apache Beam python pipeline.
 
         :param variables: Variables passed to the pipeline.
+        :param py_file: Path to the python file to execute.
         :param py_options: Additional options.
         :param py_interpreter: Python version of the Apache Beam pipeline.
             If None, this defaults to the python3.
@@ -210,7 +218,8 @@
             See virtualenv documentation for more information.
 
             This option is only relevant if the ``py_requirements`` parameter is not None.
-        :param on_new_job_id_callback: Callback called when the job ID is known.
+        :param process_line_callback: (optional) Callback that can be used to process each line of
+            the stdout and stderr file descriptors.
         """
         if "labels" in variables:
             variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()]
@@ -265,6 +274,8 @@ def start_java_pipeline(
         :param variables: Variables passed to the job.
         :param jar: Name of the jar for the pipeline
         :param job_class: Name of the java class for the pipeline.
+        :param process_line_callback: (optional) Callback that can be used to process each line of
+            the stdout and stderr file descriptors.
         """
         if "labels" in variables:
             variables["labels"] = json.dumps(variables["labels"], separators=(",", ":"))
@@ -275,3 +286,41 @@
             command_prefix=command_prefix,
             process_line_callback=process_line_callback,
         )
+
+    def start_go_pipeline(
+        self,
+        variables: dict,
+        go_file: str,
+        process_line_callback: Optional[Callable[[str], None]] = None,
+        should_init_module: bool = False,
+    ) -> None:
+        """
+        Starts Apache Beam Go pipeline.
+
+        :param variables: Variables passed to the job.
+        :param go_file: Path to the Go file with your beam pipeline.
+        :param go_file:
+        :param process_line_callback: (optional) Callback that can be used to process each line of
+            the stdout and stderr file descriptors.
+        :param should_init_module: If False (default), will just execute a `go run` command. If True, will
+            init a module and dependencies with a ``go mod init`` and ``go mod tidy``, useful when pulling
+            source with GCSHook.
+        :return:
+        """
+        if "labels" in variables:
+            variables["labels"] = json.dumps(variables["labels"], separators=(",", ":"))
+
+        working_directory = os.path.dirname(go_file)
+        basename = os.path.basename(go_file)
+
+        if should_init_module:
+            init_module("main", working_directory)
+            install_dependencies(working_directory)
+
+        command_prefix = ["go", "run", basename]
+        self._start_pipeline(
+            variables=variables,
+            command_prefix=command_prefix,
+            process_line_callback=process_line_callback,
+            working_directory=working_directory,
+        )
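For reference, a short usage sketch of the new hook method on its own, e.g. from a test or a `PythonOperator` callable; the `DirectRunner` choice and the paths here are arbitrary assumptions for illustration:

from airflow.providers.apache.beam.hooks.beam import BeamHook

hook = BeamHook(runner="DirectRunner")
# With should_init_module=True the hook runs `go mod init main` and
# `go mod tidy` in go_file's directory before `go run`, matching the
# docstring above for sources freshly pulled with GCSHook.
hook.start_go_pipeline(
    variables={"output": "/tmp/start_go_pipeline_output"},
    go_file="/files/apache_beam/examples/wordcount.go",
    should_init_module=True,
)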
