| -rw-r--r-- | chapter/9/streaming.md | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/chapter/9/streaming.md b/chapter/9/streaming.md
index b7bc4f3..d030a2b 100644
--- a/chapter/9/streaming.md
+++ b/chapter/9/streaming.md
@@ -8,20 +8,22 @@ The previous chapter discusses the large scale batch processing system, where th
There are many challenges in implementing a large scale stream processing system. Similar to large scale batch processing systems, large scale streaming systems also have to deal with consistency and fault-tolerance due to the distributed nature of those systems. Moreover, latency at the scale of several minutes is at most a nuisance in batch processing, while such latency is far less tolerable in large scale stream processing.
-Despite those challenges, there are many active research and production in the stream processing area, and we want to answer the following questions in this article: 1) what are the earliest ideas of stream processing, why would we want to analyze a stream of data 2) what exactly is a stream, how is it implemented in real system 3) what are the systems that are built for large scale stream processing, and what are the differences between them 4) what are the systems that are being used by companies for their applications, do they build their own system or they would use the existing systems.
+Despite those challenges, there is much active research and production work in the stream processing area, and we want to answer the following questions in this article: 1) what are the earliest ideas of stream processing, and why would people want to analyze a stream of data? 2) what exactly is a stream, and how is it implemented in real systems? 3) what systems are built for large scale stream processing, and what are the differences between them? 4) what systems do companies use for their applications: do they build their own systems or use existing ones?
## Data in constant motion
-Computing data stream has long been studied in the area of Theory of Computing. Assume we have a sequence of elements, and we want to compute the frequency moments of the data (i.e., count how many of each of the distinct data appear in the sequence). To do that, we could maintain a full histogram on the data, a counter for each data value. However, the memory that we have is not unlimited, thus we can not gather every data, we can then use randomized algorithms for approximating the frequency moments with limited resource{% cite alon1996space --file streaming %}. So analyzing the stream using random algorithm was because the lack of storage.
+Computing over data streams has long been studied in the area of the Theory of Computing. Assume we have a sequence of elements, and we want to compute the frequency moments of the data (i.e., count how many times each distinct value appears in the sequence). To do that, we could maintain a full histogram of the data, with a counter for each data value. However, memory is not unlimited, so we cannot store every element; instead, we can use randomized algorithms to approximate the frequency moments with limited resources{% cite alon1996space --file streaming %}. Thus, analyzing the stream with randomized algorithms was motivated by the lack of computational resources.
-Besides randomized processing on the data sequence, systems were also being developed to deal with the input data that is not static and predicatable. Instead of motivating by the lack of resources, those projects were mostly motivated by the fact that in emerging networked environments, data is the commodity of interest, and its value is realized only within the time that it is needed. TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %} is one example among those earliest such systems, which aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, high-variable data. In contrast to traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid and being constantly moving and changing. The examples of applications that use *data in motion* include: event-based processing where the system would react to a some special data received, query processing over streaming data sources such as network monitoring. TelegraphCQ is one example of the systems that can query processing over data stream. The fundamental difference between TelegraphCQ to other traditional query system is the view of input data, instead of handling a query with detailed static data, TelegraphCQ has to react to the newly arrived data and process the queries *on-the-fly*.
+Besides randomized processing on the data sequence, systems were also being developed to deal with input data that is not static and predictable. Rather than being motivated by a lack of resources, those projects were mostly motivated by the fact that in emerging networked environments, the value of the ever-increasing amount of data is realized only within the time that it is needed. TelegraphCQ {% cite chandrasekaran2003telegraphcq --file streaming %} is one example among the earliest such systems, and it aims at meeting the challenges that arise in handling large streams of continuous queries over high-volume, highly variable data. In contrast to the traditional view that data can be found statically in known locations, the authors of TelegraphCQ realized that data becomes fluid, constantly moving and changing: a traditional database can 'pull' data from storage, whereas in stream processing data is 'pushed' into the query processor. Examples of applications that use this *data in motion* include event-based processing, where the system reacts to some special data it receives or to some event (e.g., at a certain time), and query processing over streaming data sources, such as network monitoring. TelegraphCQ is one example of a system that can perform query processing over data streams.
-The important concepts of TelegraphCQ include *continuous queries*, where the queries are constantly running and as new data arrives, the processor would route it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually, in order to avoid blocking and having to interrupt the dataflow, data should be processed simultaneously by all the queries that require the dataflow. In TelegraphCQ, those queries with such commonality can be combined together to improve the performance.
+The fundamental difference between TelegraphCQ and traditional query systems is the view of input data: instead of handling a query over fixed, static data, TelegraphCQ has to react to newly arrived data and process the queries *on-the-fly*. In order to always react, the queries need to be always running, so TelegraphCQ runs *continuous queries*: the queries are constantly running, and as new data arrives, the processor routes it to the set of active queries that are listening. TelegraphCQ also uses *shared processing* to avoid the overhead of processing each query individually; in order to avoid blocking and interrupting the dataflow, data should be processed simultaneously by all the queries that require this dataflow. In TelegraphCQ, queries with such commonality can be combined to improve performance.
-TelegraphCQ shows the importance of modeling data as stream and how can we process such data stream however it was only implemented in a non-distributed prototype.
+TelegraphCQ shows the importance of modeling data as a stream and how we can process such a stream; however, it was only implemented as a non-distributed prototype.
-TODO: Other systems
+Beyond TelegraphCQ, there are systems that were built for continuous querying over large scale streaming data. For example, PipelineDB{% cite pipelinedb --file streaming %} is a system that was designed to run SQL queries continuously on streaming data, where the output of those continuous queries is stored in regular tables which can be queried like any other table. PipelineDB can reduce the cardinality of its input streams by performing filtering or aggregation on the stream once the continuous queries read the raw data, and only the needed information is then persisted to disk (i.e., the raw data is discarded). By doing this, PipelineDB can process large volumes of data very efficiently using a relatively small amount of resources.
+
+As we described before, stream processing is not only query processing, and there are many systems that perform event-based stream processing at large scale, which we discuss in detail in section 3.
## How to represent data stream
@@ -34,7 +36,7 @@ Before dive into the details of the large scale processing, we would first intro
- The processor is then the *magical* component that takes the stream and produces the results.
<figure class="fullwidth">
- <img src="{{ site.baseurl }}/chapter/9/DiagraphStream" alt="An example of a stream processing system" />
+ <img src="{{ site.baseurl }}/chapter/9/DiagramStream.jpg" alt="An example of a stream processing system" />
</figure>
The producers and consumers are fairly straightforward; it is the processor that is discussed in this chapter.
@@ -47,11 +49,19 @@ A natural view of a data stream can be an infinite sequence of tuples reading fr
A naïve approach to handling lost messages or failures could be to record the message upon sending it, and to wait for an acknowledgement from the receiver. This simple method is a pragmatic choice since storage in many messaging systems is a scarce resource; the system wants to free the data immediately once it knows it has been consumed successfully, so as to keep the queue small. However, getting the two ends to agree on what has been consumed is not a trivial problem. Acknowledgement fixes the problem of losing messages, because if a message is lost it is not acknowledged, so the data is still in the queue and can be sent again; this ensures that each message is processed at least once. However, it also creates new problems. The first problem is that the receiver might successfully consume the message *m1* but fail to send the acknowledgment, in which case the sender would send *m1* again and the receiver would process the same data twice. Another problem is memory consumption, since the sender now has to keep track of every single message being sent out across multiple stages, and can only free them when they are acknowledged.
-Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. The queue of messages is divided into a set of partitions, each partition is consumed by exactly one consumer at any given time. By doing this Kafka ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. This makes the state about what has been consumed very small, just one number for each partition, and by periodically checkpointing, the equivalent of message acknowledgements becomes very cheap. The queues in Kafka also allows users to rewind the stream and replay everything from the point of interest. For example, if the user code has a bug which is discovered later, the user can re-consume those messages once the bug is fixed while ensuring that the processed events are in the order of their origination.
+<figure class="fullwidth">
+ <img src="{{ site.baseurl }}/chapter/9/Kafka.jpg" alt="Apache Kafka" />
+</figure>
+
+Apache Kafka {% cite apachekafka --file streaming %} handles this differently to achieve better performance. Apache Kafka is a distributed streaming platform where producers, processors and consumers can all create, subscribe to, and read the streams they need; one can think of Kafka as the stream between all components in a stream processing system. Records in Kafka are grouped into topics, where each topic is a category to which records are published. Each topic is then divided into several partitions; a topic can have multiple subscribers, but each partition has only one reader at a time. Each record is assigned an offset that uniquely identifies it within its partition. By doing this, Kafka ensures that the consumer is the only reader of that partition and consumes the data in order. Since each topic has many partitions, Kafka still balances the load over many consumer instances by assigning different partitions to them. This makes the state about what has been consumed very small, just one number (i.e., the offset) per partition, and with periodic checkpointing, the equivalent of message acknowledgements becomes very cheap. Kafka retains all published records, whether they have been consumed or not, for a configurable retention period; this also allows consumers to rewind the stream and replay everything from a point of interest by going back to a specific offset. For example, if the user code has a bug that is discovered later, the user can re-consume those messages from the earlier offset once the bug is fixed, while ensuring that the events are processed in the order of their origination; alternatively, the user can simply start computing from the latest records, i.e., from "now".
+
+With the notions of topics and partitions, Kafka guarantees a total order over records within a partition, and multiple consumers can subscribe to a single topic, which increases throughput. If a strong guarantee on the ordering of all records in a topic is needed, the user can simply put all records of this topic into one partition.
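To make the consumer side concrete, here is a minimal sketch of how a processor might read such a partitioned topic with Kafka's Java consumer API. The broker address, group id, and topic name `views` are hypothetical, and offset checkpointing is left to the client's periodic auto-commit; treat this as an illustration of partitions and offsets, not code from the systems discussed in this chapter.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ViewReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "view-counter");            // consumers in one group share the partitions
        props.put("enable.auto.commit", "true");          // periodically checkpoint consumed offsets
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("views")); // hypothetical topic
            while (true) {
                // Each poll returns records from the partitions assigned to this consumer,
                // in offset order within each partition.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

If the processor needs to replay after a bug fix, the consumer can call `seek()` on a partition to move back to an earlier offset before polling again, which corresponds to the rewind-and-replay behaviour described above.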
+
+Those features of Apache Kafka make it a very popular platform used by many stream processing systems, and in the rest of this article we can think of the stream as being Apache Kafka.
## How to process data stream
-Now we know what the stream looks like and how do we ensure that the data in the stream are successfully processed. We would then talk about the processors that consume the data stream. There are two main approaches in processing data stream. The first approach is the continuous queries model, similar to TelegraphCQ, where the queries keep running and the arrival of data initiates the processing. Another approach is micro-batching, where the streaming computation becomes a series of stateless, deterministic batch computations on batch of stream, where certain timer would trigger the processing on the batch in those systems. We would discuss Apache Storm as an example for the fist design and Spark Streaming, Naiad and Google Dataflow are examples of the second approach. These systems not only differ in the way how they process stream, but also how they ensure fault-tolerance which is one of the most important aspects of large scale distributed system.
+Now we know what the stream looks like and how we ensure that the data in the stream is successfully processed. We will now talk about the processors that consume the data stream. There are two main approaches to processing a data stream. The first approach is the continuous queries model, similar to TelegraphCQ, where the queries keep running and the arrival of data initiates the processing. Another approach is micro-batching, where the streaming computation becomes a series of stateless, deterministic batch computations over batches of the stream, and a timer triggers the processing of each batch. We will discuss Apache Storm as one example of the first design, while Spark Streaming, Naiad and Google Dataflow are examples of the second approach. These systems differ not only in the way they process the stream, but also in how they ensure fault-tolerance, which is one of the most important aspects of a large scale distributed system.
### a) Continuous queries (operators) on each tuple
@@ -82,17 +92,17 @@ builder.setBolt("exclaim2", new ExclamationBolt(), 5)
```
<figure class="fullwidth">
- <img src="{{ site.baseurl }}/chapter/9/Topology" alt="The topology created by the example code" />
+ <img src="{{ site.baseurl }}/chapter/9/Topology.jpg" alt="The topology created by the example code" />
</figure>
-Here is an simple topology which contains a spout and two bolts, where the spout emits words and each bolt would append exclamation '!' to its input. The nodes are arranged as shown in the graph. For example if the bolt emits the tuple ["Hi"], if it travels from exclaim1 to exclaim2, then exclaim2 would emit the words ["Hi!!"].
+Here is how we can build a simple topology which contains a spout and two bolts, where the spout emits words and each bolt appends an exclamation mark '!' to its input. The nodes are arranged as shown in the graph. For example, if the spout emits the tuple ["Hi"] and it travels through exclaim1 to exclaim2, then exclaim2 would emit the word ["Hi!!"]. Since all the work is distributed, any given vertex is not necessarily running on a single machine; instead, it can be spread over different workers in the cluster. The parameters 10, 3 and 5 in the example code specify the amount of parallelism the user wants.
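For reference, here is a sketch of what the `ExclamationBolt` used in the topology above might look like against Storm's Java bolt API (Storm 2.x package names and method signatures assumed). The chapter does not show this class, so treat it as an illustration of the emit/ack pattern rather than the original code.

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ExclamationBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Append '!' to the incoming word and emit it downstream, anchoring the new
        // tuple to its parent so Storm can track it as part of the tuple DAG.
        collector.emit(tuple, new Values(tuple.getString(0) + "!"));
        collector.ack(tuple); // mark this hop as done so the spout is not asked to replay it
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

Anchoring the emitted tuple to its input is what lets Storm build the per-tuple DAG discussed below; if a tuple in the DAG fails or is never acked, the original tuple is replayed from the spout.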
*Storm* also provides different *stream grouping* schemes for users to determine which vertex should consume the output stream of a given vertex. The grouping method can be shuffle grouping, as shown in our example, where the tuples of the output stream are randomly distributed across the bolt's consumers such that each consumer is guaranteed to get an equal number of tuples. Another example is fields grouping, where the tuples of the stream are partitioned by the fields specified in the grouping: tuples with the same value in that field always go to the same bolt. A natural question to ask here is what happens if something goes wrong, for example if a single tuple gets lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally; the reason might be that there would be too much state to maintain if it needed to construct such a queue for every edge. Instead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG records how the original tuple is split among different workers. *Storm* uses the DAG to track each tuple; if the tuple fails to be processed, the system retries the tuple from the spout again.
<figure class="fullwidth">
- <img src="{{ site.baseurl }}/chapter/9/DAG" alt="The simple tuple DAG" />
+ <img src="{{ site.baseurl }}/chapter/9/DAG.jpg" alt="The simple tuple DAG" />
</figure>
There might be two concerns here. The first is how *Storm* can track every DAG efficiently and scalably: would it actually use more resources than just maintaining the queues? The second concern is that starting all the way from the spout again, instead of from an intermediate queue, seems like taking a step backwards. For the first concern, *Storm* actually uses a very efficient algorithm to track the DAG of each tuple; it takes at most 20 bytes for any tuple, even if the DAG contains trillions of tuples. For the second concern, if we look at the guarantees provided by both techniques, tracking the DAG and keeping intermediate queues, they are actually the same. They both guarantee that each tuple is processed at least once, so there is no fundamental difference between them.
@@ -134,7 +144,7 @@ In conclusion, *Spark streaming* handles the slow recovery and straggler issue b
The *timely dataflow*, like the topology described in *Storm*, contains stateful vertices that represent the nodes that compute on the stream. Each graph contains input vertices and output vertices, which are responsible for consuming or producing messages from external sources. Every message being exchanged is associated with a timestamp called an epoch; the external source is responsible for providing this epoch and for notifying the input vertices of the end of each epoch. The notion of epoch is powerful since it allows the producer to arbitrarily determine the start and the end of each batch by assigning different epoch numbers to tuples. For example, epochs can be divided by time, as in *Spark Streaming*, or by the start of some event.
<figure class="fullwidth">
- <img src="{{ site.baseurl }}/chapter/9/TimelyD" alt="A simple Timely Dataflow" />
+ <img src="{{ site.baseurl }}/chapter/9/TimelyD.jpg" alt="A simple Timely Dataflow" />
</figure>
@@ -174,14 +184,14 @@ The next question is then how does *Google Dataflow* knows when to emit the resu
In addition to controlling when results can be emitted, the system also provides a way to control how successive results of a window relate to each other. The results can be *discarding*: the contents are discarded once the trigger fires, which makes data storage more efficient, since once the results are consumed we can clear them from the buffers. The results can also be *accumulating*: once the trigger fires, the contents are left intact and stored in persistent state, and later results become refinements of previous results. This mode is useful when the downstream consumers are expected to overwrite the old result once a new one arrives; for example, we might want to write the view count of a certain movie from the streaming pipeline with low latency, and refine the count at the end of the day by running a slower batch process on the aggregated data. The last mode is *accumulating & retracting*, where in addition to the *accumulating* semantics, a copy of the emitted value is also stored in persistent state. When the window triggers again in the future, a retraction of the previous value is emitted first, followed by the new value; this is useful when the results from the previous processing and the later one need to be combined. For example, suppose one process is counting the number of views during a certain period, and a user went offline during the window and came back after the window had ended, when the count *c* had already been emitted; the process now needs to retract the previous result *c* and indicate that the correct number is *c+1*.
```java
-Pcollection<KV<String, Integer>> output = input
+PCollection<KV<String, Integer>> output = input
  .apply(Window.trigger(Repeat(AtPeriod(1, MINUTE)))
               .accumulating())
  .apply(Sum.integersPerKey());
```
-The above example code shows how to apply a trigger that repeatedly fires on one-minute periodically. The *accumulating* mode is also specified so that the *Sum* can be refined overtime.
+The above example code shows how to apply a trigger that repeatedly fires with a one-minute period. The *accumulating* mode is also specified so that the *Sum* can be refined over time.
*Google Dataflow* also relies on MillWheel{% cite akidau2013millwheel --file streaming %} as the underlying execution engine to achieve exactly-once delivery of the tuples. MillWheel is a framework for building low-latency data-processing applications used at Google. It achieves exactly-once delivery by first checking incoming records and discarding duplicated ones, then holding its productions (i.e., the records it produces to any stream) as pending until the sender is acknowledged; only then are the pending productions sent.
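The snippet above uses the older Google Dataflow SDK syntax. As a rough sketch, a similar repeatedly-firing, accumulating per-key sum could be expressed with the current Apache Beam Java SDK (Dataflow's open-source descendant); the class and method names here are ours, and a processing-time trigger stands in for the original `AtPeriod`, so this is an illustration of the trigger and accumulation modes rather than the chapter's own code.

```java
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class RepeatedSum {
    // `input` is a keyed stream of counts handed in from the rest of the pipeline.
    static PCollection<KV<String, Integer>> sumPerKey(PCollection<KV<String, Integer>> input) {
        return input
            .apply(Window.<KV<String, Integer>>configure()
                // Fire repeatedly, about once per minute of processing time after the first element.
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
                // Accumulating panes: each firing refines the previously emitted sum.
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
            .apply(Sum.integersPerKey());
    }
}
```

Switching `accumulatingFiredPanes()` to `discardingFiredPanes()` would give the *discarding* behaviour described above, where each firing emits only the contributions that arrived since the previous firing.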
