-rw-r--r-- chapter/9/streaming.md | 58
1 file changed, 29 insertions(+), 29 deletions(-)
diff --git a/chapter/9/streaming.md b/chapter/9/streaming.md
index c9585e1..f1268b1 100644
--- a/chapter/9/streaming.md
+++ b/chapter/9/streaming.md
@@ -4,16 +4,16 @@ title: "Large Scale Streaming Processing"
by: "Fangfan Li"
---
-The previous chapter discusses the large scale batch processing system, where the computation involves the pieces of data stored across the distributed file system. Those systems satisfy the requirements such as scalablibility and fault-tolerance for applications that deal with 'big data' stored in a distributed way. The batch processing systems are suitable for processing *static* datasets, where the input data do not change overtime during the whole process, thus the system can distribute the computation and perform synchronization assuming the inputs would stay the same during the whole computation. In such *static* model, the processing system can first *pull* data from the disk, and then perform the computation over the pulled data. However, a large number of networking applications are not *static*, instead, the data is constantly in motion, and the inputs would be provided as *stream*, as new data constantly arrives. In the *stream* model, data is *pushed* to the processor. This fundamental difference makes the traditional batch processing system un-suitable for streaming applications, as even the slightest change in the dataset would require the batch processer to *pull* the whole dataset and perform the computation again. Thus in this chapter, we would introduce the history and systems that are created for the streaming processing.
+The previous chapter discussed large scale batch processing systems, where the computation involves pieces of data stored across a distributed file system. Those systems satisfy requirements such as scalability and fault-tolerance for applications that deal with 'big data' stored in a distributed way. Batch processing systems are suitable for processing *static* datasets, where the input data do not change over time during the whole process, so the system can distribute the computation and perform synchronization assuming the inputs stay the same throughout. In such a *static* model, the processing system can first *pull* data from disk, and then perform the computation over the pulled data. However, a large number of networking applications are not *static*; instead, the data is constantly in motion, and the inputs are provided as a *stream*, as new data constantly arrives. In the *stream* model, data is *pushed* to the processor. This fundamental difference makes traditional batch processing systems unsuitable for streaming applications, as even the slightest change in the dataset would require the batch processor to *pull* the whole dataset and perform the computation again. Thus in this chapter, we introduce the history of stream processing and the systems created for it.
-There are many challenges for implementing large scale streaming processing system. Similar to large scale batch processing sytems, large scale streaming systems also have to deal with consistency and fault-tolenrace due to the distributed nature of those systems. Moreover, latency at the scale of several minutes is at most a nuisance in batch processing while latency is not as tolerable in large streaming processing.
+There are many challenges in implementing a large scale stream processing system. Similar to large scale batch processing systems, large scale streaming systems also have to deal with consistency and fault-tolerance due to their distributed nature. Moreover, latency on the scale of several minutes is at most a nuisance in batch processing, while such latency is far less tolerable in large scale stream processing.
In the rest of this chapter, we will introduce 1) the history of stream processing, 2) how to represent the input data stream, 3) the practices used to process data streams, and 4) the state-of-the-art systems used by applications.
## Data in constant motion
The concept of streaming data can be traced back to TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %}
-, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, high-variable data streams. In contrast to trafitional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid and being constantly moving and changing. The examples of applications that use *data in motion* include: event-based processing, query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of the query processing systems that deals with data stream. The fundamental difference between TelegraphCQ to other traditional query system is the view of input data, instead of handling a query with detailed static data, TelegraphCQ has to react to the newly arrived data and process the queries *on-the-fly*.
+, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, highly variable data streams. In contrast to the traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data had become fluid, constantly moving and changing. Examples of applications that use *data in motion* include event-based processing and query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of a query processing system that deals with data streams. The fundamental difference between TelegraphCQ and traditional query systems is the view of the input data: instead of handling a query over detailed static data, TelegraphCQ has to react to newly arrived data and process the queries *on-the-fly*.
The important concepts of TelegraphCQ include *continuous queries*: the queries are constantly running, and as new data arrives, the processor routes it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually; queries with some commonality can be combined to improve performance.
@@ -21,7 +21,7 @@ TelegraphCQ shows the importance of modeling data as stream and how can we proce
## How to represent a data stream
-Why would we need to process data stream in a large scale? I will use an example to illustrate the idea. For example, assume you are Twitter, and you have a constant feed of user's comments and posts, you want to find out what is the most *trending* topic right now that people are talking about, and your advertisement team want to follow on that. You can store all the posts that happened during the day from 12:01 a.m to 11:59 p.m in a large file system and then run a job in *Spark* {% cite zaharia2012resilient --file streaming %} to analyze them. The *Spark* job itselm may again probably take several hours, but after all these works, the *trending* topic comes out from your analysis might be useless since it might not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all different sources as input and output the result with low latency (i.e., before it becomes useless).
+Why would we need to process a data stream at large scale? An example illustrates the idea. Assume you are Twitter, and you have a constant feed of users' comments and posts; you want to find out the most *trending* topic that people are talking about right now, and your advertisement team wants to follow up on it. You could store all the posts from 12:01 a.m. to 11:59 p.m. in a large file system and then run a job in *Spark* {% cite zaharia2012resilient --file streaming %} to analyze them. The *Spark* job itself may take several hours, and after all this work, the *trending* topic that comes out of your analysis might be useless since it may not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all the different sources as input and output the result with low latency (i.e., before it becomes useless).
Before diving into the details of large scale processing, we first introduce a few concepts: producer, processor and consumer. The producer is where the data stream comes from; in the previous example, it is a user who is tweeting. The consumer would then be the advertisement team, and the processor is the *magical* component that we need in order to produce the results. The producers and consumers are fairly straightforward; it is the processor that is discussed in this chapter.
@@ -31,17 +31,17 @@ We have been talking about the stream of data, but this is a bit under-specified
A natural view of a data stream is an infinite sequence of tuples read from a queue. However, a traditional queue is not sufficient in a large scale system, since a consumed tuple might get lost or the consumer might fail and request the previous tuple again after a restart. An alternative queue design is a multi-consumer queue, where a pool of readers may read from a single queue and each record goes to one of them, which is more suitable for a distributed system. In a traditional multi-consumer queue, once a consumer reads a record, it is gone. This is problematic in a large stream processing system, since messages are more likely to be lost during transmission, and we want to keep track of which data have been successfully consumed and which might have been lost on their way to the consumer. Thus we need a slightly fancier queue that keeps track of *what* has been consumed, in order to suit the distributed environment of a large stream processing system.
-An intuitive choice would be recording the message when sending it out and wait for the acknowledgment from the receiver. This simple method is a pragmatic choice since the storage in many messaging systems are scarce resources, the system want to free the data immediately once it knows it is consumed successfully thus to keep the queue small. However, getting the two ends to come into agreement about what has been consumed in not a trivial problem. Acknowledgement fixes the problem of lossing messages, because if a message is lost, it would not be acknoledged thus the data is still in the queue and can be sent again, this would ensure that each message is processed at least once, however, it also creates new problems. First problem is the receiver might successfully consumed the message *m1* but fail to send the acknowledgment, thus the sender would send *m1* again and the receiver would process the same data twice. Another problem is performance, the sender has now to keep track of every single messages being sent out with multiple stages. Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, each partition is consumed by exactly one consumer at any given time. By doing this Kafka ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. This makes the state about what has been consumed very small, just one number for each partition, and by periodically checkpointing, the equivalent of message acknowledgements becomes very cheap. The queues in Kafka also allows users to rewind the stream and replay everything from the point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed while ensuring that the processed events are in the order of their origination.
+An intuitive choice would be to record each message when sending it out and wait for an acknowledgment from the receiver. This simple method is a pragmatic choice since storage in many messaging systems is a scarce resource; the system wants to free the data as soon as it knows the data has been consumed successfully, so as to keep the queue small. However, getting the two ends to agree about what has been consumed is not a trivial problem. Acknowledgment fixes the problem of losing messages: if a message is lost, it is not acknowledged, so the data is still in the queue and can be sent again, which ensures that each message is processed at least once. However, it also creates new problems. The first problem is that the receiver might successfully consume message *m1* but fail to send the acknowledgment, so the sender sends *m1* again and the receiver processes the same data twice. Another problem is performance: the sender now has to keep track of every single message being sent out, across multiple stages. Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, and each partition is consumed by exactly one consumer at any given time. By doing this, Kafka ensures that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions, this still balances the load over many consumer instances. It also makes the state about what has been consumed very small, just one number per partition, and with periodic checkpointing, the equivalent of message acknowledgments becomes very cheap. The queues in Kafka also allow users to rewind the stream and replay everything from a point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed, while still processing events in the order of their origination.
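To make the partition-and-offset idea concrete, here is a minimal sketch of a consumer using Kafka's Java client; the topic name `posts` and the group id are made up for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PostConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "trending-topics");   // consumers in the same group split the partitions
        props.put("enable.auto.commit", "false");   // we commit the offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("posts"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process the post; record.offset() is its position within the partition
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
                // the committed state is just one offset per partition, the cheap
                // equivalent of the per-message acknowledgments described above
                consumer.commitSync();
            }
        }
    }
}
```

Rewinding the stream then amounts to seeking a partition back to an earlier offset (e.g., with `consumer.seek(...)`) rather than resuming from the last committed one.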
## How to process a data stream
-Now we know what the stream looks like and how do we ensure that the data in the stream are successfully processed. We would then talk about the processors that cosume the data stream. There are two main approaches in processing data stream. The first approach is the continuous queries model, similar to TelegraphCQ, where the queries keep running and the arrival of data intiates the processing. Another approach is micro-batching, where the streaming computation becomes a series of stateless, deterministic batch computations on batch of stream, where certain timer would triger the processing on the batch in those systems. We would discuss Apach Storm as an example for the fist design and Spark Streaming, Naiad and Google Dataflow are examples of the second approach. These systems not only differ in the way how they process stream, but also how they ensure fault-tolerance which is one of the most important aspects of large scale distributed system.
+Now we know what the stream looks like and how to ensure that the data in the stream are successfully processed. We now turn to the processors that consume the data stream. There are two main approaches to processing a data stream. The first approach is the continuous query model, similar to TelegraphCQ, where the queries keep running and the arrival of data initiates the processing. The other approach is micro-batching, where the streaming computation becomes a series of stateless, deterministic batch computations over small batches of the stream, and a timer triggers the processing of each batch. We discuss Apache Storm as an example of the first design, while Spark Streaming, Naiad and Google Dataflow are examples of the second approach. These systems differ not only in the way they process streams, but also in how they ensure fault-tolerance, which is one of the most important aspects of a large scale distributed system.
### a) Continuous queries (operators) on each tuple
- Apache Storm
-After MapReduce, Hadoop, and the related batch processing system came out, the data can be processed at scales previously unthinkable. However, as we stated before, a realtime large scale data processing becomes more and more important for many businesses. *Apache Storm* {% cite apachestorm --file streaming %} is actually one of the first system that can be discribed as "Hadoop of realtime" that feed the needs. Users can process messages in a way that doesn't lose data and also scalable with the primitives provided by *Storm*.
+After MapReduce, Hadoop, and the related batch processing systems came out, data could be processed at scales previously unthinkable. However, as stated before, realtime large scale data processing has become more and more important for many businesses. *Apache Storm* {% cite apachestorm --file streaming %} is one of the first systems that can be described as a "Hadoop of realtime" to meet these needs. With the primitives provided by *Storm*, users can process messages in a way that does not lose data and also scales.
In *Storm*, the logic of every processing job is described as a *Storm* topology. A *Storm* topology can be thought of as a MapReduce job in Hadoop; the difference is that a MapReduce job will finish eventually, while a *Storm* topology runs forever. There are three components in a topology: streams, spouts and bolts.
@@ -51,9 +51,9 @@ The next abstraction in a topology is spout. A spout is a source of streams. For
A bolt is where the processing really takes place; it can take multiple streams as input and produce multiple streams as output. Bolts are where the logic of the topology is implemented: they can run functions, filter data, compute aggregations and so forth.
-A tolopogy is then arbitrary combination of the three components, where spouts and bolts are the vertices and streams are the edges in the topology.
+A topology is then an arbitrary combination of the three components, where spouts and bolts are the vertices and streams are the edges.
-Since all the works are distributed, any given vertex is not necessarily running on a single machine, instead they can be spread on different workers in the cluster. *Storm* also provides different *stream grouping* schemes for users to determine which vertex should be consuming the output stream from a given vertex. The grouping method can be shuffle grouping, where the tuples from the output stream will be randomly distributed across this bolt's consumers in a way such that each consymer is guaranteed to get an equal number of tuples. Another example would be fields grouping, where the tuples of the stream is partitioned by the fields specified in the grouping, the tuples with the same value in that field would always go to the same bolt.
+Since all the work is distributed, a given vertex does not necessarily run on a single machine; instead, its tasks can be spread across different workers in the cluster. *Storm* also provides different *stream grouping* schemes for users to determine which vertex should consume the output stream of a given vertex. One grouping method is shuffle grouping, where the tuples of the output stream are randomly distributed across the bolt's consumers such that each consumer is guaranteed to get an equal number of tuples. Another example is fields grouping, where the stream is partitioned by the fields specified in the grouping, so tuples with the same value in those fields always go to the same bolt.
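As a sketch of how these pieces fit together, the snippet below wires up a hypothetical trending-hashtags topology with *Storm*'s Java `TopologyBuilder`. The spout and bolt classes (`PostSpout`, `SplitHashtagBolt`, `CountBolt`) are invented user classes, not part of *Storm*, and package names vary slightly between Storm versions.

```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TrendingTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // spout: the source of the stream, e.g. reading posts off a queue
        builder.setSpout("posts", new PostSpout(), 2);

        // shuffle grouping: tuples are spread evenly across the bolt's tasks
        builder.setBolt("split", new SplitHashtagBolt(), 4)
               .shuffleGrouping("posts");

        // fields grouping: tuples with the same "hashtag" value always reach
        // the same task, so per-hashtag counts stay consistent
        builder.setBolt("count", new CountBolt(), 4)
               .fieldsGrouping("split", new Fields("hashtag"));

        LocalCluster cluster = new LocalCluster();   // in-process cluster for testing
        cluster.submitTopology("trending", new Config(), builder.createTopology());
        Thread.sleep(60_000);                        // the topology would otherwise run forever
        cluster.shutdown();
    }
}
```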
A natural question to ask here is what happens if something goes wrong, for example a single tuple gets lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally; the reason might be that there would be too much state to maintain if it had to construct such a queue for every edge. Instead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG records how the original tuple was split among different workers. *Storm* uses the DAG to track each tuple; if the tuple fails to be processed, the system retries the tuple from the spout.
@@ -67,13 +67,13 @@ We have seen *Apache Storm* as a real stream processing system that has the guar
- Spark Streaming
-The *Spark* streaming {% cite zaharia2012discretized --file streaming %} system is built upon the previous *Apache Spark* processing system, where it uses a data-sharing abstraction called 'Resilient Distributed Datasets' or RDDs to ensure fault-tolerance while achieve extremly low latency. The challenges with 'big data' stream processing were long recovery time when failure happens, and the the stragglers might increase the processing time of the whole system. Spark streaming overcomes those challenges by a parallel recovery mechanism that improves efficiency over trafitional replication and backup schemes, and tolerate stragglers.
+The *Spark streaming* {% cite zaharia2012discretized --file streaming %} system is built upon the previous *Apache Spark* processing system, and it uses a data-sharing abstraction called 'Resilient Distributed Datasets', or RDDs, to ensure fault-tolerance while achieving extremely low latency. The challenges with 'big data' stream processing were long recovery times when failures happen, and stragglers that might increase the processing time of the whole system. Spark streaming overcomes those challenges with a parallel recovery mechanism that improves efficiency over traditional replication and backup schemes, and tolerates stragglers.
The challenge of fault-tolerance comes from the fact that a stream processing system might need hundreds of nodes, and at such scale the two major problems are *faults* and *stragglers*. Some systems, such as *Storm*, use a continuous processing model in which long-running, stateful operators receive each tuple, update their state and send out result tuples. While such a model is natural, it also makes faults difficult to handle. As shown before, *Storm* uses *upstream backup*, where messages are buffered and replayed if a message fails to be processed. Another approach to fault-tolerance used by previous systems is replication, where there are two copies of everything. The first approach takes a long time to recover, while the latter costs double the storage space. Moreover, neither approach handles stragglers.
*Spark streaming* overcomes these challenges with a new stream processing model: instead of running long-lived queries, it divides the stream into a series of batches over small time intervals, then launches a MapReduce-style job to process each batch. Each computation is deterministic given the input data in its time interval, and this also makes *parallel recovery* possible: when a node fails, the other nodes in the cluster work to recompute parts of the lost node's RDDs. *Spark streaming* can recover from stragglers in a similar way.
-In the *D-stream* model, a streaming computaion is treated as series of deterministic batch computaions on small time intervals. Each batch of the stream is stored as RDDs, and the result after processing this RDD also be stored as RDDs. A *D-stream* is a sequence of RDDs that can be transformed into new *D-streams*. For example, a stream can be divided into one second batches, to process the events in second *s*, *Spark streaming* would first launch a map job to process the events happened in second *s* and it would then launch a reduce job that take both this mapped result the reduced result of data *s - 1*. Thus each *D-stream* can turn into a sequence of *RDDs*, and the *lineage* (i.e., the sequence of operations used to build it) of the *D-streams* are tracked for recovery. If a node fails, it would recover the lost RDD partitions by re-running the operations that used to create them. The re-computation can be ran in parallel on separate nodes since the *lineage* is distributed, and the work on straggler can be re-ran the same way.
+In the *D-stream* model, a streaming computation is treated as a series of deterministic batch computations on small time intervals. Each batch of the stream is stored as RDDs, and the result of processing such an RDD is also stored as RDDs. A *D-stream* is a sequence of RDDs that can be transformed into new *D-streams*. For example, a stream can be divided into one-second batches; to process the events in second *s*, *Spark streaming* first launches a map job over the events that happened in second *s*, and then launches a reduce job that takes both this mapped result and the reduced result of second *s - 1*. Thus each *D-stream* turns into a sequence of *RDDs*, and the *lineage* (i.e., the sequence of operations used to build it) of the *D-streams* is tracked for recovery. If a node fails, the system recovers the lost RDD partitions by re-running the operations that were used to create them. The re-computation can be run in parallel on separate nodes since the *lineage* is distributed, and the work of a straggler can be re-run the same way.
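A minimal sketch of this model using the Spark Streaming Java API (Spark 2.x): the stream is cut into one-second batches, and a stateful operator carries the running hashtag counts from one interval into the next. The socket source, host/port and checkpoint path are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class HashtagCounts {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HashtagCounts");
        // every 1-second interval of received data becomes one RDD of the D-stream
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        jssc.checkpoint("/tmp/streaming-checkpoint");   // required for stateful operators

        JavaDStream<String> posts = jssc.socketTextStream("localhost", 9999);

        // carry the count of each hashtag from interval s-1 into interval s
        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> addToRunningCount =
                (newValues, running) -> {
                    int sum = running.isPresent() ? running.get() : 0;
                    for (int v : newValues) sum += v;
                    return Optional.of(sum);
                };

        JavaPairDStream<String, Integer> counts = posts
                .flatMap(post -> Arrays.asList(post.split(" ")).iterator())
                .filter(word -> word.startsWith("#"))
                .mapToPair(tag -> new Tuple2<>(tag, 1))
                .updateStateByKey(addToRunningCount);

        counts.print();   // each batch interval prints the refreshed running counts
        jssc.start();
        jssc.awaitTermination();
    }
}
```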
In conclusion, *Spark streaming* handles the slow-recovery and straggler issues by dividing the stream into small batches over small time intervals and using RDDs to keep track of how the result of each batch is computed. This model makes handling recovery and stragglers easier, because lost results can be re-computed in parallel, while RDDs make the process fast.
@@ -89,7 +89,7 @@ Tracking process is not a trivial task since there are many messages with differ
*Naiad* is the implementation of *timely dataflow* on a cluster, where the tracker on each machine broadcasts both the messages that have not yet been consumed and those that have recently been consumed, so that every tracker maintains a single view of the global *could-result-in* relation and the progress of the whole computation can be tracked. *Naiad* also optimizes its performance by dealing with micro-stragglers, for example by making changes at the TCP layer to reduce network latency and by customizing garbage collection.
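Naiad itself is a C#/.NET system, so the following is not its API; it is only a simplified, single-process Java illustration of the progress-tracking idea: count the outstanding messages of each epoch and notify a listener once an epoch, and every epoch before it, can no longer produce output.

```java
import java.util.TreeMap;
import java.util.function.LongConsumer;

// Simplified sketch of progress tracking: count the in-flight messages of each
// epoch and fire a notification once an epoch, and every epoch before it, can
// no longer produce output. A real system like Naiad runs this as a distributed
// protocol and also needs producers to announce when an epoch is closed.
public class ProgressTracker {
    private final TreeMap<Long, Long> outstanding = new TreeMap<>(); // epoch -> in-flight messages
    private final LongConsumer onEpochComplete;
    private long maxEpochSeen = -1;
    private long nextToComplete = 0;

    public ProgressTracker(LongConsumer onEpochComplete) {
        this.onEpochComplete = onEpochComplete;
    }

    public synchronized void messageSent(long epoch) {
        outstanding.merge(epoch, 1L, Long::sum);
        maxEpochSeen = Math.max(maxEpochSeen, epoch);
    }

    public synchronized void messageConsumed(long epoch) {
        if (outstanding.merge(epoch, -1L, Long::sum) == 0) {
            outstanding.remove(epoch);
        }
        // the frontier is the earliest epoch that could still result in output
        long frontier = outstanding.isEmpty()
                ? maxEpochSeen + 1
                : Math.min(outstanding.firstKey(), maxEpochSeen + 1);
        while (nextToComplete < frontier) {
            onEpochComplete.accept(nextToComplete++);
        }
    }
}
```

A consumer aggregating results for epoch *e* can then buffer its partial result and only emit it when the completion notification for *e* fires.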
-Another interesting point about *Naiad* is how it deals with failures. As described before, there are systems that achieve fault-tolerance by replication and systems such as *Storm* that would replay the tuple from beginning. Then we have *Spark streaming*, which would keep the *lineage* of all operations and is able to rebuilt the RDDs in parallel. *Naiad* more or less can be seen as an example that takes the replay approach, it would checkpoint the computation and can perform potentially more compact checkpointing when requested. When the system periodically checkpoints, all processes would pause and finish ungoing works. Then the system would perform checkpointing on each vertex and then resume. To recover from a failure, all live processes would revert to the last durable checkpoint, and the work from the failed vertex would be reassigned to other processes. This method might have higher latency for recovery due to both checkpointing and resuming than other approaches.
+Another interesting point about *Naiad* is how it deals with failures. As described before, there are systems that achieve fault-tolerance by replication, and systems such as *Storm* that replay the tuple from the beginning. Then we have *Spark streaming*, which keeps the *lineage* of all operations and is able to rebuild the RDDs in parallel. *Naiad* can more or less be seen as an example that takes the replay approach: it checkpoints the computation and can perform potentially more compact checkpointing when requested. When the system periodically checkpoints, all processes pause and finish ongoing work; the system then performs checkpointing on each vertex and resumes. To recover from a failure, all live processes revert to the last durable checkpoint, and the work of the failed vertex is reassigned to other processes. This method might have higher recovery latency than other approaches, due to both the checkpointing and the resuming.
In short, *Naiad* allows processing of messages from different epochs and aggregating results from the same epoch by putting timestamps on messages. Moreover, by allowing producers to set the epoch of messages arbitrarily (i.e., to set logical time), *Naiad* provides a powerful way to create batches of streams. However, the computation model of *Naiad* introduces high latency when dealing with failures.
@@ -97,62 +97,62 @@ In short, *Naiad* allows processing of messages from different epochs and aggreg
We have now seen three different systems that can process data streams at large scale; however, each of them is constrained in the way it views the dataset. *Storm* performs stream processing on each tuple, while *Spark streaming* and *Naiad* have their own ways of grouping tuples into small batches before processing. The authors of *Google Dataflow* {% cite akidau2015dataflow --file streaming %} believe that the fundamental problem with those views is that they are limited by the processing engine: for example, if you were to use *Spark streaming* to process the stream, you could only group the tuples into small time intervals. The motivation of *Google Dataflow* is thus a general underlying system with which users can express whatever processing model they want.
-*Google Dataflow* is a system that allows batch, micro-bath and stream processing where users can choose based on the tradeoffs provided by each processing model: latency or resouce constraint. *Google Dataflow* implements many features in order to achieve its goal, and we will breifly talk about them.
+*Google Dataflow* is a system that allows batch, micro-batch and stream processing, where users can choose based on the tradeoff each processing model offers between latency and resource cost. *Google Dataflow* implements many features in order to achieve this goal, and we will briefly talk about them.
-*Google Dataflow* provides a windowing model that supports unaligned event-time windows, which helped the users to express how to batch the tuples together in a stream. Windowing slices a dataset into finite chunks for processing as a group, one can think of it as batching as we discussed before. Unaligned windows are the windows that would only be applied to certain tuples during the period, for example, if we have an unaligned window *w[1:00,2:00)(k)*, and only the events with key *k* during the time period [1:00, 2:00) would be grouped by this window. This is powerful since it provides an aternative way of batching tuples other than just time before processing.
+*Google Dataflow* provides a windowing model that supports unaligned event-time windows, which helps users express how to batch the tuples of a stream together. Windowing slices a dataset into finite chunks for processing as a group; one can think of it as the batching we discussed before. Unaligned windows are windows that apply only to certain tuples during their period; for example, given an unaligned window *w[1:00,2:00)(k)*, only the events with key *k* during the time period [1:00, 2:00) are grouped by this window. This is powerful since it provides a way of batching tuples by more than just time before processing.
-The next question is then how does *Google Dataflow* knows when to emit the results of a certain window, this requires some other signal to show when the window is done. *Google Dataflow* handles this by providing different choices of triggering methods. One example would be completion estimation, this is useful when combined with percentile watermarks, one might only care about processing a minimum percentage of the input data quickly than finishing every last piece of it. Another interesting triggering method is responsing to data arrival, this is useful for application that are grouping data based on the number of them, for example, the processor can be fired once 100 data points are received. These real triggering semantics help *Google Dataflow* to become a general purposed processing system, the first method allows the users to deal with stragglers while the second one provides a way to support tuple-based windows.
+The next question is how *Google Dataflow* knows when to emit the results of a certain window; this requires some other signal to show when the window is done. *Google Dataflow* handles this by providing different choices of triggering methods. One example is completion estimation, which is useful when combined with percentile watermarks: one might care more about quickly processing a minimum percentage of the input data than about finishing every last piece of it. Another interesting triggering method is responding to data arrival, which is useful for applications that group data by count; for example, the processor can be fired once 100 data points have been received. These triggering semantics help *Google Dataflow* become a general purpose processing system: the first method allows users to deal with stragglers, while the second provides a way to support tuple-based windows.
-In addition to controlling when results can be emitted, the system also provides a way to control how windows can relate to each other. The results can be *discarding*, where the contents would be discarded once triggering, this makes data storage more efficient since once the results are consumed, we can clear them from the buffers. The results can also be *accumulating*, once triggering, the contents are left intact and stored in persistent state, later results can become a refinement of previous results, this mode is useful when the downstream consumersis expected to overwrite old result once the new one comes, for example, we might want to write the count of a view of certain movie from the stream pipeline with low latency, and we can refine the count at the end of the day by running a slower batch process on the aggregated data. The last mode is *accumulating & retracting*, where in addition to *accumulating* semantics, a copy of the emitted value is also stored in persistent state. When the window triggers again in the future, a retraction for the previous value will be emitted first, followed by the new value, this is useful when both the results from the previous processing and the later one are needed to be combined. For example, one process is counting the number of views during a certain period, a user went offline during the window and came back after the window ended when the result of the counting *c* was already emitted, the process now need to retract the previous result *c* and indicate that the correct number should be *c+1*.
+In addition to controlling when results can be emitted, the system also provides a way to control how successive firings of a window relate to each other. The results can be *discarding*: the window contents are discarded once the trigger fires, which makes data storage more efficient since results can be cleared from the buffers as soon as they are consumed. The results can also be *accumulating*: after the trigger fires, the contents are left intact in persistent state, and later results become refinements of previous results. This mode is useful when the downstream consumers are expected to overwrite the old result once a new one arrives; for example, we might write the view count of a certain movie from the streaming pipeline with low latency, and refine the count at the end of the day by running a slower batch process on the aggregated data. The last mode is *accumulating & retracting*: in addition to the *accumulating* semantics, a copy of the emitted value is also stored in persistent state, and when the window triggers again in the future, a retraction for the previous value is emitted first, followed by the new value. This is useful when the result of the previous firing and the later one need to be combined. For example, suppose a process is counting the number of views during a certain period; a user went offline during the window and came back after it ended, when the count *c* had already been emitted. The process now needs to retract the previous result *c* and indicate that the correct number is *c+1*.
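These ideas carry over fairly directly to the open-source Apache Beam SDK, which grew out of the *Dataflow* model; the following is a hedged sketch rather than code from the paper, and the input collection of movie-view events and the concrete durations are assumptions.

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class MovieViewCounts {
    // 'views' is an assumed unbounded PCollection with one movie id per view event
    static PCollection<KV<String, Long>> countViews(PCollection<String> views) {
        return views
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
                        // tuple-based trigger: emit a pane every 100 elements,
                        // repeatedly, until the window finally expires
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100)))
                        .withAllowedLateness(Duration.standardHours(1))
                        // accumulating: each pane refines the previous result;
                        // discardingFiredPanes() would emit only the new elements instead
                        .accumulatingFiredPanes())
                .apply(Count.perElement());
    }
}
```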
In terms of fault-tolerance, *Google Dataflow* relies on MillWheel as the underlying execution engine.
-In conclusion, one of the most important core princiles that drives *Google Dataflow* is to accommodate the diversity of known use cases, it did so by providing a rich set of abstractions such as windowing, triggering and controlling. Compared to the 'specialized' system that we discussed above, *Google Dataflow* is a more general system that can fulfill batch, micro-batch, and stream processing requirements.
+In conclusion, one of the most important core principles driving *Google Dataflow* is to accommodate the diversity of known use cases, and it does so by providing a rich set of abstractions for windowing, triggering and accumulation control. Compared to the 'specialized' systems discussed above, *Google Dataflow* is a more general system that can fulfill batch, micro-batch, and stream processing requirements.
## The systems being used nowadays
So far we have talked about what stream processing is and the different models and systems built for this purpose. As shown before, the systems vary in how they view the stream: for example, *Storm* operates at the level of each tuple, while *Spark streaming* groups tuples into micro-batches and processes them at the level of a batch. They also differ in how they deal with failures: *Storm* can replay a tuple from the spout, while *Naiad* keeps checkpointing. We then introduced *Google Dataflow*, which seems to be the most powerful tool so far, allowing users to express how to group and control the tuples in a stream.
-Despite all the differences among them, they all started with more or less the same goal: to be *the* stream processing system that would be used by companies, and we showed several examples of why companies might need such system. In this section, we would discuss two companies that use the stream processing system as the core of their bussiness: Twitter and Spotify.
+Despite all the differences among them, they all started with more or less the same goal: to be *the* stream processing system used by companies, and we showed several examples of why companies might need such a system. In this section, we discuss two companies that use stream processing systems at the core of their business: Twitter and Spotify.
## Twitter
-Twitter is one of the 'go-to' exmaples that people would think of when considering large scale stream processing system, since it has a huge amount of data that needed to be processed in real-time. Twitter bought the company that created *Storm* and used *Storm* as its real-time analysis tool for several years {% cite toshniwal2014storm --file streaming %}. However, as the data volume along with the more complex use cases increased, Twitter needed to build a new real-time stream data processing system as *Storm* can no longer satisfies the new requirements. We would talk about how *Storm* was used at Twitter and then the system that they built to replace *Storm*-*Heron*.
+Twitter is one of the 'go-to' examples that people think of when considering large scale stream processing systems, since it has a huge amount of data that needs to be processed in real-time. Twitter bought the company that created *Storm* and used *Storm* as its real-time analysis tool for several years {% cite toshniwal2014storm --file streaming %}. However, as the data volume grew and the use cases became more complex, Twitter needed to build a new real-time stream data processing system, as *Storm* could no longer satisfy the new requirements. We talk about how *Storm* was used at Twitter and then about the system they built to replace it: *Heron*.
- Storm@Twitter
-Twitter requires processing complext computation on streaming data in real-time since each interaction with a user requires making a number of complex decisions, often based on data that has just been created, and they use *Storm* as the real-time distributed stream data processing engine. As we described before, *Storm* represents one of the early open-source and popular stream processing systems that is in use today, and was developed by Nathan Marz at BackType which was acquired by Twitter in 2011. After the acquisition, *Storm* has been improved and open-sourced by Twitter and then picked up by various other organizations.
+Twitter requires complex computation on streaming data in real-time, since each interaction with a user requires making a number of complex decisions, often based on data that has just been created, and it used *Storm* as its real-time distributed stream data processing engine. As described before, *Storm* represents one of the early open-source and popular stream processing systems in use today; it was developed by Nathan Marz at BackType, which was acquired by Twitter in 2011. After the acquisition, *Storm* was improved and open-sourced by Twitter and then picked up by various other organizations.
We will first briefly introduce the structure of *Storm* at Twitter. *Storm* runs on a distributed cluster, and clients submit topologies to a master node, which is in charge of distributing and coordinating the execution of the topologies. The actual spouts and bolts run as tasks; multiple tasks are grouped into an executor, and multiple executors are in turn grouped into a worker process. The worker processes are then distributed to actual worker nodes (i.e., machines), each of which can run multiple worker processes. Each worker node runs a supervisor that communicates with the master node, so the state of the computation can be tracked.
As shown before, *Storm* can guarantee that each tuple is processed 'at least once'; however, at Twitter, *Storm* provides two types of semantic guarantees: 'at least once' and 'at most once'. The 'at least once' semantics are guaranteed by the directed acyclic graph as we showed before, and the 'at most once' semantics are provided by dropping the tuple in case of a failure (e.g., by disabling the acknowledgments of each tuple). Note that for 'at least once' semantics, the coordinators (i.e., Zookeeper) checkpoint the processed tuples in the topology, and the system can start processing tuples from the last recorded 'checkpoint' once it recovers from a failure.
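The 'at least once' path relies on bolts anchoring the tuples they emit to their input and then acknowledging (or failing) that input; disabling the acknowledgments turns the same code into 'at most once'. Below is a hedged sketch of such a bolt, reusing the hypothetical `SplitHashtagBolt` from earlier; the field names are invented, and the exact package names and `prepare` signature vary slightly across Storm versions.

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitHashtagBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            for (String word : input.getStringByField("post").split(" ")) {
                if (word.startsWith("#")) {
                    // anchoring to 'input' links the new tuple into the input tuple's DAG
                    collector.emit(input, new Values(word));
                }
            }
            collector.ack(input);    // acknowledged: counts toward 'at least once'
        } catch (Exception e) {
            collector.fail(input);   // failed tuples are replayed from the spout
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }
}
```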
-*Storm* fulfilled many requirements at Twitter with satisfactory performance. *Storm* was running on hundreds of servers and several hundreds of topologies ran on these clusters some of which run on more than a few hundred nodes, terabytes of data flows through the cluster everyday and generated several billions of output tuples. These topologies were used to do both simple tasks such as filtering and aggregating the content of various streams and complex tasks such as machine learning on stream data. *Storm* was resilient to failures and achieved relatively low latency, a machine can be taken down for maintainance without interrupting the topology and the 99% response time for processing a tuple is close to 1ms.
+*Storm* fulfilled many requirements at Twitter with satisfactory performance. *Storm* ran on hundreds of servers, and several hundred topologies ran on these clusters, some of them spanning more than a few hundred nodes; terabytes of data flowed through the clusters every day, generating several billion output tuples. These topologies were used for both simple tasks, such as filtering and aggregating the content of various streams, and complex tasks, such as machine learning on stream data. *Storm* was resilient to failures and achieved relatively low latency: a machine could be taken down for maintenance without interrupting the topology, and the 99th percentile response time for processing a tuple was close to 1 ms.
In conclusion, *Storm* was a critical piece of infrastructure that powered many of the real-time, data-driven decisions made at Twitter.
- Twitter Heron
-*Storm* has long serverd as the core of Twitter for real-time analysis, however, as the scale of data being processed has increased, along with the increase in the diversity and the number of use cases, many limitations of *Storm* became apparent {% cite kulkarni2015twitter --file streaming %}.
+*Storm* long served as the core of real-time analysis at Twitter; however, as the scale of data being processed increased, along with the diversity and number of use cases, many limitations of *Storm* became apparent {% cite kulkarni2015twitter --file streaming %}.
-There are several issues with *Storm* that make using is at Twitter become challenging. The first challenge is debug-bility, there is no clean mapping from the logical units of computation in the topology to each physical process, this makes finding the root cause of misbehavior extremely hard. Another challenge is as the cluster resouces becomes precious, the need for dedicated cluster resources in *Storm* leads to inefficiency and it is better to share resources across different types of systems. In addition, Twitter needs a more efficient system, simply with the increase scale, any improvement in performance can translate to huge benefit.
+There are several issues with *Storm* that made using it at Twitter challenging. The first challenge is debuggability: there is no clean mapping from the logical units of computation in the topology to each physical process, which makes finding the root cause of misbehavior extremely hard. Another challenge is that, as cluster resources become precious, the need for dedicated cluster resources in *Storm* leads to inefficiency; it is better to share resources across different types of systems. In addition, Twitter needed a more efficient system: simply because of the increased scale, any improvement in performance translates into a huge benefit.
-Twitter realized in order to meet all the needs, they needed a new real-time stream data processing system-Heron, which is API-compatible with Storm and provides significant performance improvements, lower resouce consumption along with better debug-ability scalability and manageability.
+Twitter realized that in order to meet all these needs, it needed a new real-time stream data processing system: Heron, which is API-compatible with Storm and provides significant performance improvements and lower resource consumption, along with better debuggability, scalability and manageability.
A key design goal for Heron is compatibility with the *Storm* API, so Heron runs topologies, i.e., graphs of spouts and bolts, just like Storm. Unlike *Storm*, though, a Heron topology is translated into a physical plan before actual execution, and there are multiple components in the physical plan.
-Each topology is run as an Aurora job, instead of using Nimbuz as scheduler, Twitter chose Aurora since it is developed and used by other Twitter projects. Each Aurora job is then consisted of several containers, the first container runs Topology Master, which provides a single point of contact for discovering the status of the topology and also serves as the gateway for the topology metrics through an endpoint. The other containers each run a Stream Manager, a Metrics Manager and a number of Heron Instances. The key functionality for each Stream Manager is to manage the routing of tuples efficiently, all Stream Managers are connected to each other and the tuples from Heron Instances in different containers would be transmitted through their Stream Managers, thus the Stream Managers can be viewed as Super Node for communication. Stream Manager also provides a backpressure mechanism, which can dynamically adjust the rate of the data flows through the network, for example, if the Stream Managers of the bolts are overwhelmed, they would then notice the Stream Managers of the spouts to slow down thus ensure all the data are properly processed. Heron Instance carries out the real work for a spout or a bolt, unlike woker in *Storm*, each Heron Instance runs only a single task as a process, in addition to performing the work, Heron Instance is also responsible for collecting multiple metrics. The metrics collected by Heron Instances would then be sent to the Metrics Manager in the same container and to the central monitoring system.
+Each topology runs as an Aurora job; instead of using Nimbus as the scheduler, Twitter chose Aurora since it is developed and used by other Twitter projects. Each Aurora job consists of several containers. The first container runs the Topology Master, which provides a single point of contact for discovering the status of the topology and also serves as the gateway for topology metrics through an endpoint. The other containers each run a Stream Manager, a Metrics Manager and a number of Heron Instances. The key functionality of each Stream Manager is to manage the routing of tuples efficiently: all Stream Managers are connected to each other, and tuples from Heron Instances in different containers are transmitted through their Stream Managers, so the Stream Managers can be viewed as super nodes for communication. The Stream Manager also provides a backpressure mechanism, which can dynamically adjust the rate at which data flows through the network; for example, if the Stream Managers of the bolts are overwhelmed, they notify the Stream Managers of the spouts to slow down, ensuring that all the data are properly processed. A Heron Instance carries out the real work of a spout or a bolt; unlike a worker in *Storm*, each Heron Instance runs only a single task as a process. In addition to performing the work, a Heron Instance is also responsible for collecting multiple metrics, which are then sent to the Metrics Manager in the same container and on to the central monitoring system.
The components in the Heron topology are clearly separated, so failures at various levels are handled differently. For example, if the Topology Master dies, the container restarts the process, and the stand-by Topology Master takes over as the master while the restarted one becomes the stand-by. When a Stream Manager dies, it is restarted in the same container, and after rediscovering the Topology Master, it fetches and checks whether any changes need to be made to its state. Similarly, all other failures can be handled gracefully by Heron.
-Heron addresses the challenges of *Storm*. First, each task is performed by a single Heron Instance, and the different functionalities are abstracted into different level, which makes debug clear. Second, the provisioning of resouces is abstracted out thus made sharing infrastucture with other systems easier. Third, Heron provides multiple metrics along with the backpressure mechanism, which can be used to precisely reason about and achieve a consistent rate of delevering results.
+Heron addresses the challenges of *Storm*. First, each task is performed by a single Heron Instance, and the different functionalities are abstracted into different levels, which makes debugging clearer. Second, the provisioning of resources is abstracted out, which makes sharing infrastructure with other systems easier. Third, Heron provides multiple metrics along with the backpressure mechanism, which can be used to reason precisely about, and achieve, a consistent rate of delivering results.
-*Storm* has been decommissioned and Heron is now the de-facto streaming system at Twitter and an interesting note is that after migrating all the topologies to Heron, there was an overall 3X reduction in hardware. Not only Heron reduces the infrastracture needed, it also outperform *Storm* by delivering 6-14X improvements in throughput, and 5-10X reductions in tuple latencies.
+*Storm* has been decommissioned and Heron is now the de-facto streaming system at Twitter; an interesting note is that after migrating all the topologies to Heron, there was an overall 3X reduction in hardware. Not only does Heron reduce the infrastructure needed, it also outperforms *Storm*, delivering 6-14X improvements in throughput and 5-10X reductions in tuple latencies.
## Spotify
-Another company that deployes large scale distributed system is Spotify {% cite spotifylabs --file streaming %}. Every small piece of information, such as listening to a song or searching an artist, is sent to Spotify servers and processed. There are many features of Spotify that need such stream processing system, such as music/playlist recommendations. Originally, Spotify would collect all the data generated from client softwares and store them in their HDFS, and those data would then be processed on hourly basis by a batch job (i.e., the data collected each hour would be stored and processed together).
+Another company that deploys a large scale distributed system is Spotify {% cite spotifylabs --file streaming %}. Every small piece of information, such as listening to a song or searching for an artist, is sent to Spotify's servers and processed. Many features of Spotify need such a stream processing system, such as music and playlist recommendations. Originally, Spotify collected all the data generated by client software and stored it in HDFS, and the data was then processed on an hourly basis by a batch job (i.e., the data collected each hour was stored and processed together).
-In the original Spotify structure, each job must determine, with high probability, that all data from the hourly bucket has successflly written to a persistent storage before firing the job. Each job were running as a batch job by reading the files from the storage, so late-arriving data for already completed bucket can not be appended since jobs generally only read data once from a hourly bucket, thus each job has to treat late data differently. All late data is written to a currently open hourly bucket then.
+In the original Spotify structure, each job must determine, with high probability, that all data for the hourly bucket has been successfully written to persistent storage before the job fires. Each job runs as a batch job reading files from storage, so late-arriving data for an already completed bucket cannot be appended, since jobs generally read data from an hourly bucket only once; thus each job has to treat late data differently. All late data is instead written to the currently open hourly bucket.
Spotify then decided to use *Google Dataflow*, since the features it provides are exactly what Spotify wants. The previous batch jobs can be written as streaming jobs with a one-hour window size, the data stream can be grouped by both window and key, and late-arriving data can be handled gracefully if the accumulation mode is set to *accumulating & retracting*. *Google Dataflow* also reduces the export latency of the hourly analysis results, since when assigning windows, Spotify can set an early trigger that emits a pane (i.e., a partial result) every N tuples until the window is closed.
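A hedged sketch of what such an hourly job could look like with the Apache Beam SDK (the open-source descendant of the *Dataflow* model). The input collection, the key choice, the allowed lateness and the value of N are all assumptions; Beam's Java SDK exposes *discarding* and *accumulating* panes, so the retraction half of *accumulating & retracting* is not shown here.

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class HourlyPlayCounts {
    static final int N = 1000;  // placeholder: emit an early pane every N events

    // 'plays' is an assumed unbounded PCollection with one track id per listen event
    static PCollection<KV<String, Long>> hourlyCounts(PCollection<String> plays) {
        return plays
                .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1)))
                        // final pane when the watermark passes the end of the hour,
                        // early panes every N elements to cut the export latency,
                        // and an extra pane for each batch of late-arriving events
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(N))
                                .withLateFirings(AfterPane.elementCountAtLeast(1)))
                        .withAllowedLateness(Duration.standardDays(7))
                        .accumulatingFiredPanes())  // later panes refine the earlier ones
                .apply(Count.perElement());
    }
}
```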