aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFangfan Li <fangfanli28@gmail.com>2016-12-15 21:46:02 -0500
committerFangfan Li <fangfanli28@gmail.com>2016-12-15 21:46:02 -0500
commitfae7befbabd6eaf958ab605e20f18ddf696aa0bc (patch)
tree3e599809e29d2b53be9263333afa33ec8e5d2034
parentf06d8eb1a4c6f8ea9617247f1076fa56d6dba12b (diff)
More revisions
-rw-r--r--chapter/9/streaming.md120
1 files changed, 99 insertions, 21 deletions
diff --git a/chapter/9/streaming.md b/chapter/9/streaming.md
index f1268b1..b7bc4f3 100644
--- a/chapter/9/streaming.md
+++ b/chapter/9/streaming.md
@@ -8,30 +8,46 @@ The previous chapter discusses the large scale batch processing system, where th
There are many challenges for implementing large scale streaming processing system. Similar to large scale batch processing systems, large scale streaming systems also have to deal with consistency and fault-tolerance due to the distributed nature of those systems. Moreover, latency at the scale of several minutes is at most a nuisance in batch processing while latency is not as tolerable in large streaming processing.
-In the rest of this chapter, we would introduce the 1) History of streaming processing 2) How to represent the input data stream 3) What are the practices to process data stream 4) The state-of-the-art systems used by applications.
+Despite those challenges, there are many active research and production in the stream processing area, and we want to answer the following questions in this article: 1) what are the earliest ideas of stream processing, why would we want to analyze a stream of data 2) what exactly is a stream, how is it implemented in real system 3) what are the systems that are built for large scale stream processing, and what are the differences between them 4) what are the systems that are being used by companies for their applications, do they build their own system or they would use the existing systems.
+
## Data in constant motion
-This concept of streaming data can trace back to TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %}
-, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, high-variable data streams. In contrast to traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid and being constantly moving and changing. The examples of applications that use *data in motion* include: event-based processing, query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of the query processing systems that deals with data stream. The fundamental difference between TelegraphCQ to other traditional query system is the view of input data, instead of handling a query with detailed static data, TelegraphCQ has to react to the newly arrived data and process the queries *on-the-fly*.
+Computing data stream has long been studied in the area of Theory of Computing. Assume we have a sequence of elements, and we want to compute the frequency moments of the data (i.e., count how many of each of the distinct data appear in the sequence). To do that, we could maintain a full histogram on the data, a counter for each data value. However, the memory that we have is not unlimited, thus we can not gather every data, we can then use randomized algorithms for approximating the frequency moments with limited resource{% cite alon1996space --file streaming %}. So analyzing the stream using random algorithm was because the lack of storage.
+
+Besides randomized processing on the data sequence, systems were also being developed to deal with the input data that is not static and predicatable. Instead of motivating by the lack of resources, those projects were mostly motivated by the fact that in emerging networked environments, data is the commodity of interest, and its value is realized only within the time that it is needed. TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %} is one example among those earliest such systems, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, high-variable data. In contrast to traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid and being constantly moving and changing. The examples of applications that use *data in motion* include: event-based processing where the system would react to a some special data received, query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of the systems that can query processing over data stream. The fundamental difference between TelegraphCQ to other traditional query system is the view of input data, instead of handling a query with detailed static data, TelegraphCQ has to react to the newly arrived data and process the queries *on-the-fly*.
-The important concepts of TelegraphCQ include *continuous queries*, where the queries are constantly running and as new data arrives, the processor would route it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually, the queries with some commonality can be combined together to improve the performance.
+The important concepts of TelegraphCQ include *continuous queries*, where the queries are constantly running and as new data arrives, the processor would route it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually, in order to avoid blocking and having to interrupt the dataflow, data should be processed simultaneously by all the queries that require the dataflow. In TelegraphCQ, those queries with such commonality can be combined together to improve the performance.
-TelegraphCQ shows the importance of modeling data as stream and how can we process such data stream. But TelegraphCQ was only implemented in a non-distributed prototype, we would then discuss how data stream is processed in a large scale.
+TelegraphCQ shows the importance of modeling data as stream and how can we process such data stream however it was only implemented in a non-distributed prototype.
+
+TODO: Other systems
## How to represent data stream
-Why would we need to process data stream in a large scale? I will use an example to illustrate the idea. For example, assume you are Twitter, and you have a constant feed of user's comments and posts, you want to find out what is the most *trending* topic right now that people are talking about, and your advertisement team want to follow on that. You can store all the posts that happened during the day from 12:01 a.m to 11:59 p.m in a large file system and then run a job in *Spark* {% cite zaharia2012resilient --file streaming %} to analyze them. The *Spark* job itself may again probably take several hours, but after all these works, the *trending* topic comes out from your analysis might be useless since it might not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all different sources as input and output the result with low latency (i.e., before it becomes useless).
+Why would we need to process data stream in a large scale? I will use an example to illustrate the idea. For example, assume you are Twitter, and you have a constant feed of user's comments and posts, you want to find out what is the most *trending* topic right now that people are talking about, and your advertisement team want to follow on that. You can store all the posts that happened during the day from 12:01 a.m to 11:59 p.m in a large file system and then run a batch *Spark* {% cite zaharia2012resilient --file streaming %} job to analyze them. The *Spark* job itself may again probably take several hours, but after all these works, the *trending* topic comes out from your analysis might be useless since it might not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all different sources as input and output the result with low latency (i.e., before it becomes useless).
+
+Before dive into the details of the large scale processing, we would first introduce a few concepts: producer, processor and consumer based on the example.
-Before dive into the details of the large scale processing, we would first introduce a few concepts: producer, processor and consumer. The producer is where the data stream comes from, it can be a user who are tweeting in the previous example, the consumer would be the advertisement team then, and the processor is then the *magical* component that we need to produce the results. The producers and consumers are fairly straight forward, it is the processor that are being discussed in this chapter.
+- The producer is where the data stream comes from, it would be a user who are tweeting.
+- The consumer is where the results are needed, the advertisement team would be the consumer.
+- The processor is then the *magical* component that takes the stream and produce the results.
-In this section, we would first illustrate what is the *stream* that the producers are giving to the processor, which is the component between producers and processors-the data stream.
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/chapter/9/DiagraphStream" alt="An example of a stream processing system" />
+</figure>
+
+The producers and consumers are fairly straight forward, it is the processor that are being discussed in this chapter.
+
+In this section, we would first illustrate what is the *stream* (i.e., the tuples between components) that the producers are giving to the processor, which is the component between producers and processors-the data stream.
We have been talking about the stream of data, but this is a bit under-specified, since the data can be collected from many producers (i.e. different users), how do we combine those data into actual streams and send the them to the processors? What does a data stream really look like?
-A natural view of a data stream can be an infinite sequence of tuples reading from a queue. However, a traditional queue would not be sufficient in large scale system since the consumed tuple might got lost or the consumer might fail thus it might request the previous tuple after a restart. The alternative queue design is a multi-consumer queue, where a pool of readers may read from a single queue and each record goes to one of them, which is more suitable for a distributed system. In a traditional multi-consumer queue, once a consumer reads the data out, it is gone. This would be problematic in a large stream processing system, since the messages are more likely to be lost during transmission, and we want to keep track of what are the data that are successfully being consumed and what are the data that might be lost on their way towards the consumer. Thus we need a little fancier queue to keep track of *what* has been consumed, in order to suit the distributed environment of large stream processing system.
+A natural view of a data stream can be an infinite sequence of tuples reading from a queue. However, a traditional queue would not be sufficient in large scale system since the consumed tuple might got lost or the consumer might fail thus it might request the previous tuple after a restart. Furthermore, since the processing power for a single machine is limited, we want several machines to be able to read from the same queue, thus they can work on the stream in parallel. The alternative queue design is then a multi-consumer queue, where a pool of readers may read from a single queue and each record goes to one of them. In a traditional multi-consumer queue, once a consumer reads the data out, it is gone. This would be problematic in a large stream processing system, since the messages are more likely to be lost during transmission, and we want to keep track of what are the data that are successfully being consumed and what are the data that might be lost on their way towards the consumer. Thus we need a little fancier queue to keep track of *what* has been consumed, in order to be resilient in the face of packet loss or network failure.
+
+A naïve approach to attempting to handle lost messages or failures could be to record the message upon sending it, and to wait for the acknowledgement from the receiver. This simple method is a pragmatic choice since the storage in many messaging systems are scarce resources, the system want to free the data immediately once it knows it is consumed successfully thus to keep the queue small. However, getting the two ends to come into agreement about what has been consumed in not a trivial problem. Acknowledgement fixes the problem of losing messages, because if a message is lost, it would not be acknowledged thus the data is still in the queue and can be sent again, this would ensure that each message is processed at least once, however, it also creates new problems. First problem is the receiver might successfully consumed the message *m1* but fail to send the acknowledgment, thus the sender would send *m1* again and the receiver would process the same data twice. Another problem is memory consumption, since the sender has now to keep track of every single messages being sent out with multiple stages, and only free them when acknowledged.
-An intuitive choice would be recording the message when sending it out and wait for the acknowledgment from the receiver. This simple method is a pragmatic choice since the storage in many messaging systems are scarce resources, the system want to free the data immediately once it knows it is consumed successfully thus to keep the queue small. However, getting the two ends to come into agreement about what has been consumed in not a trivial problem. Acknowledgement fixes the problem of losing messages, because if a message is lost, it would not be acknowledged thus the data is still in the queue and can be sent again, this would ensure that each message is processed at least once, however, it also creates new problems. First problem is the receiver might successfully consumed the message *m1* but fail to send the acknowledgment, thus the sender would send *m1* again and the receiver would process the same data twice. Another problem is performance, the sender has now to keep track of every single messages being sent out with multiple stages. Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, each partition is consumed by exactly one consumer at any given time. By doing this Kafka ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. This makes the state about what has been consumed very small, just one number for each partition, and by periodically checkpointing, the equivalent of message acknowledgements becomes very cheap. The queues in Kafka also allows users to rewind the stream and replay everything from the point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed while ensuring that the processed events are in the order of their origination.
+Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, each partition is consumed by exactly one consumer at any given time. By doing this Kafka ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. This makes the state about what has been consumed very small, just one number for each partition, and by periodically checkpointing, the equivalent of message acknowledgements becomes very cheap. The queues in Kafka also allows users to rewind the stream and replay everything from the point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed while ensuring that the processed events are in the order of their origination.
## How to process data stream
@@ -41,7 +57,7 @@ Now we know what the stream looks like and how do we ensure that the data in the
- Apache Storm
-After MapReduce, Hadoop, and the related batch processing system came out, the data can be processed at scales previously unthinkable. However, as we stated before, a realtime large scale data processing becomes more and more important for many businesses. *Apache Storm* {% cite apachestorm --file streaming %} is actually one of the first system that can be described as "Hadoop of realtime" that feed the needs. Users can process messages in a way that doesn't lose data and also scalable with the primitives provided by *Storm*.
+After MapReduce, Hadoop, and the related batch processing system came out, the data can be processed at scales previously unthinkable. However, as we stated before, large scale stream processing becomes more and more important for many businesses. *Apache Storm* {% cite apachestorm --file streaming %} is actually one of the first system that can be described as "Hadoop of stream processing" that feed the needs. Users can process messages in a way that doesn't lose data and also scalable with the primitives provided by *Storm*.
In *Storm*, the logic of every processing job is described as a *Storm* topology. A *Storm* topology in *Storm* can be think of as a MapReduce job in Hadoop, the difference is that a MapReduce job will finish eventually but a *Storm* topology will run forever. There are three components in the topology: stream, spouts and bolts.
@@ -53,9 +69,31 @@ A bolt is where the processing really take place, it can take multiple streams a
A topology is then arbitrary combination of the three components, where spouts and bolts are the vertices and streams are the edges in the topology.
-Since all the works are distributed, any given vertex is not necessarily running on a single machine, instead they can be spread on different workers in the cluster. *Storm* also provides different *stream grouping* schemes for users to determine which vertex should be consuming the output stream from a given vertex. The grouping method can be shuffle grouping, where the tuples from the output stream will be randomly distributed across this bolt's consumers in a way such that each consumer is guaranteed to get an equal number of tuples. Another example would be fields grouping, where the tuples of the stream is partitioned by the fields specified in the grouping, the tuples with the same value in that field would always go to the same bolt.
-A natural question to ask here is what if something goes run for example a single tuple get lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally, the reason might be that there would be so many states to maintain if it needs to construct such queue for every edge. In stead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG contains the information of this tuple as how the original tuple is splitted among different workers. *Storm* uses the DAG to track each tuple, if the tuple fails to be processed, then the system would retry the tuple from the spout again.
+```ruby
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("words", new TestWordSpout(), 10);
+builder.setBolt("exclaim1", new ExclamationBolt(), 3)
+ .shuffleGrouping("words");
+builder.setBolt("exclaim2", new ExclamationBolt(), 5)
+ .shuffleGrouping("words")
+ .shuffleGrouping("exclaim1");
+
+```
+
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/chapter/9/Topology" alt="The topology created by the example code" />
+</figure>
+
+Here is an simple topology which contains a spout and two bolts, where the spout emits words and each bolt would append exclamation '!' to its input. The nodes are arranged as shown in the graph. For example if the bolt emits the tuple ["Hi"], if it travels from exclaim1 to exclaim2, then exclaim2 would emit the words ["Hi!!"].
+
+Since all the works are distributed, any given vertex is not necessarily running on a single machine, instead they can be spread on different workers in the cluster. The parameter 10, 3 and 5 in the example code actually specify the amount of parallelism the user wants. *Storm* also provides different *stream grouping* schemes for users to determine which vertex should be consuming the output stream from a given vertex. The grouping method can be shuffle grouping as shown in our example, where the tuples from the output stream will be randomly distributed across this bolt's consumers in a way such that each consumer is guaranteed to get an equal number of tuples. Another example would be fields grouping, where the tuples of the stream is partitioned by the fields specified in the grouping, the tuples with the same value in that field would always go to the same bolt.
+
+A natural question to ask here is what if something goes wrong for example a single tuple get lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally, the reason might be that there would be so many states to maintain if it needs to construct such queue for every edge. In stead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG contains the information of this tuple as how the original tuple is splitted among different workers. *Storm* uses the DAG to track each tuple, if the tuple fails to be processed, then the system would retry the tuple from the spout again.
+
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/chapter/9/DAG" alt="The simple tuple DAG" />
+</figure>
There might be two concerns here. The first is how can *Storm* track every DAG efficiently and scalably, would it actually use more resources than just maintain the queues? The second concern is starting all the way from spout again instead of the intermediate queue seems taking a step backwards. For the first concern, *Storm* actually uses a very efficient algorithm to create the DAG of each tuple, it would take at mote 20 bytes for any tuple even the DAG contains trillions of tuples in it. For the second concern, if we look at the guarantees provided by both techniques, tracking DAG and intermediate queues, they are actually the same. They both guarantee that each tuple is processed at least once, so there is no fundamental differences between them.
@@ -63,17 +101,29 @@ Thus as shown before, *Storm* can guarantee the primitives, it can process a str
### b) Micro-batch
-We have seen *Apache Storm* as a real stream processing system that has the guarantees needed by such system. However, the core of *Storm* is to process stream at a granularity of each tuple. Sometimes such granularity is unnecessary, for the Twitter example that we had before, maybe we are only interested in the *stream* of tuples that came within a 5 minutes interval, with *Storm*, such specification can only be set on top of the system while one really want a convenient way to express such requirement within the system itself. In the next section, we would introduce several other stream processing systems, all of them can act on data stream in real time at large scale as *Storm*, but they provide more ways for the users to express how they want the tuples in the stream to be grouped and then processed. We refer to grouping the tuples before processing them as putting them into small *micro-batches*, and the processor can then provide results by working on those batches instead of single tuple.
+We have seen *Apache Storm* as a stream processing system that has the guarantees needed by such system. However, the core of *Storm* is to process stream at a granularity of each tuple. Sometimes such granularity is unnecessary, for the Twitter example that we had before, maybe we are only interested in the *stream* of tuples that came within a 5 minutes interval, with *Storm*, such specification can only be set on top of the system while one really want a convenient way to express such requirement within the system itself. In the next section, we would introduce several other stream processing systems, all of them can act on data stream in real time at large scale as *Storm*, but they provide more ways for the users to express how they want the tuples in the stream to be grouped and then processed. We refer to grouping the tuples before processing them as putting them into small *micro-batches*, and the processor can then provide results by working on those batches instead of single tuple.
- Spark Streaming
-The *Spark* streaming {% cite zaharia2012discretized --file streaming %} system is built upon the previous *Apache Spark* processing system, where it uses a data-sharing abstraction called 'Resilient Distributed Datasets' or RDDs to ensure fault-tolerance while achieve extremly low latency. The challenges with 'big data' stream processing were long recovery time when failure happens, and the the stragglers might increase the processing time of the whole system. Spark streaming overcomes those challenges by a parallel recovery mechanism that improves efficiency over traditional replication and backup schemes, and tolerate stragglers.
+The *Spark* streaming {% cite zaharia2012discretized --file streaming %} system is built upon *Apache Spark*, a system for large-scale parallel batch processing, which uses a data-sharing abstraction called 'Resilient Distributed Datasets' or RDDs to ensure fault-tolerance while achieve extremly low latency. The challenges with 'big data' stream processing were long recovery time when failure happens, and the the stragglers might increase the processing time of the whole system. Spark streaming overcomes those challenges by a parallel recovery mechanism that improves efficiency over traditional replication and backup schemes, and tolerate stragglers.
-The challenge of the fault-tolerance comes from the fact that the stream processing system might need hundreds of nodes, at such scale, two major problems are *faults* and *stragglers*. Some system use continuous processing model such as *Storm*, in which long-running, stateful queries receive each tuple, update its state and send out the result tuple. While such model is natural, it also makes difficult to handle faults. As shown before *Storm* uses *upstream backup*, where the messages are buffered and replayed if a message fail to be processed. Another approach for fault-tolerance used by previous system is replication, where there are two copies of everything. The first approach takes long time to recovery while the latter one costs double the storage space. Moreover, neither approach handles stragglers.
+The challenge of the fault-tolerance comes from the fact that the stream processing system might need hundreds of nodes, at such scale, two major problems are *faults* and *stragglers*. Some system use continuous processing model such as *Storm*, in which long-running, stateful queries receive each tuple, update its state and send out the result tuple. While such model is natural, it also makes difficult to handle faults. As shown before *Storm* uses *upstream backup*, where the messages are buffered and replayed if a message fail to be processed. Another approach for fault-tolerance used by previous system is replication, where there are two copies of everything. The first approach takes long time to recovery while the latter one costs double the storage space. Moreover, neither approach handles stragglers. In the first approach, a straggler must be treated as a failure which incurs a costly recovery while the straggler would slow down both replicas because of the use of synchronization protocols to coordinate replicas in the second approach.
-*Spark streaming* overcomes these challenges by a new stream processing model-instead of running long-lived queries, it divided a stream into a series of batched tuples on small time intervals, then launch a MapReduce job to process on the batch. Each computation is deterministic given the input data in that time interval, and this also makes *parallel recovery* possible, when a node fails, each node in the cluster works to recompute part of the lost node's RDDs. *Spark streaming* can also recover from straggler in a similar way.
+*Spark streaming* overcomes these challenges by a new stream processing model-instead of running long-lived queries, it divided a stream into a series of batched tuples on small time intervals, then launch a Spark job to process on the batch. Each computation is deterministic given the input data in that time interval, and this also makes *parallel recovery* possible, when a node fails, each node in the cluster works to recompute part of the lost node's RDDs. *Spark streaming* can also recover from straggler in a similar way.
-In the *D-stream* model, a streaming computation is treated as series of deterministic batch computations on small time intervals. Each batch of the stream is stored as RDDs, and the result after processing this RDD also be stored as RDDs. A *D-stream* is a sequence of RDDs that can be transformed into new *D-streams*. For example, a stream can be divided into one second batches, to process the events in second *s*, *Spark streaming* would first launch a map job to process the events happened in second *s* and it would then launch a reduce job that take both this mapped result the reduced result of data *s - 1*. Thus each *D-stream* can turn into a sequence of *RDDs*, and the *lineage* (i.e., the sequence of operations used to build it) of the *D-streams* are tracked for recovery. If a node fails, it would recover the lost RDD partitions by re-running the operations that used to create them. The re-computation can be ran in parallel on separate nodes since the *lineage* is distributed, and the work on straggler can be re-ran the same way.
+*D-stream* is the *Spark streaming* abstraction, and in the *D-stream* model, a streaming computation is treated as series of deterministic batch computations on small time intervals. Each batch of the stream is stored as RDDs, and the result after processing this RDD also be stored as RDDs. A *D-stream* is a sequence of RDDs that can be transformed into new *D-streams*. For example, a stream can be divided into one second batches, to process the events in second *s*, *Spark streaming* would first launch a map job to process the events happened in second *s* and it would then launch a reduce job that take both this mapped result the reduced result of data *s - 1*. Thus each *D-stream* can turn into a sequence of *RDDs*, and the *lineage* (i.e., the sequence of operations used to build it) of the *D-streams* are tracked for recovery. If a node fails, it would recover the lost RDD partitions by re-running the operations that used to create them. The re-computation can be ran in parallel on separate nodes since the *lineage* is distributed, and the work on straggler can be re-ran the same way.
+
+```ruby
+val ssc = new StreamingContext(conf, Seconds(1))
+val lines = ssc.socketTextStream("localhost", 9999)
+val words = lines.flatMap(_.split(" "))
+val pairs = words.map(word => (word, 1))
+val wordCounts = pairs.reduceByKey(_ + _)
+wordCounts.print()
+
+```
+
+Let's look at an example of how we can count the word received from a TCP socket with *Spark streaming*. We first set the processing interval to be 1 second, and we will create a *D-stream* lines that represents the streaming data received from the specific TCP socket. Then we split the lines by space into words, now the stream of words is represented as the words *D-stream*. The words stream is futher mapped to a *D-stream* of pairs, which is then reduced to count the number of words in each batch of data.
In conclusion, *Spark streaming* handles the slow recovery and straggler issue by dividing stream into small batches on small time intervals and using RDDs to keep track of how the result of certain batched stream is computed. This model makes handling recovery and straggler easier because the computation can be ran in parallel by re-computing the result while RDDs make the process fast.
@@ -81,7 +131,25 @@ In conclusion, *Spark streaming* handles the slow recovery and straggler issue b
*Naiad* {% cite murray2013naiad --file streaming %} is another distributed system for executing data stream which is developed by *Microsoft*. *Naiad* combines the benefits of high throughput of batch processors and the low latency of stream processors by its computation model called *timely dataflow* that enables dataflow computations with timestamps.
-The *timely dataflow*, like topology described in *Storm*, contains stateful vertices that represent the nodes that would compute on the stream. Each graph contains input vertices and output vertices, which are responsible for consuming or producing messages from external sources. Every message being exchanged is associated with a timestamp called epoch, the external source is responsible of providing such epoch and notifying the input vertices the end of each epoch. The notion of epoch is powerful since it allows the producer to arbitrarily determine the start and the end of each batch by assigning different epoch number on tuples. For example, the way to divide the epochs can be time as in *spark streaming*, or it can be the start of some event.
+The *timely dataflow*, like topology described in *Storm*, contains stateful vertices that represent the nodes that would compute on the stream. Each graph contains input vertices and output vertices, which are responsible for consuming or producing messages from external sources. Every message being exchanged is associated with a timestamp called epoch, the external source is responsible of providing such epoch and notifying the input vertices the end of each epoch. The notion of epoch is powerful since it allows the producer to arbitrarily determine the start and the end of each batch by assigning different epoch number on tuples. For example, the way to divide the epochs can be time as in *spark streaming*, or it can be the start of some event.
+
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/chapter/9/TimelyD" alt="A simple Timely Dataflow" />
+</figure>
+
+
+```ruby
+void OnNotify(T time)
+{
+foreach (var pair in counts[time])
+ this.SendBy(output, pair, time);
+counts.Remove(time);
+}
+
+```
+
+In this example, A, B ,C are different processing vertices and each of them has one message being processed. For A, the number 1 in its message (e1,1) indicates that this messages is generated in epoch 1, thus a counter would increase by 1 if it counts the number of messages in epoch 1. Such a counter can be implemented as shown, where *counts* would count the number of distinct messages received. Once the vertex get notified that one epoch has ended, the OnNotify function would be triggered, and a count for each distinct input record would then be sent to output.
+
*Naiad* can also execute cyclic dataflow program. If there is a loop in the data flow graph, for example where the message need to be processed with the processed result of previous message, then each message circulating in the group has another counter associated with it along with the epoch. This loop counter would increase by one whenever it complete a loop once. Thus the epoch and counter can work together for the system to track progress of the whole computation.
@@ -105,7 +173,17 @@ The next question is then how does *Google Dataflow* knows when to emit the resu
In addition to controlling when results can be emitted, the system also provides a way to control how windows can relate to each other. The results can be *discarding*, where the contents would be discarded once triggering, this makes data storage more efficient since once the results are consumed, we can clear them from the buffers. The results can also be *accumulating*, once triggering, the contents are left intact and stored in persistent state, later results can become a refinement of previous results, this mode is useful when the downstream consumers are expected to overwrite old result once the new one comes, for example, we might want to write the count of a view of certain movie from the stream pipeline with low latency, and we can refine the count at the end of the day by running a slower batch process on the aggregated data. The last mode is *accumulating & retracting*, where in addition to *accumulating* semantics, a copy of the emitted value is also stored in persistent state. When the window triggers again in the future, a retraction for the previous value will be emitted first, followed by the new value, this is useful when both the results from the previous processing and the later one are needed to be combined. For example, one process is counting the number of views during a certain period, a user went offline during the window and came back after the window ended when the result of the counting *c* was already emitted, the process now need to retract the previous result *c* and indicate that the correct number should be *c+1*.
-In terms of fault-tolerance, *Google Dataflow* relies on MillWhell as the underlying execution engine.
+```ruby
+Pcollection<KV<String, Integer>> output = input
+ .apply(Window.trigger(Repeat(AtPeriod(1, MINUTE)))
+ .accumulating())
+ .apply(Sum.integersPerKey());
+
+```
+
+The above example code shows how to apply a trigger that repeatedly fires on one-minute periodically. The *accumulating* mode is also specified so that the *Sum* can be refined overtime.
+
+*Google Dataflow* also relies on MillWheel{% cite akidau2013millwheel --file streaming %} as the underlying execution engine to achieve exactly-once-delivery of the tuples. MillWheel is a framework for building low-latency data-processing applications used at Google. It achieves exactly-once-delivery by first checking the incoming record and discard duplicated ones, then pending the productions (i.e., produce records to any stream) until the senders are acknowledges, only then the pending productions are sent.
In conclusion, one of the most important core principles that drives *Google Dataflow* is to accommodate the diversity of known use cases, it did so by providing a rich set of abstractions such as windowing, triggering and controlling. Compared to the 'specialized' system that we discussed above, *Google Dataflow* is a more general system that can fulfill batch, micro-batch, and stream processing requirements.
@@ -141,7 +219,7 @@ Twitter realized in order to meet all the needs, they needed a new real-time str
A key design goal for Heron is compatibility with the *Storm* API, thus Heron runs topologies, graphs with spouts and bolts like Storm. Unlike *Storm* though, the Heron topology is translated into a physical plan before actual execution, and there are multiple components in the physical plan.
-Each topology is run as an Aurora job, instead of using Nimbuz as scheduler, Twitter chose Aurora since it is developed and used by other Twitter projects. Each Aurora job is then consisted of several containers, the first container runs Topology Master, which provides a single point of contact for discovering the status of the topology and also serves as the gateway for the topology metrics through an endpoint. The other containers each run a Stream Manager, a Metrics Manager and a number of Heron Instances. The key functionality for each Stream Manager is to manage the routing of tuples efficiently, all Stream Managers are connected to each other and the tuples from Heron Instances in different containers would be transmitted through their Stream Managers, thus the Stream Managers can be viewed as Super Node for communication. Stream Manager also provides a backpressure mechanism, which can dynamically adjust the rate of the data flows through the network, for example, if the Stream Managers of the bolts are overwhelmed, they would then notice the Stream Managers of the spouts to slow down thus ensure all the data are properly processed. Heron Instance carries out the real work for a spout or a bolt, unlike worker in *Storm*, each Heron Instance runs only a single task as a process, in addition to performing the work, Heron Instance is also responsible for collecting multiple metrics. The metrics collected by Heron Instances would then be sent to the Metrics Manager in the same container and to the central monitoring system.
+Each topology is run as an Aurora{% cite apacheaurora --file streaming %} job, instead of using Nimbus{% cite nimbusproject --file streaming %} as scheduler. Nimbus used to be the master node of *Storm* that schedules and manages all running topologies, it delopys topology on *Storm*, and assigns workers to execute the topology where Aurora is also a service scheduler that can manage long-running services. Twitter chose Aurora since it is developed and used by other Twitter projects. Each Aurora job is then consisted of several containers, the first container runs Topology Master, which provides a single point of contact for discovering the status of the topology and also serves as the gateway for the topology metrics through an endpoint. The other containers each run a Stream Manager, a Metrics Manager and a number of Heron Instances. The key functionality for each Stream Manager is to manage the routing of tuples efficiently, all Stream Managers are connected to each other and the tuples from Heron Instances in different containers would be transmitted through their Stream Managers, thus the Stream Managers can be viewed as Super Node for communication. Stream Manager also provides a backpressure mechanism, if the receiver component is unable to handle incoming data/tuples, then the sender can dynamically adjust the rate of the data flows through the network. For example, if the Stream Managers of the bolts are overwhelmed, they would then notice the Stream Managers of the spouts to slow down thus ensure all the data are properly processed. Heron Instance carries out the real work for a spout or a bolt, unlike worker in *Storm*, each Heron Instance runs only a single task as a process, in addition to performing the work, Heron Instance is also responsible for collecting multiple metrics. The metrics collected by Heron Instances would then be sent to the Metrics Manager in the same container and to the central monitoring system.
The components in the Heron topology are clearly separated, so the failure in various level would be handled differently. For example, if the Topology Master dies, the container would restart the process, and the stand-by Topology Master would take over the master while the restarted would become the stand-by. When a Stream Manager dies, it gets started in the same container, and after rediscovers the Topology Master, it would fetch and check whether there are any changes need to be made in its state. Similarly, all the other failures can be handled gracefully by Heron.