Twitter Heron: Stream Processing at Scale (2015)

Storm was Twitter's first stream processing system. Unfortunately, it fell short in a number of ways. Heron is an API-compatible rewrite of Storm.

Motivation. In Storm, computation is expressed as a directed graph, or topology, where vertexes are computations and edges transport tuples. Storm topologies run on a cluster of worker nodes overseen by a central Nimbus. Each worker node runs multiple worker processes; each worker process is a JVM that runs multiple executors, and each executor runs multiple tasks. Executors are themselves multi-threaded, and each worker node also runs a multi-threaded supervisor.

This worker architecture was far too complex. Multiple components made scheduling decisions (the OS schedules worker processes, the JVM schedules executor threads, and executors schedule tasks), which made it hard to predict when a given task would run. Moreover, packing different types of tasks into the same executor complicated logs, exception handling, garbage collection, etc. The Storm scheduler was also poor at scheduling tasks with heterogeneous resource requirements. And because workers were heavily multi-threaded, a tuple crossed many thread boundaries on its way through a worker.

The Nimbus was a complicated piece of code that did too many things. It often became a bottleneck and was a single point of failure. Its scheduling was so poor that Twitter used to reserve nodes to exclusively run a single topology. The Nimbus also communicated with workers through ZooKeeper, which became a bottleneck.

Storm also did not implement backpressure; when bolts became overloaded, tuples were simply dropped.

Design Alternatives. Twitter considered extending and modifying Storm to fix these problems, but the flaws were deeply entrenched in Storm's design, so fixing them in place would have been difficult. They considered switching to another existing stream processing system, but didn't want to break the Storm API and have to rewrite a bunch of applications. In the end, they felt a Storm-compatible rewrite was the best bet.

Data Model and API. Heron exposes the exact same API as Storm. Computation is expressed as a directed graph where vertexes are spouts (sources of tuples) or bolts (tuple processors) and edges transfer tuples between vertexes. Users provide a logical plan which is expanded into a physical plan in order to exploit data parallelism. Heron provides at-least-once and at-most-once semantics.
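
To make the shared API concrete, here is a minimal sketch of a topology declared against the Storm builder API that Heron preserves (shown with the org.apache.storm package names; older code used backtype.storm). SentenceSpout, SplitSentenceBolt, and WordCountBolt are hypothetical application classes standing in for user code.

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    // Logical plan: one spout feeding two bolts. The parallelism hints (4, 8, 8)
    // tell the system how many task instances to create in the physical plan.
    builder.setSpout("sentences", new SentenceSpout(), 4);
    builder.setBolt("split", new SplitSentenceBolt(), 8)
           .shuffleGrouping("sentences");                      // route tuples randomly
    builder.setBolt("count", new WordCountBolt(), 8)
           .fieldsGrouping("split", new Fields("word"));       // hash-partition by word

    StormSubmitter.submitTopology("word-count", new Config(), builder.createTopology());
  }
}
```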

Architecture Overview. Users submit Heron topologies to Aurora, though Heron is able to run on top of Mesos, YARN, ECS, etc. Each topology is run as a set of containers. One container runs the Topology Master (TM). The rest run a Stream Manager (SM), a Metrics Manager (MM), and multiple Heron Instances (HI). Topology state is kept in ZooKeeper, and the TM can have a standby. All communication is done via protobufs.

Topology Master. The TM is responsible for overseeing the execution of a topology and reporting its status. The TM holds an ephemeral node in ZooKeeper to ensure there is only ever one active TM and so that other components can discover it.
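
The paper doesn't show code for this, but the ZooKeeper primitive it relies on is simple. Here is a hedged sketch (the class name and path layout are made up, not Heron's) of how a single ephemeral node provides both mutual exclusion and discovery:

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class TopologyMasterLock {
  public static boolean tryBecomeMaster(ZooKeeper zk, String topology, String hostPort)
      throws KeeperException, InterruptedException {
    String path = "/heron/topologies/" + topology + "/tmaster";   // hypothetical path layout
    try {
      // Ephemeral: the node vanishes automatically if this TM's session dies,
      // so a standby can take over. The node's data tells others where the TM lives.
      zk.create(path, hostPort.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      return true;                                                // we are the active TM
    } catch (KeeperException.NodeExistsException e) {
      return false;                                               // another TM already holds the node
    }
  }
}
```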

Stream Manager. Stream Managers are responsible for routing tuples. The k Stream Managers in a topology form a clique. Although O(k^2) connections is a lot (with k = 100, that's nearly 5,000 pairwise connections), the number of Heron Instances can scale independently of k, since many instances share each Stream Manager. Stream Managers communicate over TCP, short-circuiting delivery when the source and destination are in the same container.

Heron, unlike Storm, implements backpressure. The paper considers three backpressure strategies:

- TCP backpressure: rely on TCP flow control. When a Heron Instance falls behind, its receive buffer fills up, which backs up the Stream Manager's send buffer and slows upstream senders. This is simple, but pressure propagates slowly and can clog unrelated streams that share a connection.
- Spout backpressure: when a Stream Manager's local instances fall behind, it clamps its local spouts and broadcasts a "start backpressure" message to the other Stream Managers, which clamp their spouts too; once the slow instances catch up, a "stop backpressure" message is broadcast.
- Stage-by-stage backpressure: propagate backpressure hop by hop, from the slow bolt backwards through the topology until it reaches the spouts.

In practice, Heron combines TCP and spout backpressure. Each socket is associated with a queue whose size has a high and a low watermark. If the queue size exceeds the high watermark, backpressure is triggered and stays in effect until the size drops below the low watermark.
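
A minimal sketch of the watermark rule described above; the class name, lock-based design, and capacities are illustrative, not Heron's actual implementation. The gap between the two watermarks prevents backpressure from oscillating on and off rapidly.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class WatermarkQueue<T> {
  private final Queue<T> queue = new ArrayDeque<>();
  private final int highWatermark;   // e.g. 100_000 tuples (illustrative value)
  private final int lowWatermark;    // e.g.  50_000 tuples (illustrative value)
  private boolean backpressure = false;

  public WatermarkQueue(int highWatermark, int lowWatermark) {
    this.highWatermark = highWatermark;
    this.lowWatermark = lowWatermark;
  }

  public synchronized void offer(T item) {
    queue.add(item);
    if (!backpressure && queue.size() > highWatermark) {
      backpressure = true;            // signal: stop reading from this socket / clamp spouts
    }
  }

  public synchronized T poll() {
    T item = queue.poll();
    if (backpressure && queue.size() < lowWatermark) {
      backpressure = false;           // drained enough: resume normal flow
    }
    return item;
  }

  public synchronized boolean underBackpressure() {
    return backpressure;
  }
}
```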

Heron Instances. Each Heron Instance is a JVM process that runs a single task, which makes debugging significantly easier. Heron Instances cannot be single-threaded, because slow user code could prevent things like metrics from being reported in a timely manner. Instead, each Heron Instance runs two threads: a Gateway Thread and a Task Execution Thread. The Gateway Thread communicates with the SM and the MM and shuttles data to and from the Task Execution Thread, which runs user code and gathers metrics. The two threads communicate through a set of one-directional queues, whose sizes can be tuned to avoid bad GC behavior.
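
Here is a hedged sketch of that two-thread layout (class, method, and queue names are illustrative, not Heron's). The point is that only the Task Execution Thread runs user code, so a slow bolt blocks on its own queue rather than delaying the Gateway Thread's network I/O or metrics reporting.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class HeronInstanceSketch {
  // One-directional bounded queues between the two threads; capacities can be
  // tuned (as noted above) to keep the heap, and hence GC pauses, in check.
  private final BlockingQueue<String> dataIn = new LinkedBlockingQueue<>(1024);
  private final BlockingQueue<String> dataOut = new LinkedBlockingQueue<>(1024);

  // Task Execution Thread: runs (possibly slow) user code on tuples delivered by the gateway.
  public void start(Function<String, String> userCode) {
    Thread taskExecution = new Thread(() -> {
      try {
        while (true) {
          String tuple = dataIn.take();          // wait for the gateway to deliver a tuple
          dataOut.put(userCode.apply(tuple));    // run user code, queue the result
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "task-execution");
    taskExecution.setDaemon(true);
    taskExecution.start();
  }

  // Gateway side: hand a tuple received from the Stream Manager to user code.
  public void deliver(String tuple) throws InterruptedException { dataIn.put(tuple); }

  // Gateway side: fetch a result to forward to the Stream Manager.
  public String collect() throws InterruptedException { return dataOut.take(); }
}
```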

Metrics Manager. The metrics manager, well, manages metrics. It reports metrics to the TM and to a Twitter monitoring system.