1919
2020import java .io .File ;
2121import java .io .FileOutputStream ;
22+ import java .io .IOException ;
2223import java .io .OutputStreamWriter ;
2324import java .io .Writer ;
2425import java .util .ArrayList ;
6768import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .collect .Iterables ;
6869import org .checkerframework .checker .nullness .qual .NonNull ;
6970import org .checkerframework .checker .nullness .qual .Nullable ;
71+ import org .slf4j .Logger ;
72+ import org .slf4j .LoggerFactory ;
7073
7174/** Wrapper for invoking external Python transforms. */
7275public class PythonExternalTransform <InputT extends PInput , OutputT extends POutput >
@@ -88,6 +91,8 @@ public class PythonExternalTransform<InputT extends PInput, OutputT extends POut
8891
8992 Map <String , Coder <?>> outputCoders ;
9093
94+ private static final Logger LOG = LoggerFactory .getLogger (PythonExternalTransform .class );
95+
9196 private PythonExternalTransform (String fullyQualifiedName , String expansionService ) {
9297 this .fullyQualifiedName = fullyQualifiedName ;
9398 this .expansionService = expansionService ;
@@ -438,6 +443,29 @@ ExternalTransforms.ExternalConfigurationPayload generatePayload() {
438443 }
439444 }
440445
446+ private boolean isPythonAvailable () {
447+ for (String executable : ImmutableList .of ("python3" , "python" )) {
448+ try {
449+ new ProcessBuilder (executable , "--version" ).start ().waitFor ();
450+ return true ;
451+ } catch (IOException | InterruptedException exn ) {
452+ // Ignore.
453+ }
454+ }
455+ return false ;
456+ }
457+
458+ private boolean isDockerAvailable () {
459+ String executable = "docker" ;
460+ try {
461+ new ProcessBuilder (executable , "--version" ).start ().waitFor ();
462+ return true ;
463+ } catch (IOException | InterruptedException exn ) {
464+ // Ignore.
465+ }
466+ return false ;
467+ }
468+
441469 @ Override
442470 public OutputT expand (InputT input ) {
443471 try {
@@ -458,14 +486,34 @@ public OutputT expand(InputT input) {
458486 OutputT output = null ;
459487 int port = PythonService .findAvailablePort ();
460488 PipelineOptionsFactory .register (PythonExternalTransformOptions .class );
461- if (input
462- .getPipeline ()
463- .getOptions ()
464- .as (PythonExternalTransformOptions .class )
465- .getUseTransformService ()) {
489+ boolean useTransformService =
490+ input
491+ .getPipeline ()
492+ .getOptions ()
493+ .as (PythonExternalTransformOptions .class )
494+ .getUseTransformService ();
495+ boolean pythonAvailable = isPythonAvailable ();
496+ boolean dockerAvailable = isDockerAvailable ();
497+
498+ // We use the transform service if either of the following is true.
499+ // * It was explicitly requested.
500+ // * Python executable is not available in the system but Docker is available.
501+ if (useTransformService || (!pythonAvailable && dockerAvailable )) {
466502 // A unique project name ensures that this expansion gets a dedicated instance of the
467503 // transform service.
468504 String projectName = UUID .randomUUID ().toString ();
505+
506+ String messageAppend =
507+ useTransformService
508+ ? "it was explicitly requested"
509+ : "a Python executable is not available in the system" ;
510+ LOG .info (
511+ "Using the Docker Compose based transform service since {}. Service will have the "
512+ + "project name {} and will be made available at the port {}" ,
513+ messageAppend ,
514+ projectName ,
515+ port );
516+
469517 TransformServiceLauncher service = TransformServiceLauncher .forProject (projectName , port );
470518 service .setBeamVersion (ReleaseInfo .getReleaseInfo ().getSdkVersion ());
471519 // TODO(https://github.com/apache/beam/issues/26833): add support for installing extra
0 commit comments