This repository was archived by the owner on Apr 7, 2025. It is now read-only.

Entry point for Flink REST API using external SDK driver program. #27

Merged
tweise merged 10 commits into release-2.16.0-lyft from flink_rest_pipeline_runner
Oct 28, 2019

tweise commented Oct 23, 2019

This is the more flexible approach of launching pipelines via the Flink REST API. Instead of running the driver program during build time and baking the pipeline into a Flink fat jar, the pipeline is constructed at job submission time through integration with the OptimizerPlanEnvironment. More details in the javadoc.
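
For readers unfamiliar with the handover described above, here is a minimal, self-contained sketch of the general pattern: the driver program runs until it calls execute, the environment records the plan, and an abort signal unwinds the driver instead of running the job. Every name in it (`PlanCaptureSketch`, `AbortSignal`, `CapturingEnvironment`) is a hypothetical stand-in and not Flink's or Beam's actual code; the real behavior is defined by OptimizerPlanEnvironment and the javadoc in this PR.

```java
import java.util.function.Consumer;

/** Illustrative stand-ins only; none of these types exist in Flink or Beam. */
final class PlanCaptureSketch {
  private PlanCaptureSketch() {}

  /** Stand-in for an abort signal thrown once the plan has been recorded. */
  static final class AbortSignal extends Error {}

  /** Stand-in environment: records the plan and aborts instead of running it. */
  static final class CapturingEnvironment {
    private String capturedPlan;

    void execute(String plan) {
      capturedPlan = plan;
      // Unwind the driver program; the caller picks up the recorded plan.
      throw new AbortSignal();
    }
  }

  /** Runs the driver program and returns the plan it tried to execute. */
  static String capture(Consumer<CapturingEnvironment> program) {
    CapturingEnvironment env = new CapturingEnvironment();
    try {
      program.accept(env);
    } catch (AbortSignal expected) {
      return env.capturedPlan;
    }
    throw new IllegalStateException("Program finished without calling execute()");
  }
}
```

In this sketch the abort signal is the success path: a driver that returns without calling execute is treated as an error.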

tweise requested a review from mxm on October 23, 2019 04:58
- return getExecutionEnvironment().execute(jobName);
+ try {
+   return getExecutionEnvironment().execute(jobName);
+ } catch (OptimizerPlanEnvironment.ProgramAbortException ex) {
Author

this needs to move to FlinkPipelineRunner

Yes, that would be the better place.

// TODO: check for job service ready
Thread.sleep(5000);
success = true;

This needs to be addressed.

I think we could use something like the option retrieval to check for the liveness of the JobService.

Otherwise I would remove these two lines here and just process the stdout below.
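
As a rough illustration of replacing the fixed `Thread.sleep(5000)` with a bounded readiness check, the sketch below polls a TCP port until a connection succeeds or a deadline passes. It is an assumption-laden stand-in: `JobServiceReadiness` and `awaitPort` are hypothetical names, and an open port only shows that something is listening, not that the job service can answer requests. The option retrieval suggested above would be the stronger check.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

/** Hypothetical helper; not part of this PR or of Beam. */
final class JobServiceReadiness {
  private JobServiceReadiness() {}

  /** Returns true once a TCP connect to host:port succeeds, false on timeout or interrupt. */
  static boolean awaitPort(String host, int port, Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (System.nanoTime() - deadline < 0) {
      try (Socket socket = new Socket()) {
        // Short connect timeout so a single attempt cannot exceed the deadline by much.
        socket.connect(new InetSocketAddress(host, port), 200);
        return true;
      } catch (IOException notReadyYet) {
        // Nothing is listening yet; retry after a short pause.
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```

A caller could then set `success = awaitPort("localhost", jobPort, timeout)` instead of sleeping, where `jobPort` and `timeout` are whatever values the surrounding code already holds.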

throw new IllegalStateException("Job service thread is not alive");
}
} finally {
System.setErr(oldErr);

The thread needs to be cleaned up here in case of errors, which could be caught further down the stack and would leave the thread lingering.

Maybe we could also use the ProcessManager here?
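
One possible shape for the cleanup requested above is sketched here: the `finally` block restores `System.err` and also interrupts and joins the service thread, so a failure in the body cannot leave it running. The names `DriverThreadCleanup`, `runWithCleanup`, and `stopAndJoin` are hypothetical, and the sketch assumes the thread reacts to interruption, which may not hold for the real job server driver.

```java
import java.io.PrintStream;

/** Hypothetical helper; not part of this PR or of Beam. */
final class DriverThreadCleanup {
  private DriverThreadCleanup() {}

  /** Interrupts the thread and waits up to joinMillis (must be > 0); true if it stopped. */
  static boolean stopAndJoin(Thread thread, long joinMillis) {
    if (thread == null) {
      return true;
    }
    thread.interrupt();
    try {
      thread.join(joinMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }

  /** Starts the service thread, runs the body, and always cleans up afterwards. */
  static void runWithCleanup(Thread serviceThread, Runnable body) {
    PrintStream oldErr = System.err;
    try {
      serviceThread.start();
      body.run();
    } finally {
      System.setErr(oldErr);
      // Runs on success and on failure, so the thread never outlives the call.
      stopAndJoin(serviceThread, 5000);
    }
  }
}
```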


  @VisibleForTesting
- Process getUnderlyingProcess() {
+ public Process getUnderlyingProcess() {

This leaks the ProcessManager. Could we instead add the required methods for this here?


private void runDriverProgram() throws Exception {
ProcessManager processManager = ProcessManager.create();
String executable = "bash";

Why bash? Maybe sh?

ImmutableList.of("-c", String.format("exec %s " + DRIVER_CMD_FLAGS, driverCmd, jobPort));
String processId = "client1";

Duration timeout = Duration.ofSeconds(30);

Maybe make this configurable?
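
A small sketch of one way to make the timeout configurable: read it from a JVM system property and fall back to the current 30 seconds. The property name `driver.timeout.seconds` and the class `DriverTimeout` are made up for illustration; a pipeline option or constructor argument would work equally well.

```java
import java.time.Duration;

/** Hypothetical helper; the property name is an assumption, not an existing option. */
final class DriverTimeout {
  static final String PROPERTY = "driver.timeout.seconds";
  static final long DEFAULT_SECONDS = 30L;

  private DriverTimeout() {}

  /** Reads the timeout from the system property, defaulting to 30 seconds. */
  static Duration fromSystemProperty() {
    // Long.getLong returns the default when the property is unset or not a number.
    long seconds = Long.getLong(PROPERTY, DEFAULT_SECONDS);
    if (seconds <= 0) {
      throw new IllegalArgumentException(PROPERTY + " must be positive, got " + seconds);
    }
    return Duration.ofSeconds(seconds);
  }
}
```

With this in place the timeout could be overridden at launch with `-Ddriver.timeout.seconds=60`.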


private final String driverCmd;
private FlinkJobServerDriver driver = null;
private Thread driverThread = null;

Suggested change
private Thread driverThread = null;
private Thread driverThread;

private static String DRIVER_CMD_FLAGS = "--job_endpoint=localhost:%s";

private final String driverCmd;
private FlinkJobServerDriver driver = null;

Suggested change
private FlinkJobServerDriver driver = null;
private FlinkJobServerDriver driver;

// convey to the environment that the job was successfully constructed
throw new OptimizerPlanEnvironment.ProgramAbortException();
}
throw new RuntimeException("Driver program failed.");

Suggested change
throw new RuntimeException("Driver program failed.");
throw new RuntimeException("Driver program failed with exit value " + driverProcess.exitValue());

Printing the error output could also be useful.
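
To illustrate the suggestion above, the sketch below runs a command with plain `java.lang.ProcessBuilder`, captures stderr in a temporary file, and includes both the exit value and the error output in the exception message. It does not use the ProcessManager from this PR, and `DriverProcessSketch` and `runOrThrow` are hypothetical names. Stdout is simply inherited here, whereas the real code processes it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; not part of this PR or of Beam. */
final class DriverProcessSketch {
  private DriverProcessSketch() {}

  /** Runs the command; throws with exit value and stderr if it fails or times out. */
  static void runOrThrow(List<String> command, Duration timeout)
      throws IOException, InterruptedException {
    // Writing stderr to a file avoids blocking on a full pipe while waiting.
    Path stderrFile = Files.createTempFile("driver-stderr", ".log");
    try {
      Process process =
          new ProcessBuilder(command)
              .redirectOutput(ProcessBuilder.Redirect.INHERIT)
              .redirectError(stderrFile.toFile())
              .start();
      if (!process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
        process.destroyForcibly();
        throw new RuntimeException("Driver program timed out after " + timeout);
      }
      if (process.exitValue() != 0) {
        String stderr = new String(Files.readAllBytes(stderrFile), StandardCharsets.UTF_8);
        throw new RuntimeException(
            "Driver program failed with exit value "
                + process.exitValue()
                + ": "
                + stderr.trim());
      }
    } finally {
      Files.deleteIfExists(stderrFile);
    }
  }
}
```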

*
* <p>Finally Flink launches the job.
*/
public class LyftFlinkPipelineRunner {
Author

@mxm I would like to eventually add this class upstream. As for a suitable name, maybe FlinkPortableClientRunner, since it runs the SDK client to construct the pipeline?

tweise commented Oct 24, 2019

Comments addressed. The execution handover is much simpler now, after a few changes and cleanups in the surrounding classes.

tweise commented Oct 24, 2019

Upstream ticket: https://issues.apache.org/jira/browse/BEAM-8471

tweise commented Oct 24, 2019

upstream: apache#9872

tweise force-pushed the flink_rest_pipeline_runner branch from 709fb8a to 0b3082d on October 26, 2019 03:40
tweise merged commit 5ba66a6 into release-2.16.0-lyft on Oct 28, 2019