let broadcast (r: 'Pipe.Reader.t) (ws: 'Pipe.Writer.t list) : unit Deferred.t =
  Pipe.iter r ~f:(fun x -> Deferred.all_unit (List.map ws ~f:(fun w -> Pipe.write w x)))
  >>| fun () -> List.iter ws ~f:Pipe.close