From beaf57cdad4fe6d04ddcbe0322f8f2ebf9214b8d Mon Sep 17 00:00:00 2001
From: Fangfan Li
Date: Fri, 9 Dec 2016 11:14:34 -0500
Subject: Added bibs

---
 chapter/9/streaming.md | 54 ++++++++++++++++++++++++--------------------------
 1 file changed, 26 insertions(+), 28 deletions(-)

(limited to 'chapter/9')

diff --git a/chapter/9/streaming.md b/chapter/9/streaming.md
index 095f874..d2e2975 100644
--- a/chapter/9/streaming.md
+++ b/chapter/9/streaming.md
@@ -3,7 +3,6 @@ layout: page
title: "Large Scale Streaming Processing"
by: "Fangfan Li"
---
-#Large Scale Streaming Processing

The previous chapter discusses large scale batch processing systems, where the computation involves pieces of data stored across a distributed file system. Those systems satisfy requirements such as scalability and fault-tolerance for applications that deal with 'big data' stored in a distributed way. Batch processing systems are suitable for processing *static* datasets, where the input data does not change over time during the whole process; the system can therefore distribute the computation and perform synchronization assuming the inputs stay the same for the whole computation. In such a *static* model, the processing system can first *pull* data from disk and then perform the computation over the pulled data. However, a large number of networking applications are not *static*: the data is constantly in motion, and the inputs are provided as a *stream*, as new data constantly arrives. In the *stream* model, data is *pushed* to the processor. This fundamental difference makes traditional batch processing systems unsuitable for streaming applications, since even the slightest change in the dataset would require the batch processor to *pull* the whole dataset and perform the computation again. Thus, in this chapter, we introduce the history of stream processing and the systems created for it.

@@ -11,17 +10,18 @@ There are many challenges for implementing large scale streaming processing syst

In the rest of this chapter, we introduce 1) the history of stream processing, 2) how to represent the input data stream, 3) the practices used to process data streams, and 4) the state-of-the-art systems used by applications.

-##Data in constant motion
+## Data in constant motion

-This concept of streaming data can trace back to TelegraphCQ, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, high-variable data streams. In contrast to trafitional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid and being constantly moving and changing. The examples of applications that use *data in motion* include: event-based processing, query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of the query processing systems that deals with data stream. The fundamental difference between TelegraphCQ to other traditional query system is the view of input data, instead of handling a query with detailed static data, TelegraphCQ has to react to the newly arrived data and process the queries *on-the-fly*.
+The concept of streaming data can be traced back to TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %}, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, highly variable data streams. In contrast to the traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data is becoming fluid, constantly moving and changing. Examples of applications that use *data in motion* include event-based processing and query processing over streaming data sources, such as network monitoring. TelegraphCQ is one example of a query processing system that deals with data streams. The fundamental difference between TelegraphCQ and other traditional query systems is its view of the input data: instead of handling a query over static data that is known in detail, TelegraphCQ has to react to newly arrived data and process its queries *on-the-fly*.
The important concepts in TelegraphCQ include *continuous queries*: the queries are constantly running, and as new data arrives, the processor routes it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually; queries with some commonality can be combined to improve performance. TelegraphCQ shows the importance of modeling data as a stream and suggests how such a stream can be processed. But TelegraphCQ was only implemented as a non-distributed prototype, so we now discuss how data streams are processed at large scale.

-##How to represent data stream
+## How to represent data stream

-Why would we need to process data stream in a large scale? I will use an example to illustrate the idea. For example, assume you are Twitter, and you have a constant feed of user's comments and posts, you want to find out what is the most *trending* topic right now that people are talking about, and your advertisement team want to follow on that. You can store all the posts that happened during the day from 12:01 a.m to 11:59 p.m in a large file system and then run a job in *Spark* to analyze them. The *Spark* job itselm may again probably take several hours, but after all these works, the *trending* topic comes out from your analysis might be useless since it might not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all different sources as input and output the result with low latency (i.e., before it becomes useless).
+Why would we need to process a data stream at large scale? An example illustrates the idea. Assume you are Twitter: you have a constant feed of users' comments and posts, you want to find out the most *trending* topic that people are talking about right now, and your advertisement team wants to act on it. You could store all the posts that happened during the day, from 12:01 a.m. to 11:59 p.m., in a large file system and then run a job in *Spark* {% cite zaharia2012resilient --file streaming %} to analyze them. The *Spark* job itself may take several hours, and after all this work the *trending* topic that comes out of your analysis might be useless, since it may not be hot anymore. Thus we want a stream processing system that can take the constant stream of posts from all the different sources as input and output the result with low latency (i.e., before it becomes useless).

Before diving into the details of large scale processing, we first introduce a few concepts: producer, processor and consumer.
The producer is where the data stream comes from; in the previous example it is a user who is tweeting. The consumer would then be the advertisement team, and the processor is the *magical* component that we need in order to produce the results. The producers and consumers are fairly straightforward; it is the processor that is discussed in this chapter.

@@ -31,19 +31,19 @@ We have been talking about the stream of data, but this is a bit under-specified

A natural view of a data stream is an infinite sequence of tuples read from a queue. However, a traditional queue would not be sufficient in a large scale system, since a consumed tuple might get lost, or the consumer might fail and request the previous tuple again after a restart. An alternative design is a multi-consumer queue, where a pool of readers reads from a single queue and each record goes to one of them, which is more suitable for a distributed system. In a traditional multi-consumer queue, once a consumer reads the data out, it is gone. This would be problematic in a large stream processing system, since messages are more likely to be lost during transmission, and we want to keep track of which data has been successfully consumed and which data might have been lost on its way to the consumer. Thus we need a slightly fancier queue that keeps track of *what* has been consumed, in order to suit the distributed environment of a large stream processing system.

-An intuitive choice would be recording the message when sending it out and wait for the acknowledgment from the receiver. This simple method is a pragmatic choice since the storage in many messaging systems are scarce resources, the system want to free the data immediately once it knows it is consumed successfully thus to keep the queue small. However, getting the two ends to come into agreement about what has been consumed in not a trivial problem. Acknowledgement fixes the problem of lossing messages, because if a message is lost, it would not be acknoledged thus the data is still in the queue and can be sent again, this would ensure that each message is processed at least once, however, it also creates new problems. First problem is the receiver might successfully consumed the message *m1* but fail to send the acknowledgment, thus the sender would send *m1* again and the receiver would process the same data twice. Another problem is performance, the sender has now to keep track of every single messages being sent out with multiple stages. Apache Kafka handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, each partition is consumed by exactly one consumer at any given time. By doing this Kafka ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. This makes the state about what has been consumed very small, just one number for each partition, and by periodically checkpointing, the equivalent of message acknowledgements becomes very cheap. The queues in Kafka also allows users to rewind the stream and replay everything from the point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed while ensuring that the processed events are in the order of their origination.
+An intuitive choice would be to record each message when sending it out and to wait for an acknowledgment from the receiver. This simple method is a pragmatic choice: since storage in many messaging systems is a scarce resource, the system wants to free the data immediately once it knows it has been consumed successfully, so as to keep the queue small. However, getting the two ends to come to an agreement about what has been consumed is not a trivial problem. Acknowledgment fixes the problem of losing messages, because if a message is lost it is not acknowledged, so the data is still in the queue and can be sent again; this ensures that each message is processed at least once. However, it also creates new problems. The first problem is that the receiver might successfully consume message *m1* but fail to send the acknowledgment; the sender would then send *m1* again, and the receiver would process the same data twice. Another problem is performance: the sender now has to keep track of every single message being sent out, across multiple stages. Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, and each partition is consumed by exactly one consumer at any given time. By doing this, Kafka ensures that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions, this still balances the load over many consumer instances. It also makes the state about what has been consumed very small, just one number (an offset) per partition, and with periodic checkpointing, the equivalent of message acknowledgments becomes very cheap. The queues in Kafka also allow users to rewind the stream and replay everything from a point of interest. For example, if the user code has a bug that is discovered later, the user can re-consume those messages once the bug is fixed, while ensuring that the processed events are in the order of their origination.
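To make the partition-and-offset model concrete, here is a small, hedged sketch using the Kafka Java consumer client (the `assign`/`seek` API from kafka-clients 2.x). The topic name `tweets`, the partition number, and the checkpointed offset are invented for illustration; the point is that resuming or replaying only requires remembering one number per partition.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayableConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Each partition has exactly one reader, so consumption state is one number.
        TopicPartition partition = new TopicPartition("tweets", 0);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(partition));

            // Rewind to a previously checkpointed offset (made-up value) to replay
            // the stream in its original order, e.g. after fixing a bug downstream.
            long checkpointedOffset = 42L;
            consumer.seek(partition, checkpointedOffset);

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```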
-##How to process data stream
+## How to process data stream

Now we know what the stream looks like and how to ensure that the data in the stream is successfully processed. We can therefore turn to the processors that consume the data stream. There are two main approaches to processing a data stream. The first approach is the continuous queries model, similar to TelegraphCQ, where the queries keep running and the arrival of data initiates the processing. The other approach is micro-batching, where the streaming computation becomes a series of stateless, deterministic batch computations on small batches of the stream, and a timer triggers the processing of each batch. We discuss Apache Storm as an example of the first design, while Spark Streaming, Naiad and Google Dataflow are examples of the second approach. These systems differ not only in how they process the stream, but also in how they ensure fault-tolerance, which is one of the most important aspects of a large scale distributed system.

-###a) Continuous queries (operators) on each tuple
+### a) Continuous queries (operators) on each tuple

-####Apache Storm
+- Apache Storm

-After MapReduce, Hadoop, and the related batch processing system came out, the data can be processed at scales previously unthinkable. However, as we stated before, a realtime large scale data processing becomes more and more important for many businesses. *Apache Storm* is actually one of the first system that can be discribed as "Hadoop of realtime" that feed the needs. Users can process messages in a way that doesn't lose data and also scalable with the primitives provided by *Storm*.
+After MapReduce, Hadoop, and the related batch processing systems came out, data could be processed at scales previously unthinkable. However, as we stated before, real-time large scale data processing has become more and more important for many businesses. *Apache Storm* {% cite apachestorm --file streaming %} is one of the first systems that can be described as a "Hadoop of realtime" to meet this need. With the primitives provided by *Storm*, users can process messages in a scalable way that does not lose data.

-In *Storm*, the logic of every processing job is described as a *Storm* topology. A *Storm* topology in *Storm* can be think of as a MapReduce job in Hadoop, the difference is that a MapReduce job will finish eventually but a Storm topology will run forever. There are three components in the topology: stream, spouts and bolts.
+In *Storm*, the logic of every processing job is described as a *Storm* topology. A *Storm* topology can be thought of as the analogue of a MapReduce job in Hadoop; the difference is that a MapReduce job will finish eventually, but a *Storm* topology will run forever. There are three components in a topology: streams, spouts and bolts.

In *Storm*, a stream is an unbounded sequence of tuples; tuples can contain arbitrary types of data, which relates to the core concept of *Storm*: processing the tuples in a stream.

@@ -61,13 +61,13 @@ There might be two concerns here. The first is how can *Storm* track every DAG e

Thus, as shown before, *Storm* provides the needed primitives: it can process a stream of data, distribute the work among multiple workers, and guarantee that each tuple in the stream is processed.
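As a concrete, hedged illustration of these primitives, the sketch below wires up a tiny topology with Storm's Java API (written against the Storm 1.x `org.apache.storm` packages; the names `SentenceSpout`, `SplitBolt` and `WordSplitTopology` are ours, not part of Storm). A spout emits sentences as tuples, a bolt splits them into word tuples, and once submitted the topology keeps running until it is killed.

```java
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class WordSplitTopology {

    // A spout is the source of the stream; this one just emits the same sentence forever.
    public static class SentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            Utils.sleep(100);
            collector.emit(new Values("storm processes one tuple at a time"));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    // A bolt consumes tuples and emits new tuples; this one splits sentences into words.
    public static class SplitBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            for (String word : tuple.getStringByField("sentence").split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        // The topology is a DAG: spout -> bolt; unlike a MapReduce job it never finishes.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentences", new SentenceSpout(), 1);
        builder.setBolt("split", new SplitBolt(), 2).shuffleGrouping("sentences");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-split", new Config(), builder.createTopology());
        Utils.sleep(10_000);   // let the topology run for a while in this local demo
        cluster.shutdown();
    }
}
```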
-###b) Micro-batch
+### b) Micro-batch

We have seen *Apache Storm* as a true stream processing system that provides the guarantees such a system needs. However, the core of *Storm* is to process the stream at the granularity of individual tuples. Sometimes such granularity is unnecessary: for the Twitter example we had before, maybe we are only interested in the tuples that arrived within a 5-minute interval. With *Storm*, such a specification can only be built on top of the system, while one really wants a convenient way to express this requirement within the system itself. In the following sections, we introduce several other stream processing systems; all of them can act on a data stream in real time at large scale like *Storm*, but they provide more ways for users to express how they want the tuples in the stream to be grouped and then processed. We refer to grouping the tuples before processing them as putting them into small *micro-batches*, and the processor can then produce results by working on those batches instead of single tuples.

-####Spark Streaming
+- Spark Streaming

-The *Spark* streaming system is built upon the previous *Apache Spark* processing system, where it uses a data-sharing abstraction called 'Resilient Distributed Datasets' or RDDs to ensure fault-tolerance while achieve extremly low latency. The challenges with 'big data' stream processing were long recovery time when failure happens, and the the stragglers might increase the processing time of the whole system. Spark streaming overcomes those challenges by a parallel recovery mechanism that improves efficiency over trafitional replication and backup schemes, and tolerate stragglers.
+The *Spark Streaming* {% cite zaharia2012discretized --file streaming %} system is built upon the *Apache Spark* processing system, and it uses a data-sharing abstraction called 'Resilient Distributed Datasets', or RDDs, to ensure fault-tolerance while achieving extremely low latency. The challenges with 'big data' stream processing were long recovery times when a failure happens, and stragglers that might increase the processing time of the whole system. Spark Streaming overcomes those challenges with a parallel recovery mechanism that improves efficiency over traditional replication and backup schemes, and it tolerates stragglers.

The challenge of fault-tolerance comes from the fact that a stream processing system might need hundreds of nodes; at such scale, two major problems are *faults* and *stragglers*. Some systems use a continuous processing model, such as *Storm*, in which long-running, stateful operators receive each tuple, update their state and send out result tuples. While such a model is natural, it also makes faults difficult to handle. As shown before, *Storm* uses *upstream backup*, where messages are buffered and replayed if a message fails to be processed. Another approach to fault-tolerance used by previous systems is replication, where there are two copies of everything. The first approach takes a long time to recover, while the latter costs double the storage space. Moreover, neither approach handles stragglers.

@@ -77,9 +77,9 @@ In the *D-stream* model, a streaming computaion is treated as series of determin

In conclusion, *Spark Streaming* handles the slow-recovery and straggler issues by dividing the stream into small batches over small time intervals and using RDDs to keep track of how the result of a given batch of the stream is computed. This model makes handling recovery and stragglers easier, because the result can be re-computed in parallel, while RDDs make the process fast.
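To show what the micro-batch model looks like in code, here is a hedged sketch of the classic network word count using the Spark Streaming Java API (Spark 2.x): every 5-second interval of input becomes one small batch (one RDD of the underlying D-Stream), and the same transformations are applied to each batch. The host and port of the text source are placeholders.

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class StreamingWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount");

        // Each 5-second interval of the stream is turned into one micro-batch.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Placeholder source: lines of text arriving on a local socket.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words =
                lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> counts =
                words.mapToPair(word -> new Tuple2<>(word, 1))
                     .reduceByKey((a, b) -> a + b);

        counts.print();        // prints the word counts of each micro-batch
        jssc.start();
        jssc.awaitTermination();
    }
}
```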
-####Naiad
+- Naiad

-*Naiad* is another distributed system for executing data stream which is developed by *Microsoft*. *Naiad* combines the benefits of high throughput of batch processors and the low latency of stream processors by its computation model called *timely dataflow* that enables dataflow computations with timestamps.
+*Naiad* {% cite murray2013naiad --file streaming %} is another distributed system for executing computations over data streams, developed by *Microsoft*. *Naiad* combines the high throughput of batch processors with the low latency of stream processors through its computation model, called *timely dataflow*, which enables dataflow computations with timestamps.

*Timely dataflow*, like the topology described for *Storm*, contains stateful vertices that represent the nodes that compute on the stream. Each graph contains input vertices and output vertices, which are responsible for consuming messages from and producing messages to external sources. Every message being exchanged is associated with a timestamp called an epoch; the external source is responsible for providing the epoch and for notifying the input vertices of the end of each epoch. The notion of an epoch is powerful, since it allows the producer to arbitrarily determine the start and the end of each batch by assigning different epoch numbers to tuples. For example, epochs can be divided by time, as in *Spark Streaming*, or by the start of some event.

@@ -93,9 +93,9 @@ Another interesting point about *Naiad* is how it deals with failures. As descri

In short, *Naiad* allows processing of messages from different epochs and aggregation of results within the same epoch by using timestamps on messages. Moreover, by allowing producers to set the epoch on messages arbitrarily (i.e., to set a logical time), *Naiad* provides a powerful way to create batches of streams. However, the computation model of *Naiad* introduces high latency when dealing with failures.
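Naiad itself is a C# system, so the snippet below is only a hedged, Java-flavored sketch of the vertex callbacks described in the timely dataflow model (the paper calls them `OnRecv` and `OnNotify`): records from different epochs may arrive interleaved, and a per-epoch aggregate is released only once the vertex is notified that the epoch is complete. The class and method names are ours, not Naiad's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// A toy timely-dataflow-style vertex that counts records per epoch.
public class CountingVertex {
    private final Map<Long, Long> countsPerEpoch = new HashMap<>();

    // Called for every incoming record, tagged with its epoch (logical timestamp).
    public void onRecv(String record, long epoch) {
        countsPerEpoch.merge(epoch, 1L, Long::sum);
    }

    // Called once no more records with this epoch can arrive; safe to emit the aggregate.
    public void onNotify(long epoch) {
        long count = countsPerEpoch.getOrDefault(epoch, 0L);
        countsPerEpoch.remove(epoch);
        System.out.printf("epoch %d -> %d records%n", epoch, count); // stand-in for sending downstream
    }

    public static void main(String[] args) {
        CountingVertex vertex = new CountingVertex();
        vertex.onRecv("tweet-1", 0);   // records from different epochs can interleave
        vertex.onRecv("tweet-2", 1);
        vertex.onRecv("tweet-3", 0);
        vertex.onNotify(0);            // epoch 0 complete: emits a count of 2
        vertex.onNotify(1);            // epoch 1 complete: emits a count of 1
    }
}
```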
-####Google Dataflow
+- Google Dataflow

-We now have seen three different systems that can process data stream in large scale, however, each of them are constraint in the way of viewing the dataset. *Storm* can perform stream processing on each tuple, where *Spark streaming* and *Naiad* have their own way of grouping tuples together into small batches before processing. The authors of *Google Dataflow* believe that the fundamental problem of those views is they are limited by the processing engine, for example, if you were to use *Spark streaming* to process the stream, you can only group the tuples into small time intervals. The motivation of *Google Dataflow* is then a general underlying system with which the users can express what processing model they want.
+We have now seen three different systems that can process data streams at large scale; however, each of them is constrained in how it views the dataset. *Storm* performs stream processing on each tuple, whereas *Spark Streaming* and *Naiad* have their own ways of grouping tuples together into small batches before processing. The authors of *Google Dataflow* {% cite akidau2015dataflow --file streaming %} believe that the fundamental problem with those views is that they are limited by the processing engine; for example, if you were to use *Spark Streaming* to process the stream, you could only group the tuples into small time intervals. The motivation for *Google Dataflow* is then a general underlying system with which users can express whatever processing model they want.

*Google Dataflow* is a system that allows batch, micro-batch and stream processing, where users can choose based on the tradeoffs provided by each processing model, such as latency or resource constraints. *Google Dataflow* implements many features in order to achieve this goal, and we briefly discuss them.

@@ -110,16 +110,16 @@ In terms of fault-tolerance, *Google Dataflow* relies on MillWhell as the underl

In conclusion, one of the most important core principles that drives *Google Dataflow* is to accommodate the diversity of known use cases, and it does so by providing a rich set of abstractions such as windowing, triggering and controlling. Compared to the 'specialized' systems that we discussed above, *Google Dataflow* is a more general system that can fulfill batch, micro-batch, and stream processing requirements.
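The programming model described in the Dataflow paper is what later became Apache Beam, so a hedged sketch of the windowing abstraction can be written with the Beam Java SDK. The example below groups a toy, timestamped collection of hashtags into 5-minute event-time windows and counts them per window; a real job would read from an unbounded source, and different windows, triggers, or accumulation modes could be swapped in without changing the rest of the pipeline.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowedHashtagCount {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Toy bounded stand-in for a stream of hashtags with event timestamps.
        PCollection<String> hashtags = pipeline.apply(Create.timestamped(
                TimestampedValue.of("#streaming", new Instant(0)),
                TimestampedValue.of("#streaming", new Instant(60_000)),
                TimestampedValue.of("#batch", new Instant(400_000))));

        hashtags
                // Group into 5-minute event-time windows; fire when the watermark passes
                // the end of each window, and accept late data for up to 10 minutes.
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .withAllowedLateness(Duration.standardMinutes(10))
                        .discardingFiredPanes())
                // Count occurrences of each hashtag within every window.
                .apply(Count.perElement());

        pipeline.run().waitUntilFinish();
    }
}
```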
-##The systems being used nowadays, how ideas combined and products produced
+## The systems being used nowadays

So far we have talked about what stream processing is and about the different models and systems built for this purpose. As shown before, these systems vary in how they view the stream: for example, *Storm* performs operations at the level of each tuple, while *Spark Streaming* groups tuples into micro-batches and processes them at the level of a batch. They also differ in how they deal with failures: *Storm* can replay a tuple from the spout, while *Naiad* keeps checkpointing. We then introduced *Google Dataflow*, which seems to be the most powerful tool so far, allowing users to express how to group and control the tuples in the stream. Despite all the differences among them, they all started with more or less the same goal: to be *the* stream processing system used by companies, and we showed several examples of why companies might need such a system. In this section, we discuss two companies that use a stream processing system at the core of their business: Twitter and Spotify.

-###Twitter
+## Twitter

-Twitter is one of the 'go-to' exmaples that people would think of when considering large scale stream processing system, since it has a huge amount of data that needed to be processed in real-time. Twitter bought the company that created *Storm* and used *Storm* as its real-time analysis tool for several years. However, as the data volume along with the more complex use cases increased, Twitter needed to build a new real-time stream data processing system as *Storm* can no longer satisfies the new requirements. We would talk about how *Storm* was used at Twitter and then the system that tey built to replace *Storm*-*Heron*.
+Twitter is one of the 'go-to' examples that people think of when considering large scale stream processing systems, since it has a huge amount of data that needs to be processed in real-time. Twitter bought the company that created *Storm* and used *Storm* as its real-time analysis tool for several years {% cite toshniwal2014storm --file streaming %}. However, as the data volume and the complexity of use cases increased, Twitter needed to build a new real-time stream data processing system, as *Storm* could no longer satisfy the new requirements. We first talk about how *Storm* was used at Twitter, and then about the system they built to replace *Storm*: *Heron*.

-####Storm@Twitter
+- Storm@Twitter

Twitter requires complex computations on streaming data in real-time, since each interaction with a user requires making a number of complex decisions, often based on data that has just been created, and it uses *Storm* as its real-time distributed stream data processing engine. As we described before, *Storm* represents one of the early open-source and popular stream processing systems in use today; it was developed by Nathan Marz at BackType, which was acquired by Twitter in 2011. After the acquisition, *Storm* was improved and open-sourced by Twitter and then picked up by various other organizations.

@@ -131,9 +131,9 @@ As shown before, *Storm* can guarantee each tuple is processed 'at least once',

In conclusion, *Storm* was a critical piece of infrastructure at Twitter that powered many of the real-time data-driven decisions made there.

-####Twitter Heron
+- Twitter Heron

-*Storm* has long serverd as the core of Twitter for real-time analysis, however, as the scale of data being processed has increased, along with the increase in the diversity and the number of use cases, many limitations of *Storm* became apparent.
+*Storm* long served as the core of Twitter's real-time analysis; however, as the scale of data being processed increased, along with the diversity and the number of use cases, many limitations of *Storm* became apparent {% cite kulkarni2015twitter --file streaming %}.

There are several issues with *Storm* that made using it at Twitter challenging. The first challenge is debuggability: there is no clean mapping from the logical units of computation in the topology to each physical process, which makes finding the root cause of misbehavior extremely hard. Another challenge is that, as cluster resources become precious, the need for dedicated cluster resources in *Storm* leads to inefficiency, and it is better to share resources across different types of systems. In addition, Twitter needs a more efficient system: simply because of the increased scale, any improvement in performance translates into a huge benefit.

@@ -149,8 +149,8 @@ Heron addresses the challenges of *Storm*. First, each task is performed by a si

*Storm* has been decommissioned and Heron is now the de facto streaming system at Twitter; an interesting note is that after migrating all the topologies to Heron, there was an overall 3X reduction in hardware. Not only does Heron reduce the infrastructure needed, it also outperforms *Storm*, delivering 6-14X improvements in throughput and 5-10X reductions in tuple latencies.

-###Spotify
+## Spotify
-Another company that deployes large scale distributed system is Spotify. Every small piece of information, such as listening to a song or searching an artist, is sent to Spotify servers and processed. There are many features of Spotify that need such stream processing system, such as music/playlist recommendations. Originally, Spotify would collect all the data generated from client softwares and store them in their HDFS, and those data would then be processed on hourly basis by a batch job (i.e., the data collected each hour would be stored and processed together).
+Another company that deploys a large scale distributed streaming system is Spotify {% cite spotifylabs --file streaming %}. Every small piece of information, such as listening to a song or searching for an artist, is sent to Spotify servers and processed. Many features of Spotify need such a stream processing system, such as music/playlist recommendations. Originally, Spotify would collect all the data generated by client software and store it in HDFS, and those data would then be processed on an hourly basis by a batch job (i.e., the data collected each hour would be stored and processed together).

In the original Spotify architecture, each job must determine, with high probability, that all data for the hourly bucket has been successfully written to persistent storage before the job fires. Each job runs as a batch job reading the files from storage, so late-arriving data for an already completed bucket cannot be appended, since jobs generally read data from an hourly bucket only once; thus each job has to treat late data differently. All late data is instead written to the currently open hourly bucket.

Spotify then decided to use *Google Dataflow*, since the features provided by it matched these needs well.

The worst end-to-end latency observed with the new Spotify system based on *Google Dataflow* is four times lower than with the previous system, and it also comes with much lower operational overhead.

-{% cite Uniqueness --file streaming %}
-
 ## References

{% bibliography --file streaming %}
\ No newline at end of file
--
cgit v1.2.3