| author | Fangfan Li <fangfanli28@gmail.com> | 2016-12-16 15:38:39 -0500 |
|---|---|---|
| committer | Fangfan Li <fangfanli28@gmail.com> | 2016-12-16 15:38:39 -0500 |
| commit | 3596406610b1ec2040e16b9c96da4f263c34a0fd (patch) | |
| tree | fa3f6cf3cd87917c221bc32cf93b854d3dddf1fb /chapter/9/streaming.md | |
| parent | dc95aed27e9d4234d1d2796c1b429ab8842ba42e (diff) | |
Flink, Alibaba
Diffstat (limited to 'chapter/9/streaming.md')
| -rw-r--r-- | chapter/9/streaming.md | 36 |
1 file changed, 32 insertions, 4 deletions
diff --git a/chapter/9/streaming.md b/chapter/9/streaming.md
index d030a2b..751f1f0 100644
--- a/chapter/9/streaming.md
+++ b/chapter/9/streaming.md
@@ -23,7 +23,11 @@ TelegraphCQ shows the importance of modeling data as stream and how can we proce
 Beyond TelegraphCQ, there are systems that were built for continuously querying large-scale streaming data. For example, PipelineDB{% cite pipelinedb --file streaming %} is a system that was designed to run SQL queries continuously on streaming data, where the output of those continuous queries is stored in regular tables which can be queried like any other table. PipelineDB can reduce the cardinality of its input streams by performing different filtering or aggregation operations on a stream once the continuous queries read the raw data, and only the needed information is then persisted to disk (i.e., the raw data is then discarded). By doing this, PipelineDB can process large volumes of data very efficiently using a relatively small number of resources.
 
-As we described before, stream processing is not only query processing, and there are many systems that can perform event-based stream processing in large scale which we would discuss in detail in section 3.
+As we described before, stream processing is not only query processing. Apache Flink {% cite apacheflink --file streaming %} is a system that supports both event-based processing and query processing. Each program in Flink is a streaming dataflow consisting of streams and transformation operators; the data in a streaming dataflow can come from multiple sources (i.e., producers) and travel to one or more sinks (i.e., consumers). The data is transformed as it travels through the operators, where the computations happen. In order to distribute the work, streams are split into stream partitions and operators are split into operator subtasks in Flink, where each subtask executes independently.
+
+What is event-based processing in Flink then? Unlike batch processing, aggregating events is more subtle in stream processing; for example, we cannot simply count the elements in a stream since it is generally unbounded. Instead, Flink enables event-based processing with the notion of time and windows; for example, we can specify something like 'count over a 5 minute window'. Besides time-based windows, Flink also supports count windows, where such an event would be 'do something when the 100th element arrives'. Flink has different notions of time, such as event time, which is when an event was created, and processing time, which is when the operator performs a time-based operation. These times are used internally to keep the order and state for each event and are also used by the windowing logic. The flexible streaming windows can then be translated into flexible triggering conditions, which makes event-based processing possible in Flink.
+
+We have only very briefly introduced PipelineDB and Apache Flink here; there are many other systems that can perform stream processing at large scale, and we will look into a few examples in detail in section 3.
 
 ## How to represent data stream
 
@@ -99,7 +103,7 @@ Here is how we can build a simple topology which contains a spout and two bolts,
 Since all the work is distributed, any given vertex is not necessarily running on a single machine; instead it can be spread across different workers in the cluster. The parameters 10, 3 and 5 in the example code actually specify the amount of parallelism the user wants.
 
 *Storm* also provides different *stream grouping* schemes for users to determine which vertex should be consuming the output stream from a given vertex. The grouping method can be shuffle grouping as shown in our example, where the tuples from the output stream will be randomly distributed across this bolt's consumers in a way such that each consumer is guaranteed to get an equal number of tuples. Another example would be fields grouping, where the tuples of the stream are partitioned by the fields specified in the grouping; tuples with the same value in that field always go to the same bolt.
 
-A natural question to ask here is what if something goes wrong, for example a single tuple gets lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally; the reason might be that there would be too many states to maintain if it needed to construct such a queue for every edge. Instead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG contains the information of this tuple, such as how the original tuple is splitted among different workers. *Storm* uses the DAG to track each tuple; if the tuple fails to be processed, the system retries the tuple from the spout again.
+A natural question to ask here is what if something goes wrong, for example a single tuple gets lost. One might think that *Storm* maintains a queue similar to what we discussed before to ensure that every tuple is processed at least once. In fact, *Storm* does not keep such queues internally; the reason might be that there would be too many states to maintain if it needed to construct such a queue for every edge. Instead, *Storm* maintains a directed acyclic graph (DAG) for every single tuple, where each DAG contains the information of this tuple, such as how the original tuple is split among different workers. *Storm* uses the DAG to track each tuple; if the tuple fails to be processed, the system retries the tuple from the spout again.
 
 <figure class="fullwidth">
   <img src="{{ site.baseurl }}/chapter/9/DAG.jpg" alt="The simple tuple DAG" />
@@ -135,7 +139,23 @@ wordCounts.print()
 Let's look at an example of how we can count the words received from a TCP socket with *Spark streaming*. We first set the processing interval to be 1 second, and we create a *D-stream* called lines that represents the streaming data received from the specific TCP socket. Then we split the lines by space into words; the stream of words is now represented as the words *D-stream*. The words stream is further mapped to a *D-stream* of pairs, which is then reduced to count the number of words in each batch of data.
 
-In conclusion, *Spark streaming* handles the slow recovery and straggler issue by dividing the stream into small batches over small time intervals and using RDDs to keep track of how the result of a certain batched stream is computed. This model makes handling recovery and stragglers easier because the computation can be run in parallel by re-computing the result, while RDDs make the process fast.
+*Spark streaming* handles the slow recovery and straggler issue by dividing the stream into small batches over small time intervals and using RDDs to keep track of how the result of a certain batched stream is computed. This model makes handling recovery and stragglers easier because the computation can be run in parallel by re-computing the result, while RDDs make the process fast.
+
+**Structured Streaming** Besides *Spark streaming*, Apache Spark recently added a new higher-level API, *Structured Streaming*{% cite structuredstreaming --file streaming %}, which is also built on top of the notion of RDDs while making a strong guarantee that the output of the application is equivalent to executing a batch job on a prefix of the data at any time, a property also known as *prefix integrity*. *Structured Streaming* makes sure that the output tables are always consistent with all the records in a prefix of the data; thus out-of-order data is easy to identify and can simply be used to update its respective row in the table. *Structured Streaming* provides a simple API where users can specify the query as if the input were a static table, and the system automatically converts this query into a stream processing job.
+
+```scala
+// Read data continuously from an S3 location
+val inputDF = spark.readStream.json("s3://logs")
+
+// Do operations using the standard DataFrame API and write to MySQL
+inputDF.groupBy($"action", window($"time", "1 hour")).count()
+  .writeStream.format("jdbc")
+  .start("jdbc:mysql://...")
+
+```
+The programming model of *Structured Streaming* views the latest data as newly appended rows in an unbounded table; every trigger interval, new rows are added to the existing table, which eventually updates the output table. Event time then becomes natural in this view, since each event from a producer is a row and the event time is just a column value in that row, which makes window-based aggregations simply a grouping on the event-time column.
+
+Unlike other systems, where users have to specify how to aggregate the records when producing output, *Structured Streaming* takes care of updating the result table when there is new data; users can then just specify different output modes to decide what gets written to external storage. For example, in Complete Mode the entire updated result table is written to external storage, while in Update Mode only the rows that were updated in the result table are written out.
 
 - Naiad
 
@@ -202,7 +222,15 @@ In conclusion, one of the most important core principles that drives *Google Dat
 Till now we have talked about what stream processing is and the different models/systems built for this purpose. As shown before, the systems vary in how they view a stream; for example, *Storm* can perform operations at the level of each tuple, while *Spark streaming* groups tuples into micro-batches and then processes at the level of a batch. They also differ in how they deal with failures: *Storm* can replay a tuple from the spout, while *Naiad* keeps checkpointing. Then we introduced *Google Dataflow*, which seems to be the most powerful tool so far in allowing users to express how to group and control the tuples in the stream.
 
-Despite all the differences among them, they all started with more or less the same goal: to be *the* stream processing system that would be used by companies, and we showed several examples of why companies might need such a system. In this section, we will discuss two companies that use a stream processing system as the core of their business: Twitter and Spotify.
+Despite all the differences among them, they all started with more or less the same goal: to be *the* stream processing system that would be used by companies, and we showed several examples of why companies might need such a system. In this section, we will discuss three companies that use a stream processing system as the core of their business: Alibaba, Twitter and Spotify.
+
+## Alibaba
+Alibaba is the largest e-commerce retailer in the world, with annual sales in 2015 greater than those of eBay and Amazon combined. Alibaba search is its personalized search and recommendation platform, which uses Apache Flink to power its critical aspects{% cite alibabaflink --file streaming %}.
+
+The processing engine of Alibaba search runs two different pipelines: a batch pipeline and a streaming pipeline, where the former processes all data sources while the latter processes updates that occur after the batch job is finished. As we can see, the second pipeline is an example of stream processing. One example application for the streaming pipeline is the online machine learning recommendation system. There are special days of the year (e.g., Singles' Day, which is like Black Friday in the U.S.) when the transaction volume is huge and the previously-trained model would not correctly reflect current trends; thus Alibaba needs a streaming job to take the real-time data into account. There are many reasons why Alibaba chose Flink; for example, Flink is general enough to express both the batch pipeline and the streaming pipeline. Another reason is that changes to the products must be reflected in the final search result, so at-least-once semantics is needed, while other products at Alibaba might need exactly-once semantics, and Flink provides both.
+
+Alibaba developed a forked version of Flink called Blink to fit some of the unique requirements at Alibaba. One important improvement is a more robust integration with YARN{% cite hadoopyarn --file streaming %}, where YARN is used as the global resource manager for Flink. YARN requires a Flink job to grab all required resources up front and cannot acquire or release resources dynamically. As Alibaba's search engine currently runs on over 1000 machines, better resource utilization is critical. Blink improves on this by letting each job have its own JobMaster to request and release resources as the job requires, which optimizes resource usage.
+
 ## Twitter
 Twitter is one of the 'go-to' examples that people would think of when considering large-scale stream processing systems, since it has a huge amount of data that needs to be processed in real time. Twitter bought the company that created *Storm* and used *Storm* as its real-time analysis tool for several years {% cite toshniwal2014storm --file streaming %}. However, as the data volume and the complexity of the use cases increased, Twitter needed to build a new real-time stream data processing system, as *Storm* could no longer satisfy the new requirements. We will talk about how *Storm* was used at Twitter and then the system that they built to replace *Storm*: *Heron*.
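The windowed, event-based processing that the patch describes for Flink corresponds to keyed window operators in Flink's DataStream API. The following is a minimal sketch, assuming Flink's Scala API and a hypothetical socket source on localhost:9999; it illustrates the 'count over a 5 minute window' idea mentioned above and is not code from the chapter.

```scala
// Sketch: a keyed, time-windowed count over a stream (Flink Scala DataStream API).
// The socket source and its host/port are illustrative assumptions.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowedCountSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts = env
      .socketTextStream("localhost", 9999)        // source (producer)
      .flatMap(_.toLowerCase.split("\\s+"))       // transformation operators
      .map(word => (word, 1))
      .keyBy(0)                                   // split the stream into partitions by word
      .timeWindow(Time.minutes(5))                // 'count over a 5 minute window'
      // .countWindow(100)                        // alternative: fire when the 100th element arrives
      .sum(1)

    counts.print()                                // sink (consumer)
    env.execute("Windowed count sketch")
  }
}
```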
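For the *Storm* stream groupings discussed in the patch, the grouping is chosen when a bolt subscribes to another component in the topology builder. Below is a minimal sketch that calls Storm's Java API from Scala; `SentenceSpout`, `SplitBolt` and `CountBolt` are hypothetical components standing in for the chapter's example, and the parallelism hints mirror the 10, 3 and 5 mentioned there.

```scala
// Sketch: wiring shuffle and fields groupings in a Storm topology.
// SentenceSpout, SplitBolt and CountBolt are hypothetical placeholder components.
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

val builder = new TopologyBuilder()
builder.setSpout("sentences", new SentenceSpout(), 10)

// Shuffle grouping: tuples are distributed randomly but evenly across the 3 SplitBolt tasks.
builder.setBolt("split", new SplitBolt(), 3).shuffleGrouping("sentences")

// Fields grouping: tuples with the same "word" value always go to the same CountBolt task.
builder.setBolt("count", new CountBolt(), 5).fieldsGrouping("split", new Fields("word"))
```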
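The Complete and Update output modes described for *Structured Streaming* are selected on the write side of a query. A minimal sketch, assuming a streaming DataFrame `counts` produced by a groupBy/window aggregation like the one in the patch, with the console sink used purely for illustration:

```scala
// Sketch: selecting an output mode for a Structured Streaming query.
// `counts` is assumed to be a streaming DataFrame from a windowed aggregation.
val query = counts.writeStream
  .outputMode("complete")    // Complete Mode: write the entire updated result table every trigger
  // .outputMode("update")   // Update Mode: write only the rows that changed since the last trigger
  .format("console")         // illustrative sink
  .start()

query.awaitTermination()
```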
