Skip to content

Commit d553598

Browse files
committed
Support all sources and sinks
1 parent 12dcd75 commit d553598

5 files changed

Lines changed: 104 additions & 32 deletions

File tree

lib_eio/process.ml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,16 @@ let status proc = proc#status
1616
let signal proc = proc#signal
1717

1818
class virtual mgr = object
19-
method virtual spawn : sw:Switch.t -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
19+
method virtual spawn : 'a 'b 'c.
20+
sw:Switch.t ->
21+
?cwd:Fs.dir Path.t ->
22+
stdin:(#Flow.source as 'a) ->
23+
stdout:(#Flow.sink as 'b) ->
24+
stderr:(#Flow.sink as 'c) ->
25+
string ->
26+
string list ->
27+
t
2028
end
2129

22-
let spawn ~sw t ?cwd ~stdin ~stdout ~stderr cmd args = t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args
30+
let spawn ~sw (t:#mgr) ?cwd ~(stdin:#Flow.source) ~(stdout:#Flow.sink) ~(stderr:#Flow.sink) cmd args =
31+
t#spawn ~sw ?cwd ~stdin ~stdout ~stderr cmd args

lib_eio/process.mli

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,19 @@ val signal : #t -> int -> unit
1919
(** [signal t i] sends the signal [i] to process [t]. *)
2020

2121
class virtual mgr : object
22-
method virtual spawn :
23-
sw:Switch.t ->
22+
method virtual spawn : 'a 'b 'c.
23+
sw:Switch.t ->
2424
?cwd:Fs.dir Path.t ->
25-
stdin:Flow.source ->
26-
stdout:Flow.sink ->
27-
stderr:Flow.sink ->
25+
stdin:(#Flow.source as 'a) ->
26+
stdout:(#Flow.sink as 'b) ->
27+
stderr:(#Flow.sink as 'c) ->
2828
string ->
2929
string list ->
3030
t
3131
end
3232
(** A process manager capable of spawning new processes. *)
3333

34-
val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:Flow.source -> stdout:Flow.sink -> stderr:Flow.sink -> string -> string list -> t
34+
val spawn : sw:Switch.t -> #mgr -> ?cwd:Fs.dir Path.t -> stdin:#Flow.source -> stdout:#Flow.sink -> stderr:#Flow.sink -> string -> string list -> t
3535
(** [spawn ~sw mgr ?cwd ~stdin ~stdout ~stderr cmd args] creates a new subprocess that is connected to the
3636
switch [sw]. A process will be stopped when the switch is released.
3737

lib_eio_linux/eio_linux.ml

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -301,27 +301,50 @@ let process proc : Eio.Process.t = object
301301
method signal i = Process.signal proc i
302302
end
303303

304-
let pipe_or_fd flow =
305-
match Eio.Generic.probe flow FD with
306-
| None -> assert false
307-
| Some fd -> FD.to_rcfd fd
304+
let read_of_fd ~sw t =
305+
match get_fd_opt t with
306+
| None ->
307+
let r, w = Low_level.pipe ~sw in
308+
Some (flow w), r
309+
| Some fd -> None, fd
310+
311+
let write_of_fd ~sw t =
312+
match get_fd_opt t with
313+
| None ->
314+
let r, w = Low_level.pipe ~sw in
315+
Some (flow r), w
316+
| Some fd -> None, fd
308317

309318
let process_mgr = object
310-
method spawn ~sw ?cwd ~stdin ~stdout ~stderr prog args =
319+
inherit Eio.Process.mgr
320+
321+
method spawn ~sw ?cwd ~(stdin : #Eio.Flow.source) ~(stdout : #Eio.Flow.sink) ~(stderr : #Eio.Flow.sink) prog args =
311322
let chdir = Option.to_list cwd |> List.map (fun (_, s) -> Process.Fork_action.chdir s) in
312-
let stdin = pipe_or_fd stdin in
313-
let stdout = pipe_or_fd stdout in
314-
let stderr = pipe_or_fd stderr in
323+
let stdin_w, stdin_fd = read_of_fd ~sw stdin in
324+
let stdout_r, stdout_fd = write_of_fd ~sw stdout in
325+
let stderr_r, stderr_fd = write_of_fd ~sw stderr in
315326
let actions = Process.Fork_action.[
316327
Eio_unix.Private.Fork_action.inherit_fds [
317-
0, stdin, `Blocking;
318-
1, stdout, `Blocking;
319-
2, stderr, `Blocking;
328+
0, Fd.to_rcfd stdin_fd, `Blocking;
329+
1, Fd.to_rcfd stdout_fd, `Blocking;
330+
2, Fd.to_rcfd stderr_fd, `Blocking;
320331
];
321332
execve prog ~argv:(Array.of_list args) ~env:[||]
322333
] in
323334
let actions = chdir @ actions in
324-
process (Process.spawn ~sw actions)
335+
let proc = process (Process.spawn ~sw actions) in
336+
Option.iter (fun stdin_w ->
337+
Eio.Fiber.fork ~sw (fun () ->
338+
Eio.Flow.copy stdin stdin_w;
339+
Eio.Flow.close stdin_w
340+
)) stdin_w;
341+
Option.iter (fun stdout_r ->
342+
Fd.close stdout_fd;
343+
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stdout_r stdout)) stdout_r;
344+
Option.iter (fun stderr_r ->
345+
Fd.close stderr_fd;
346+
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stderr_r stdout)) stderr_r;
347+
proc
325348
end
326349

327350
type stdenv = <

lib_eio_posix/process.ml

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,25 +14,48 @@ let process proc : Eio.Process.t = object
1414
method signal i = Process.signal proc i
1515
end
1616

17-
let pipe_or_fd flow =
17+
let read_of_fd ~sw flow =
1818
match Fd.get_fd_opt flow with
19-
| None -> assert false
20-
| Some fd -> Fd.to_rcfd fd
19+
| None ->
20+
let r, w = pipe ~sw in
21+
Some (Flow.of_fd w), r
22+
| Some fd -> None, fd
23+
24+
let write_of_fd ~sw t =
25+
match Fd.get_fd_opt t with
26+
| None ->
27+
let r, w = pipe ~sw in
28+
Some (Flow.of_fd r), w
29+
| Some fd -> None, fd
2130

2231
let v = object
23-
method spawn ~sw ?cwd ~stdin ~stdout ~stderr prog args =
32+
inherit Eio.Process.mgr
33+
34+
method spawn ~sw ?cwd ~(stdin : #Eio.Flow.source) ~(stdout : #Eio.Flow.sink) ~(stderr : #Eio.Flow.sink) prog args =
2435
let chdir = Option.to_list cwd |> List.map (fun (_, s) -> Process.Fork_action.chdir s) in
25-
let stdin = pipe_or_fd stdin in
26-
let stdout = pipe_or_fd stdout in
27-
let stderr = pipe_or_fd stderr in
36+
let stdin_w, stdin_fd = read_of_fd ~sw stdin in
37+
let stdout_r, stdout_fd = write_of_fd ~sw stdout in
38+
let stderr_r, stderr_fd = write_of_fd ~sw stderr in
2839
let actions = Process.Fork_action.[
2940
Eio_unix.Private.Fork_action.inherit_fds [
30-
0, stdin, `Blocking;
31-
1, stdout, `Blocking;
32-
2, stderr, `Blocking;
41+
0, Fd.to_rcfd stdin_fd, `Blocking;
42+
1, Fd.to_rcfd stdout_fd, `Blocking;
43+
2, Fd.to_rcfd stderr_fd, `Blocking;
3344
];
3445
execve prog ~argv:(Array.of_list args) ~env:[||]
3546
] in
3647
let actions = chdir @ actions in
37-
process (Process.spawn ~sw actions)
48+
let proc = process (Process.spawn ~sw actions) in
49+
Option.iter (fun stdin_w ->
50+
Eio.Fiber.fork ~sw (fun () ->
51+
Eio.Flow.copy stdin stdin_w;
52+
Eio.Flow.close stdin_w
53+
)) stdin_w;
54+
Option.iter (fun stdout_r ->
55+
Fd.close stdout_fd;
56+
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stdout_r stdout)) stdout_r;
57+
Option.iter (fun stderr_r ->
58+
Fd.close stderr_fd;
59+
Eio.Fiber.fork ~sw (fun () -> Eio.Flow.copy stderr_r stdout)) stderr_r;
60+
proc
3861
end

tests/process.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ val with_pipe_from_child :
109109
Flow.copy r (Flow.buffer_sink buff);
110110
Buffer.contents buff;;
111111
val pread :
112-
< process_mgr : #Process.mgr; stderr : Flow.sink; stdin : Flow.source; .. > ->
112+
< process_mgr : #Process.mgr; stderr : #Flow.sink; stdin : #Flow.source;
113+
.. > ->
113114
string = <fun>
114115
# run @@ fun _spawn env ->
115116
pread env;;
@@ -141,3 +142,19 @@ hello world
141142
- : Process.status * Process.status * Process.status =
142143
(Eio.Process.Exited 0, Eio.Process.Exited 0, Eio.Process.Exited 0)
143144
```
145+
146+
Using sources and sinks that are not backed by file descriptors.
147+
148+
```ocaml
149+
# run @@ fun _spawn env ->
150+
let proc = env#process_mgr in
151+
let buf = Buffer.create 16 in
152+
let dst = Flow.buffer_sink buf in
153+
Eio.Switch.run @@ fun sw ->
154+
let p =
155+
Eio.Process.spawn proc ~sw ~stdin:env#stdin ~stdout:dst ~stderr:env#stderr "/usr/bin/echo" [ "echo"; "Hello, world" ]
156+
in
157+
let _ : Process.status = Process.status p in
158+
Buffer.contents buf
159+
- : string = "Hello, world\n"
160+
```

0 commit comments

Comments
 (0)