Entry point for Flink REST API using external SDK driver program.#27
Entry point for Flink REST API using external SDK driver program.#27tweise merged 10 commits intorelease-2.16.0-lyftfrom
Conversation
| return getExecutionEnvironment().execute(jobName); | ||
| try { | ||
| return getExecutionEnvironment().execute(jobName); | ||
| } catch (OptimizerPlanEnvironment.ProgramAbortException ex) { |
There was a problem hiding this comment.
this needs to move to FlinkPipelineRunner
| return getExecutionEnvironment().execute(jobName); | ||
| try { | ||
| return getExecutionEnvironment().execute(jobName); | ||
| } catch (OptimizerPlanEnvironment.ProgramAbortException ex) { |
runners/flink/src/main/java/org/apache/beam/runners/flink/LyftFlinkPipelineRunner.java
Outdated
Show resolved
Hide resolved
|
|
||
| // TODO: check for job service ready | ||
| Thread.sleep(5000); | ||
| success = true; |
There was a problem hiding this comment.
I think we could use something like the option retrieval to check for the lifeness of the JobService.
There was a problem hiding this comment.
Otherwise I would remove these two lines here and just process the stdout below.
| throw new IllegalStateException("Job service thread is not alive"); | ||
| } | ||
| } finally { | ||
| System.setErr(oldErr); |
There was a problem hiding this comment.
The thread needs to be cleaned up here in case of errors which could be caught done in the stack and would let the thread lingering.
There was a problem hiding this comment.
Maybe we could also use the ProcessManager here?
|
|
||
| @VisibleForTesting | ||
| Process getUnderlyingProcess() { | ||
| public Process getUnderlyingProcess() { |
There was a problem hiding this comment.
This leaks the ProcessManager. Could we instead add the required methods for this here?
|
|
||
| private void runDriverProgram() throws Exception { | ||
| ProcessManager processManager = ProcessManager.create(); | ||
| String executable = "bash"; |
| ImmutableList.of("-c", String.format("exec %s " + DRIVER_CMD_FLAGS, driverCmd, jobPort)); | ||
| String processId = "client1"; | ||
|
|
||
| Duration timeout = Duration.ofSeconds(30); |
|
|
||
| private final String driverCmd; | ||
| private FlinkJobServerDriver driver = null; | ||
| private Thread driverThread = null; |
There was a problem hiding this comment.
| private Thread driverThread = null; | |
| private Thread driverThread; |
| private static String DRIVER_CMD_FLAGS = "--job_endpoint=localhost:%s"; | ||
|
|
||
| private final String driverCmd; | ||
| private FlinkJobServerDriver driver = null; |
There was a problem hiding this comment.
| private FlinkJobServerDriver driver = null; | |
| private FlinkJobServerDriver driver; |
| // convey to the environment that the job was successfully constructed | ||
| throw new OptimizerPlanEnvironment.ProgramAbortException(); | ||
| } | ||
| throw new RuntimeException("Driver program failed."); |
There was a problem hiding this comment.
| throw new RuntimeException("Driver program failed."); | |
| throw new RuntimeException("Driver program failed with exit value " + driverProcess.exitValue()); |
Printing the error output could also be useful.
…FlinkPipelineRunner.java Co-Authored-By: Maximilian Michels <mxm@apache.org>
| * | ||
| * <p>Finally Flink launches the job. | ||
| */ | ||
| public class LyftFlinkPipelineRunner { |
There was a problem hiding this comment.
@mxm I would like to eventually add this class upstream. As for a suitable name, maybe FlinkPortableClientRunner, since it runs the SDK client to construct the pipeline?
|
Comments addressed. The execution handover is much simpler now, after few changes/cleanup in surrounding classes. |
|
Upstream ticket: https://issues.apache.org/jira/browse/BEAM-8471 |
|
upstream: apache#9872 |
709fb8a to
0b3082d
Compare
This is the more flexible approach of launching pipelines via the Flink REST API. Instead of running the driver program during build time and baking the pipeline into a Flink fat jar, the pipeline is constructed at job submission time through integration with the
OptimizerPlanEnvironment. More details in the javadoc.