stdOut = $tmpStdOut;
        $this->stdErr = $tmpStdErr;

        // File descriptors 1 and 2 of the child are pointed at the two stream
        // resources stored above so their contents can be read back on exit.
        $this->process = new \React\ChildProcess\Process($this->command, null, null, [
            1 => $this->stdOut,
            2 => $this->stdErr,
        ]);
        $this->process->start($this->loop);
        $this->onData = $onData;
        $this->onError = $onError;
        $this->process->on('exit', function ($exitCode) use ($onExit): void {
            $this->cancelTimer();

            // Concatenate whatever was captured on stdout, then stderr.
            $output = '';
            rewind($this->stdOut);
            $stdOut = stream_get_contents($this->stdOut);
            if (is_string($stdOut)) {
                $output .= $stdOut;
            }

            rewind($this->stdErr);
            $stdErr = stream_get_contents($this->stdErr);
            if (is_string($stdErr)) {
                $output .= $stdErr;
            }

            $onExit($exitCode, $output);
            fclose($this->stdOut);
            fclose($this->stdErr);
        });
    }

    /**
     * Cancels the pending timeout timer, if one is currently registered,
     * and forgets it.
     */
    private function cancelTimer(): void
    {
        if ($this->timer !== null) {
            $this->loop->cancelTimer($this->timer);
            $this->timer = null;
        }
    }

    /**
     * Writes a payload to the bound writable stream and arms a fresh timeout.
     *
     * Any previously armed timer is cancelled first. If the new timer fires,
     * the error callback receives an Exception describing the timeout.
     *
     * @param mixed[] $data
     */
    public function request(array $data): void
    {
        $this->cancelTimer();
        $this->in->write($data);

        $reportTimeout = function (): void {
            $message = sprintf(
                'Child process timed out after %.1f seconds. Try making it longer with parallel.processTimeout setting.',
                $this->timeoutSeconds
            );

            ($this->onError)(new Exception($message));
        };

        $this->timer = $this->loop->addTimer($this->timeoutSeconds, $reportTimeout);
    }

    /**
     * Stops the timeout timer and, when the process is still running,
     * closes every process pipe and ends the bound writable stream.
     */
    public function quit(): void
    {
        $this->cancelTimer();

        if ($this->process->isRunning()) {
            foreach ($this->process->pipes as $processPipe) {
                $processPipe->close();
            }

            $this->in->end();
        }
    }

    /**
     * Attaches the communication streams to this instance.
     *
     * Incoming "data" events cancel the timeout timer; messages whose
     * "action" is "result" have their "result" entry handed to the data
     * callback. Errors emitted by either stream go to the error callback.
     */
    public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void
    {
        // One listener serves both streams: it simply relays the error.
        $forwardError = function (Throwable $error): void {
            ($this->onError)($error);
        };

        $out->on('data', function (array $json): void {
            $this->cancelTimer();

            if ($json['action'] === 'result') {
                ($this->onData)($json['result']);
            }
        });

        $this->in = $in;

        $out->on('error', $forwardError);
        $in->on('error', $forwardError);
    }
}