essays and observations

An exercise in parallelizing map using fork

I read this thread and was surprised that nobody offered any working code. It was interesting enough that I became obsessed with the problem and developed two working variations over a couple of days.

The first one is simple:

fork.mli:
(** [map f e] applies [f] to each element of [e], forking one child process
    per element; each child sends its result back marshalled (closures
    allowed).  Results are returned in enumeration order.
    @raise Failure ["Fork.map"] if a child does not exit with status 0. *)
val map: ('a -> 'b) -> 'a Enum.t -> 'b list

fork.ml:
(* [pipe ()] opens a Unix pipe and returns both ends wrapped as buffered
   channels, read end first: [(read_end, write_end)]. *)
let pipe () =
        let (rd, wr) = Unix.pipe () in
        let ic = Unix.in_channel_of_descr rd in
        let oc = Unix.out_channel_of_descr wr in
        (ic, oc)

(* [map f e] applies [f] to every element of [e], each in a freshly forked
   child.  A child marshals its result (closures allowed) into its own pipe
   and exits 0; a child whose computation raises exits 1 instead.  Results
   are returned in enumeration order.
   @raise Failure "Fork.map" if any child fails or delivers no result. *)
let map (f: ('a -> 'b)) (e: 'a Enum.t) =
        let result = ref [] in
        Enum.iter (fun x ->
                let i, o = pipe () in
                match Unix.fork () with
                        0 ->
                                (* Deliberate catch-all: the child must never
                                   return into the caller's code, where it
                                   would keep iterating and forking. *)
                                (try
                                        close_in i;
                                        Marshal.to_channel o (f x) [Marshal.Closures];
                                        close_out o;
                                        exit 0
                                with _ -> exit 1)
                        | pid ->
                                (* Drop the parent's copy of the write end at
                                   once, so later children do not inherit it
                                   and a read sees EOF if this child dies
                                   without writing. *)
                                close_out o;
                                result := (pid, i) :: !result)
                e;
        List.rev_map (fun (pid, i) ->
                (* Read the result BEFORE reaping the child: a result larger
                   than the pipe buffer would otherwise leave the child
                   blocked in write while we block in waitpid. *)
                let v =
                        try Some (Marshal.from_channel i: 'b)
                        with End_of_file | Failure _ -> None in
                close_in i;
                match Unix.waitpid [] pid, v with
                        (_, Unix.WEXITED 0), Some v -> v
                        | _ -> failwith "Fork.map")
                !result

This forks one process per item in the enumeration. The problem with this method is that the number of items may be very large, so you might fork far too many processes: forking substantially more processes than there are processors is generally not a good idea.

So I came up with this more complicated one:

fork.mli:
(** [map nprocs size f a] computes [Array.map f a] using a pool of at most
    [nprocs] forked worker processes (never more than [Array.length a]).
    [size] is the initial per-worker buffer size, in bytes, for receiving a
    marshalled result; a larger result causes the buffer to be enlarged. *)
val map: int -> int -> ('a -> 'b) -> 'a array -> 'b array

fork.ml:
(* [bsearch cmp a] returns an element [x] of the sorted array [a] for which
   [cmp x = 0].  [cmp x] must be negative when the sought element sorts
   before [x] and positive when it sorts after it.
   @raise Not_found if no such element exists (including when [a] is
   empty). *)
let bsearch cmp a =
        (* Search the half-open range [lo, hi); an empty range means the
           element is absent, so we never index past the array's end. *)
        let rec sub lo hi =
                if lo >= hi then raise Not_found
                else begin
                        let mid = lo + ((hi - lo) lsr 1) in
                        let x = a.(mid) in
                        let c = cmp x in
                        if c = 0 then x
                        else if c < 0 then sub lo mid
                        else sub (mid + 1) hi
                end in
        sub 0 (Array.length a)

(* [read_index ch] reads one array index sent by [write_index]: three bytes,
   least significant first.  Raises End_of_file if fewer than three bytes
   remain on [ch]. *)
let read_index ch =
        (* Separate bindings fix the read order (OCaml leaves the evaluation
           order of operands unspecified). *)
        let lo = input_byte ch in
        let mid = input_byte ch in
        let hi = input_byte ch in
        lo lor (mid lsl 8) lor (hi lsl 16)

(* 64 bit
let read_index ch =
        let buf = String.create 7 in
        really_input ch buf 0 7;
        (int_of_char buf.[0])
                lor ((int_of_char buf.[1]) lsl 8)
                lor ((int_of_char buf.[2]) lsl 16)
                lor ((int_of_char buf.[3]) lsl 24)
                lor ((int_of_char buf.[4]) lsl 32)
                lor ((int_of_char buf.[5]) lsl 40)
                lor ((int_of_char buf.[6]) lsl 48)
*)

(* [write_index fd i] sends array index [i] to a worker as three bytes,
   least significant first (the format [read_index] expects).  Three bytes
   only cover 0 .. 0xffffff; a larger index would be silently truncated and
   the worker would compute on the wrong element, so it is rejected.
   @raise Invalid_argument if [i] does not fit in three bytes.
   @raise Failure "Fork.map" if the write is short. *)
let write_index fd i =
        if i < 0 || i > 0xffffff then invalid_arg "Fork.map: index out of range";
        let buf = String.create 3 in
        buf.[0] <- char_of_int (i land 0xff);
        buf.[1] <- char_of_int ((i lsr 8) land 0xff);
        buf.[2] <- char_of_int ((i lsr 16) land 0xff);
        (* [<>] is structural inequality; the original [!=] only worked
           because ints are unboxed. *)
        if Unix.single_write fd buf 0 3 <> 3 then failwith "Fork.map"

(* 64 bit
let write_index fd i =
        let buf = String.create 7 in
        buf.[0] <- char_of_int (i land 0xff);
        buf.[1] <- char_of_int ((i lsr 8) land 0xff);
        buf.[2] <- char_of_int ((i lsr 16) land 0xff);
        buf.[3] <- char_of_int ((i lsr 24) land 0xff);
        buf.[4] <- char_of_int ((i lsr 32) land 0xff);
        buf.[5] <- char_of_int ((i lsr 40) land 0xff);
        buf.[6] <- char_of_int ((i lsr 48) land 0xff);
        if (Unix.single_write fd buf 0 7) != 7 then failwith "Fork.map"
*)

(* Worker loop, run in a forked process.  Repeatedly reads an array index
   from [input] (see [read_index]), computes [f a.(i)] and writes the
   marshalled result (closures allowed) to [output], flushing after each.
   Never returns: exits 0 once [input] reaches end of file, and 1 on any
   other exception (from [f], [Marshal], or I/O).  The catch-all is
   deliberate: an exception must never propagate back into the caller's
   code inside the forked process. *)
let child input output f a =
        let output = Unix.out_channel_of_descr output in
        let input = Unix.in_channel_of_descr input in
        try
                while true do
                        let i = read_index input in
                        Marshal.to_channel output (f a.(i)) [Marshal.Closures];
                        flush output
                done
        with
                End_of_file -> exit 0
                | _ -> exit 1

(* Parent-side bookkeeping for one worker process. *)
type child = {
        infd: Unix.file_descr;   (* read end; the parent reads marshalled results from it *)
        outfd: Unix.file_descr;  (* write end; the parent sends array indices through it *)
        pid: int;                (* worker's process id *)
        mutable i: int;          (* array index currently assigned to this worker *)
        mutable buffer: string;  (* receive buffer; replaced by a larger one if a result does not fit *)
        mutable have: int;       (* bytes of the current result received so far *)
        mutable want: int }      (* total size of the current result, taken from its marshal header *)

(* [map nprocs size f a] computes [Array.map f a] with a pool of at most
   [nprocs] forked workers (at least one, never more than [Array.length a]).
   Each worker inherits its own copy of [a] through fork, so the parent only
   sends array indices (see [write_index]) and receives marshalled results.
   [size] is the initial per-worker receive buffer in bytes; a buffer is
   replaced by a larger one whenever a result does not fit.
   @raise Failure "Fork.map" if a worker dies or closes its pipe before
   delivering a result.  On any exception all workers are killed and
   reaped before it is re-raised. *)
let map nprocs size (f: ('a -> 'b)) (a: 'a array) =
        let n = Array.length a in
        if n = 0 then [||] else begin
        (* With zero workers no result would ever be computed. *)
        let nprocs = max 1 (min nprocs n) in
        let size = max size (Marshal.header_size + 1) in
        let spawned = ref [] in  (* every worker forked so far *)
        let fds = ref [] in      (* read ends of workers still running *)
        (* Close the parent's pipes to worker [c] and kill it; [reap]
           collects its exit status later. *)
        let retire c =
                Unix.close c.outfd;
                Unix.close c.infd;
                fds := List.filter ((<>) c.infd) !fds;
                (try Unix.kill c.pid Sys.sigkill
                 with Unix.Unix_error _ -> ()) in
        let reap () =
                List.iter (fun c ->
                        try ignore (Unix.waitpid [] c.pid)
                        with Unix.Unix_error _ -> ()) !spawned in
        (* Allocated from the first real result rather than a dummy value, so
           that float results get a genuine flat float array. *)
        let results = ref [||] in
        let store idx (v: 'b) =
                if Array.length !results = 0 then results := Array.make n v;
                (!results).(idx) <- v in
        try
                for i = 0 to nprocs - 1 do
                        let from_child, to_parent = Unix.pipe () in
                        let from_parent, to_child = Unix.pipe () in
                        match Unix.fork () with
                                0 ->
                                        (* Worker: drop every parent-side
                                           descriptor so each pipe has a single
                                           reader and writer, then serve.  It
                                           must never return into the caller's
                                           code, whatever happens. *)
                                        (try
                                                Unix.close from_child;
                                                Unix.close to_child;
                                                List.iter (fun c ->
                                                        Unix.close c.infd;
                                                        Unix.close c.outfd) !spawned;
                                                child from_parent to_parent f a
                                         with _ -> ());
                                        exit 1
                                | pid ->
                                        (* The worker's ends are not needed here;
                                           keeping them open would leak fds and
                                           hide EOF when a worker dies. *)
                                        Unix.close from_parent;
                                        Unix.close to_parent;
                                        spawned := {
                                                infd = from_child;
                                                outfd = to_child;
                                                i = i;
                                                buffer = String.create size;
                                                have = 0;
                                                want = 0;
                                                pid = pid } :: !spawned;
                                        fds := from_child :: !fds;
                                        write_index to_child i
                done;
                let children = Array.of_list !spawned in
                Array.fast_sort (fun x y -> compare x.infd y.infd) children;
                let child_of_infd fd =
                        bsearch (fun x -> compare fd x.infd) children in
                let next = ref nprocs in
                (* A complete result is in [c.buffer]: store it, then hand the
                   worker the next index or retire it if none are left. *)
                let result_process c =
                        store c.i (Marshal.from_string c.buffer 0: 'b);
                        if !next < n then begin
                                write_index c.outfd !next;
                                c.i <- !next;
                                incr next;
                                c.have <- 0
                        end else retire c in
                (* Append up to [len] bytes to [c.buffer]; a 0-byte read means
                   the worker exited (e.g. [f] raised) without a result. *)
                let read_more c len =
                        let got = Unix.read c.infd c.buffer c.have len in
                        if got = 0 then failwith "Fork.map";
                        c.have <- c.have + got in
                let header_process c =
                        read_more c (String.length c.buffer - c.have);
                        if c.have >= Marshal.header_size then begin
                                c.want <- Marshal.total_size c.buffer 0;
                                if c.want = c.have then result_process c
                                else if String.length c.buffer < c.want then begin
                                        let tmp = String.create c.want in
                                        String.blit c.buffer 0 tmp 0 c.have;
                                        c.buffer <- tmp
                                end
                        end in
                let data_process c =
                        read_more c (c.want - c.have);
                        if c.have = c.want then result_process c in
                (* Block (negative timeout) until some worker has data instead
                   of busy-polling; retry if a signal interrupts the wait. *)
                let rec wait_ready () =
                        try let ready, _, _ = Unix.select !fds [] [] (-1.0) in ready
                        with Unix.Unix_error (Unix.EINTR, _, _) -> wait_ready () in
                while !fds <> [] do
                        List.iter (fun fd ->
                                let c = child_of_infd fd in
                                if c.have < Marshal.header_size
                                then header_process c
                                else data_process c) (wait_ready ())
                done;
                reap ();
                !results
        with e ->
                List.iter (fun c ->
                        if List.mem c.infd !fds then
                                (try retire c with Unix.Unix_error _ -> ()))
                        !spawned;
                reap ();
                raise e
        end

This one creates a fixed number of worker processes and keeps handing items to whichever worker becomes free until every item is done. Because results can come back from any worker in any order, collecting them requires select. I also used arrays for speed, and I discovered that it's a little awkward to create an array and then fill it in later. You can use option types, but that adds another level of boxing. So I took the evil way out and used Obj.magic.

Both work, but in my tests the second one occasionally stalls for about 0.3 seconds before returning, regardless of the processor. It seems completely random. It could be the garbage collector, but I'm not sure that much garbage is being generated.

As an aside, why the hell isn't there a binary search function in the standard library? Even C has it.

2007/06/03 23:39