1818import com .google .devtools .build .lib .worker .WorkerProtocol .WorkResponse ;
1919import com .google .errorprone .annotations .CanIgnoreReturnValue ;
2020import com .sun .management .OperatingSystemMXBean ;
21+ import java .io .ByteArrayInputStream ;
22+ import java .io .ByteArrayOutputStream ;
2123import java .io .IOException ;
24+ import java .io .InputStream ;
2225import java .io .PrintStream ;
2326import java .io .PrintWriter ;
2427import java .io .StringWriter ;
2528import java .lang .management .ManagementFactory ;
29+ import java .nio .charset .StandardCharsets ;
2630import java .time .Duration ;
2731import java .util .List ;
2832import java .util .Optional ;
@@ -317,8 +321,17 @@ public WorkRequestHandler build() {
317321 * then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
318322 * reading or writing the requests or responses, it writes an error message on {@code err} and
319323 * returns. If {@code in} reaches EOF, it also returns.
324+ *
325+ * <p>This function also wraps the system streams in a {@link WorkerIO} instance that prevents the
326+ * underlying tool from writing to {@link System#out} or reading from {@link System#in}, which
327+ * would corrupt the worker worker protocol. When the while loop exits, the original system
328+ * streams will be swapped back into {@link System}.
320329 */
321330 public void processRequests () throws IOException {
331+ // Wrap the system streams into a WorkerIO instance to prevent unexpected reads and writes on
332+ // stdin/stdout.
333+ WorkerIO workerIO = WorkerIO .capture ();
334+
322335 try {
323336 while (!shutdownWorker .get ()) {
324337 WorkRequest request = messageProcessor .readWorkRequest ();
@@ -328,31 +341,39 @@ public void processRequests() throws IOException {
328341 if (request .getCancel ()) {
329342 respondToCancelRequest (request );
330343 } else {
331- startResponseThread (request );
344+ startResponseThread (workerIO , request );
332345 }
333346 }
334347 } catch (IOException e ) {
335348 stderr .println ("Error reading next WorkRequest: " + e );
336349 e .printStackTrace (stderr );
337- }
338- // TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
339- // but also try to kill stuck threads. For now, we just interrupt the remaining threads.
340- // We considered doing System.exit here, but that is hard to test and would deny the callers
341- // of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
342- // and the caller can decide whether to wait for an orderly shutdown or now.
343- for (RequestInfo ri : activeRequests .values ()) {
344- if (ri .thread .isAlive ()) {
345- try {
346- ri .thread .interrupt ();
347- } catch (RuntimeException e ) {
348- // If we can't interrupt, we can't do much else.
350+ } finally {
351+ // TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
352+ // but also try to kill stuck threads. For now, we just interrupt the remaining threads.
353+ // We considered doing System.exit here, but that is hard to test and would deny the callers
354+ // of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
355+ // and the caller can decide whether to wait for an orderly shutdown or now.
356+ for (RequestInfo ri : activeRequests .values ()) {
357+ if (ri .thread .isAlive ()) {
358+ try {
359+ ri .thread .interrupt ();
360+ } catch (RuntimeException e ) {
361+ // If we can't interrupt, we can't do much else.
362+ }
349363 }
350364 }
365+
366+ try {
367+ // Unwrap the system streams placing the original streams back
368+ workerIO .close ();
369+ } catch (Exception e ) {
370+ stderr .println (e .getMessage ());
371+ }
351372 }
352373 }
353374
354375 /** Starts a thread for the given request. */
355- void startResponseThread (WorkRequest request ) {
376+ void startResponseThread (WorkerIO workerIO , WorkRequest request ) {
356377 Thread currentThread = Thread .currentThread ();
357378 String threadName =
358379 request .getRequestId () > 0
@@ -381,7 +402,7 @@ void startResponseThread(WorkRequest request) {
381402 return ;
382403 }
383404 try {
384- respondToRequest (request , requestInfo );
405+ respondToRequest (workerIO , request , requestInfo );
385406 } catch (IOException e ) {
386407 // IOExceptions here means a problem talking to the server, so we must shut down.
387408 if (!shutdownWorker .compareAndSet (false , true )) {
@@ -419,7 +440,8 @@ void startResponseThread(WorkRequest request) {
419440 * #callback} are reported with exit code 1.
420441 */
421442 @ VisibleForTesting
422- void respondToRequest (WorkRequest request , RequestInfo requestInfo ) throws IOException {
443+ void respondToRequest (WorkerIO workerIO , WorkRequest request , RequestInfo requestInfo )
444+ throws IOException {
423445 int exitCode ;
424446 StringWriter sw = new StringWriter ();
425447 try (PrintWriter pw = new PrintWriter (sw )) {
@@ -431,6 +453,16 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
431453 e .printStackTrace (pw );
432454 exitCode = 1 ;
433455 }
456+
457+ try {
458+ // Read out the captured string for the final WorkResponse output
459+ String captured = workerIO .readCapturedAsUtf8String ().trim ();
460+ if (!captured .isEmpty ()) {
461+ pw .write (captured );
462+ }
463+ } catch (IOException e ) {
464+ stderr .println (e .getMessage ());
465+ }
434466 }
435467 Optional <WorkResponse .Builder > optBuilder = requestInfo .takeBuilder ();
436468 if (optBuilder .isPresent ()) {
@@ -541,4 +573,104 @@ private void maybePerformGc() {
541573 }
542574 }
543575 }
576+
577+ /**
578+ * Class that wraps the standard {@link System#in}, {@link System#out}, and {@link System#err}
579+ * with our own ByteArrayOutputStream that allows {@link WorkRequestHandler} to safely capture
580+ * outputs that can't be directly captured by the PrintStream associated with the work request.
581+ *
582+ * <p>This is most useful when integrating JVM tools that write exceptions and logs directly to
583+ * {@link System#out} and {@link System#err}, which would corrupt the persistent worker protocol.
584+ * We also redirect {@link System#in}, just in case a tool should attempt to read it.
585+ *
586+ * <p>WorkerIO implements {@link AutoCloseable} and will swap the original streams back into
587+ * {@link System} once close has been called.
588+ */
589+ public static class WorkerIO implements AutoCloseable {
590+ private final InputStream originalInputStream ;
591+ private final PrintStream originalOutputStream ;
592+ private final PrintStream originalErrorStream ;
593+ private final ByteArrayOutputStream capturedStream ;
594+ private final AutoCloseable restore ;
595+
596+ /**
597+ * Creates a new {@link WorkerIO} that allows {@link WorkRequestHandler} to capture standard
598+ * output and error streams that can't be directly captured by the PrintStream associated with
599+ * the work request.
600+ */
601+ @ VisibleForTesting
602+ WorkerIO (
603+ InputStream originalInputStream ,
604+ PrintStream originalOutputStream ,
605+ PrintStream originalErrorStream ,
606+ ByteArrayOutputStream capturedStream ,
607+ AutoCloseable restore ) {
608+ this .originalInputStream = originalInputStream ;
609+ this .originalOutputStream = originalOutputStream ;
610+ this .originalErrorStream = originalErrorStream ;
611+ this .capturedStream = capturedStream ;
612+ this .restore = restore ;
613+ }
614+
615+ /** Wraps the standard System streams and WorkerIO instance */
616+ public static WorkerIO capture () {
617+ // Save the original streams
618+ InputStream originalInputStream = System .in ;
619+ PrintStream originalOutputStream = System .out ;
620+ PrintStream originalErrorStream = System .err ;
621+
622+ // Replace the original streams with our own instances
623+ ByteArrayOutputStream capturedStream = new ByteArrayOutputStream ();
624+ PrintStream outputBuffer = new PrintStream (capturedStream , true );
625+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream (new byte [0 ]);
626+ System .setIn (byteArrayInputStream );
627+ System .setOut (outputBuffer );
628+ System .setErr (outputBuffer );
629+
630+ return new WorkerIO (
631+ originalInputStream ,
632+ originalOutputStream ,
633+ originalErrorStream ,
634+ capturedStream ,
635+ () -> {
636+ System .setIn (originalInputStream );
637+ System .setOut (originalOutputStream );
638+ System .setErr (originalErrorStream );
639+ outputBuffer .close ();
640+ byteArrayInputStream .close ();
641+ });
642+ }
643+
644+ /** Returns the original input stream most commonly provided by {@link System#in} */
645+ @ VisibleForTesting
646+ InputStream getOriginalInputStream () {
647+ return originalInputStream ;
648+ }
649+
650+ /** Returns the original output stream most commonly provided by {@link System#out} */
651+ @ VisibleForTesting
652+ PrintStream getOriginalOutputStream () {
653+ return originalOutputStream ;
654+ }
655+
656+ /** Returns the original error stream most commonly provided by {@link System#err} */
657+ @ VisibleForTesting
658+ PrintStream getOriginalErrorStream () {
659+ return originalErrorStream ;
660+ }
661+
662+ /** Returns the captured outputs as a UTF-8 string */
663+ @ VisibleForTesting
664+ String readCapturedAsUtf8String () throws IOException {
665+ capturedStream .flush ();
666+ String captureOutput = capturedStream .toString (StandardCharsets .UTF_8 );
667+ capturedStream .reset ();
668+ return captureOutput ;
669+ }
670+
671+ @ Override
672+ public void close () throws Exception {
673+ restore .close ();
674+ }
675+ }
544676}
0 commit comments