struct
  type 'a t = 'Pipe.Writer.t list ref

  let create () : 'a t =
    ref []

  let pub (subs: 'a t) (x: 'a) : unit Deferred.t =
    Deferred.List.iter ~how:`Parallel !subs ~f:(fun w -> Pipe.write w x)

  let sub (subs: 'a t) : 'Pipe.Reader.t =
    let (r, w) = Pipe.create () in
    subs := w::!subs;
    r

  let close (subs: 'a t) : unit =
    List.iter !subs ~f:(Pipe.close)
end