(* In this program, we'll use pipes in more advanced and more interesting ways.
 *)

open Core.Std
open Async.Std

(* Imagine we have two pipes: a and b. Someone is writing colors into pipe a
 * and fruits into pipe b. When we read from pipe a, we get the colors, and
 * when we read from pipe b, we get the fruits.
 *
 *   read a (* blue *)
 *   read a (* red *)
 *   read b (* bananas *)
 *   read b (* strawberries *)
 *
 * Now imagine we want to make a single pipe c. Whenever we read from c, we
 * want to get either a color from a or a fruit from b. That's exactly what the
 * interleave function does! It takes in a list of readers and returns a single
 * reader that yields values read from any of the input readers. So if we used
 * interleave to generate c, here's what the reads might look like.
 *
 *   let c = interleave [a; b]
 *   read c (* blue *)
 *   read c (* bananas *)
 *   read c (* strawberries *)
 *   read c (* red *)
 *
 * In terms of implementation, all we do is create a fresh pipe and then
 * transfer the input readers into it. We also have to make sure to close the
 * interleaved pipe when all the input pipes are closed.
 *)

let interleave (rs: 'Pipe.Reader.t list) : 'Pipe.Reader.t =
  let (r, w) = Pipe.create () in
  begin
    Deferred.List.iter ~how:`Parallel rs ~f:(fun r -> Pipe.transfer_id r w) >>> fun () ->
    Pipe.close w
  end;
  r

(* Here, we show off how to use interleave. We create 10 pipes and write into
 * each one. Then we interleave the pipes into a single reader and read all the
 * values from it. *)

let pipe_interleave () : unit Deferred.t =
  let (rs, ws) = List.(unzip (map (range 0 10) ~f:(fun _ -> Pipe.create ()))) in
  List.iteri ws ~f:(fun i w -> don't_wait_for (Pipe.write w i >>| fun () -> Pipe.close w));
  let r = interleave rs in
  Pipe.iter r ~f:(fun i -> return (printf "read %d\n" i))

(* With interleave, we took a set of pipes and interleaved them into a single
 * pipe. Here, we do the exact opposite. We take a single pipe and replicate
 * its output to a whole set of pipes. We simply iterate over the input pipe
 * and write the contents to every one of the output pipes. We also make sure
 * to close all the output pipes once the input pipe has closed. *)

let broadcast (r: 'Pipe.Reader.t) (ws: 'Pipe.Writer.t list) : unit Deferred.t =
  Pipe.iter r ~f:(fun x -> Deferred.all_unit (List.map ws ~f:(fun w -> Pipe.write w x)))
  >>| fun () -> List.iter ws ~f:Pipe.close

(* Here, we show off how to use broadcast. We create a pipe and write "fee",
 * "fi", "fo", and "fum" into it. Then, we broadcast it into 10 pipes. *)

let pipe_broadcast () : unit Deferred.t =
  let (r, w) = Pipe.create () in
  let (rs, ws) = List.(unzip (map (range 0 10) ~f:(fun _ -> Pipe.create ()))) in
  don't_wait_for (broadcast r ws);
  don't_wait_for (Pipe.write w "fee");
  don't_wait_for (Pipe.write w "fi");
  don't_wait_for (Pipe.write w "fo");
  don't_wait_for (Pipe.write w "fum");
  Pipe.close w;
  Deferred.List.iteri ~how:`Parallel rs ~f:(fun i r ->
    Pipe.iter r ~f:(fun s -> return (printf "%d: %s\n" i s)))

(* Interleave let a bunch of pipes converge into a single pipe. If we imagine
 * data flowing through pipes from left to right, interleave looked like this:
 *   __
 *     \
 *      \
 *   ---->----
 *      /
 *   __/
 *
 * And broadcast let a single pipe replicate its output to a whole set of
 * pipes. It looked like this:
 *          __
 *         /
 *        /
 *   ----<----
 *        \
 *         \__
 *
 * Now, we'll implement code that let's a whole set of producers (which we'll
 * call publishers) broadcast data to a whole set of consumers (which we'll
 * call subscribers). It looks like this:
 *
 *   publisher a  __          __ subscriber a
 *                  \        /
 *                   \      /
 *   publisher b  ---->----<---- subscriber b
 *                   /      \
 *                __/        \__ subscriber c
 *   publisher c
 *
 * Our publisher/subscriber service will be encapsulate by an instance of type
 * PubSub.t. Any publisher can publish a value by calling pub. For example, if
 * I wanted to publish the string "foo" to a pub/sub instance ps, I would call
 * `PubSub.pub sp "foo"`. Whenever a message is published, it is broadcasted to
 * all the subscribers. In order to become a subscriber, you call `PubSub.sub`.
 * This returns the reader end of a pipe from which you can read published
 * values. There's also a function close which closes all the subscribers'
 * pipes. *)

module PubSub = struct
  type 'a t = 'Pipe.Writer.t list ref

  let create () : 'a t =
    ref []

  let pub (subs: 'a t) (x: 'a) : unit Deferred.t =
    Deferred.List.iter ~how:`Parallel !subs ~f:(fun w -> Pipe.write w x)

  let sub (subs: 'a t) : 'Pipe.Reader.t =
    let (r, w) = Pipe.create () in
    subs := w::!subs;
    r

  let close (subs: 'a t) : unit =
    List.iter !subs ~f:(Pipe.close)
end

(* Here's a demo of our pub/sub service! *)
let pipe_pub_sub () : unit Deferred.t =
  let open PubSub in
  let ps = create () in
  don't_wait_for (pub ps "negative one");
  let r0 = sub ps in
  don't_wait_for (pub ps "zero");
  let r1 = sub ps in
  don't_wait_for (pub ps "one");
  let r2 = sub ps in
  don't_wait_for (pub ps "two");
  let r3 = sub ps in
  close ps;
  let subs = [r0; r1; r2; r3] in
  Deferred.List.iteri subs ~f:(fun i r -> Pipe.iter r ~f:(fun x -> return (printf "%d: %s\n" i x)))

let main () : unit Deferred.t =
  pipe_interleave () >>= fun () ->
  pipe_broadcast () >>= fun () ->
  pipe_pub_sub () >>= fun () ->
  return ()

let () =
  Command.(run (async ~summary:"" Spec.empty main))