Below is the file 'subprocess.ml' from this revision. You can also download the file.
open Viz_misc let init = Giochannel.init ; Gspawn.init let debug fmt = Printf.kprintf (if Viz_misc.debug "spawn" then (fun s -> Printf.eprintf "### spawn: %s\n%!" s) else ignore) fmt type encoding = [ `NONE | `LOCALE | `CHARSET of string ] let setup_channel ~nonblock encoding fd = let chan = Giochannel.unix_new (some fd) in if nonblock then Giochannel.set_flags chan [`NONBLOCK] ; begin match encoding with | `NONE -> Giochannel.set_encoding chan None ; Giochannel.set_buffered chan false | `LOCALE -> let (is_utf8, charset) = Glib.Convert.get_charset () in if not is_utf8 then Giochannel.set_encoding chan (Some charset) | `CHARSET charset -> Giochannel.set_encoding chan (Some charset) end ; chan let all_done_cb ~nb cb = let count = ref nb in fun () -> decr count ; if !count = 0 then cb () type watch = { name : string ; chan : Giochannel.t ; exn_cb : exn -> unit ; done_cb : unit -> unit ; } let stop_watch w = debug "%s cb: closing pipe" w.name ; try Giochannel.shutdown w.chan true with Giochannel.Error (_, msg) | Glib.Convert.Error (_, msg) -> debug "%s cb: error closing pipe %s" w.name msg let reset_watch w continue = if not continue then begin stop_watch w ; w.done_cb () end ; continue let in_channel_watch w input = let input_pos = ref 0 in let callback conditions = debug "stdin cb: %d left in buffer" (String.length input - !input_pos) ; let to_write = String.length input - !input_pos in let do_write = ref (to_write > 0 && List.mem `OUT conditions) in if !do_write then begin let bytes_written = ref 0 in try match Giochannel.write_chars w.chan ~bytes_written ~off:!input_pos input with | `NORMAL written -> debug "stdin cb: wrote %d" written ; input_pos := !input_pos + written | `AGAIN -> debug "stdin cb: EAGAIN ?" with | Giochannel.Error (_, msg) | Glib.Convert.Error (_, msg) as exn -> w.exn_cb exn ; debug "stdin cb: error %s, wrote %d" msg !bytes_written ; do_write := false end ; reset_watch w !do_write in Giochannel.add_watch w.chan [ `OUT ; `HUP ; `ERR ] callback let out_channel_watch w b = let sb = String.create 4096 in let callback conditions = let need_to_read = ref (List.mem `IN conditions) in if !need_to_read then begin try match Giochannel.read_chars w.chan sb with | `NORMAL read -> debug "%s cb: read %d" w.name read ; Buffer.add_substring b sb 0 read | `EOF -> debug "%s cb: eof" w.name ; need_to_read := false | `AGAIN -> debug "%s cb: AGAIN" w.name with | Giochannel.Error (_, msg) | Glib.Convert.Error (_, msg) as exn -> w.exn_cb exn ; debug "%s cb: error %s" w.name msg ; need_to_read := false end ; reset_watch w !need_to_read in Giochannel.add_watch w.chan [ `IN ; `HUP ; `ERR ] callback let pid_watch pid callback = let callback status = debug "child %d exiting, status %d" (Gspawn.int_of_pid pid) status ; callback status ; () in Gspawn.add_child_watch pid callback type t = { mutable watches : (watch * Giochannel.source_id) list ; mutable aborted : bool ; mutable status : int ; } let spawn ?working_directory encoding input_opt cmd reap_callback done_callback = let has_input = input_opt <> None in let spawn_flags = [ `PIPE_STDOUT ; `PIPE_STDERR ; `SEARCH_PATH ; `DO_NOT_REAP_CHILD ] in let child_info = Gspawn.async_with_pipes ?working_directory ~flags:(if has_input then `PIPE_STDIN :: spawn_flags else spawn_flags) cmd in let state = { watches = [] ; aborted = false ; status = -1 } in let out_buffer = Buffer.create 4096 in let err_buffer = Buffer.create 1024 in let exn_list = ref [] in let all_done = all_done_cb ~nb:(if has_input then 4 else 3) (fun () -> if not state.aborted then done_callback ~exceptions:!exn_list ~stdout:(Buffer.contents out_buffer) ~stderr:(Buffer.contents err_buffer) state.status) in let exn_cb exn = exn_list := exn :: !exn_list in let add_watch w id = state.watches <- (w, id) :: state.watches in if has_input then begin let ic = setup_channel ~nonblock:true encoding child_info.Gspawn.standard_input in let in_watch = { name = "stdin" ; chan = ic ; exn_cb = exn_cb ; done_cb = all_done } in let in_id = in_channel_watch in_watch (some input_opt) in add_watch in_watch in_id end ; begin let oc = setup_channel ~nonblock:false encoding child_info.Gspawn.standard_output in let out_watch = { name = "stdout" ; chan = oc ; exn_cb = exn_cb ; done_cb = all_done } in let out_id = out_channel_watch out_watch out_buffer in add_watch out_watch out_id end ; begin let ec = setup_channel ~nonblock:false encoding child_info.Gspawn.standard_error in let err_watch = { name = "stderr" ; chan = ec ; exn_cb = exn_cb ; done_cb = all_done } in let err_id = out_channel_watch err_watch err_buffer in add_watch err_watch err_id end ; let pid = some child_info.Gspawn.pid in let pid_id = pid_watch pid (fun s -> state.status <- s ; reap_callback () ; Gspawn.close_pid pid ; all_done ()) in state type callback = exceptions:exn list -> stdout:string -> stderr:string -> int -> unit (* spawn a process and grab its stdout and stderr *) let spawn_out ?working_directory ~encoding ~cmd ~reap_callback done_callback = spawn encoding None cmd reap_callback done_callback (* spawn a process, feed it a string and grab its stdout and stderr *) let spawn_inout ?working_directory ~encoding ~cmd ~input ~reap_callback done_callback = spawn encoding (Some input) cmd reap_callback done_callback let abort sub_data = if not sub_data.aborted then begin sub_data.aborted <- true ; List.iter (fun (w, id) -> Giochannel.remove_watch id ; stop_watch w) sub_data.watches end