Skip to content

Commit f19c384

Browse files
authored
Merge pull request #27365: Automatically use Docker Compose based transform service for PythonExternalTransform when needed.
2 parents 7eaef18 + ccf6544 commit f19c384

2 files changed

Lines changed: 58 additions & 8 deletions

File tree

sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.File;
2121
import java.io.FileOutputStream;
22+
import java.io.IOException;
2223
import java.io.OutputStreamWriter;
2324
import java.io.Writer;
2425
import java.util.ArrayList;
@@ -67,6 +68,8 @@
6768
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
6869
import org.checkerframework.checker.nullness.qual.NonNull;
6970
import org.checkerframework.checker.nullness.qual.Nullable;
71+
import org.slf4j.Logger;
72+
import org.slf4j.LoggerFactory;
7073

7174
/** Wrapper for invoking external Python transforms. */
7275
public class PythonExternalTransform<InputT extends PInput, OutputT extends POutput>
@@ -88,6 +91,8 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
8891

8992
Map<String, Coder<?>> outputCoders;
9093

94+
private static final Logger LOG = LoggerFactory.getLogger(PythonExternalTransform.class);
95+
9196
private PythonExternalTransform(String fullyQualifiedName, String expansionService) {
9297
this.fullyQualifiedName = fullyQualifiedName;
9398
this.expansionService = expansionService;
@@ -438,6 +443,29 @@ ExternalTransforms.ExternalConfigurationPayload generatePayload() {
438443
}
439444
}
440445

446+
private boolean isPythonAvailable() {
447+
for (String executable : ImmutableList.of("python3", "python")) {
448+
try {
449+
new ProcessBuilder(executable, "--version").start().waitFor();
450+
return true;
451+
} catch (IOException | InterruptedException exn) {
452+
// Ignore.
453+
}
454+
}
455+
return false;
456+
}
457+
458+
private boolean isDockerAvailable() {
459+
String executable = "docker";
460+
try {
461+
new ProcessBuilder(executable, "--version").start().waitFor();
462+
return true;
463+
} catch (IOException | InterruptedException exn) {
464+
// Ignore.
465+
}
466+
return false;
467+
}
468+
441469
@Override
442470
public OutputT expand(InputT input) {
443471
try {
@@ -458,14 +486,34 @@ public OutputT expand(InputT input) {
458486
OutputT output = null;
459487
int port = PythonService.findAvailablePort();
460488
PipelineOptionsFactory.register(PythonExternalTransformOptions.class);
461-
if (input
462-
.getPipeline()
463-
.getOptions()
464-
.as(PythonExternalTransformOptions.class)
465-
.getUseTransformService()) {
489+
boolean useTransformService =
490+
input
491+
.getPipeline()
492+
.getOptions()
493+
.as(PythonExternalTransformOptions.class)
494+
.getUseTransformService();
495+
boolean pythonAvailable = isPythonAvailable();
496+
boolean dockerAvailable = isDockerAvailable();
497+
498+
// We use the transform service if either of the following is true.
499+
// * It was explicitly requested.
500+
// * Python executable is not available in the system but Docker is available.
501+
if (useTransformService || (!pythonAvailable && dockerAvailable)) {
466502
// A unique project name ensures that this expansion gets a dedicated instance of the
467503
// transform service.
468504
String projectName = UUID.randomUUID().toString();
505+
506+
String messageAppend =
507+
useTransformService
508+
? "it was explicitly requested"
509+
: "a Python executable is not available in the system";
510+
LOG.info(
511+
"Using the Docker Compose based transform service since {}. Service will have the "
512+
+ "project name {} and will be made available at the port {}",
513+
messageAppend,
514+
projectName,
515+
port);
516+
469517
TransformServiceLauncher service = TransformServiceLauncher.forProject(projectName, port);
470518
service.setBeamVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
471519
// TODO(https://github.com/apache/beam/issues/26833): add support for installing extra

sdks/java/transform-service/launcher/src/main/java/org/apache/beam/sdk/transformservice/launcher/TransformServiceLauncher.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,9 +115,11 @@ private TransformServiceLauncher(@Nullable String projectName, int port) throws
115115
if (applicationDefaultCredentialsFile.exists()) {
116116
Files.copy(applicationDefaultCredentialsFile, applicationDefaultCredentialsFileCopied);
117117
} else {
118-
throw new RuntimeException(
119-
"Could not find the application default file: "
120-
+ applicationDefaultCredentialsFile.getAbsolutePath());
118+
LOG.error(
119+
"GCP credentials will not be available for the transform service since the Google "
120+
+ "Cloud application default credentials file could not be found at the expected "
121+
+ "location {}.",
122+
applicationDefaultFilePath);
121123
}
122124
}
123125

0 commit comments

Comments
 (0)