struct
type 'a t = 'a Pipe.Writer.t list ref
let create () : 'a t =
ref []
let pub (subs: 'a t) (x: 'a) : unit Deferred.t =
Deferred.List.iter ~how:`Parallel !subs ~f:(fun w -> Pipe.write w x)
let sub (subs: 'a t) : 'a Pipe.Reader.t =
let (r, w) = Pipe.create () in
subs := w::!subs;
r
let close (subs: 'a t) : unit =
List.iter !subs ~f:(Pipe.close)
end