## 1 Programming Models
### 1.1 Data parallelism
*Data parallelism* is, given a dataset, the simultaneous execution on multiple machines or threads of the same function across groups of elements of that dataset. Data parallelism can also be thought of as a subset of SIMD ("single instruction, multiple data") execution, a class of parallel execution in Flynn's taxonomy. By comparison, a sequential computation looks like *"for all elements in the dataset, do operation A"* over a single big dataset, whose size can reach terabytes or petabytes. The challenges in parallelizing this sequential computation include how to abstract the different types of computations in a simple and correct way, how to distribute the data to hundreds or thousands of machines, and how to schedule tasks and handle failures.
<figure class="main-container">
<img src="{{ site.baseurl }}/resources/img/data-parallelism.png" alt="Data Parallelism" />
</figure>
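As a minimal illustration, the same per-element operation can be run sequentially or fanned out across threads. The sketch below uses Scala parallel collections (bundled with the standard library up to Scala 2.12; a separate module in later versions):

```scala
object DataParallelism {
  def main(args: Array[String]): Unit = {
    val dataset = (1 to 1000000).toVector

    // Sequential: "for all elements in the dataset, do operation A".
    val sequential = dataset.map(x => x * x)

    // Data-parallel: the same operation A, applied simultaneously to
    // groups of elements by a pool of worker threads.
    val parallel = dataset.par.map(x => x * x)

    assert(sequential == parallel.seq)
  }
}
```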
**MapReduce** {% cite dean2008mapreduce --file big-data %} is a programming model proposed by Google, initially to satisfy its demand for large-scale indexing in the web-search service. It provides a simple user programming interface, the *map* and *reduce* functions, and automatically handles the parallelization and distribution. The underlying execution system provides fault tolerance and scheduling.
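The shape of that interface can be sketched on a single machine (a toy model in Scala, not Hadoop's actual API): the user supplies only the two functions, and grouping by key stands in for the shuffle that the real runtime performs across machines.

```scala
// Toy, single-machine model of the MapReduce interface.
object ToyMapReduce {
  def mapReduce[K, V](
      docs: Seq[String],
      mapFn: String => Seq[(K, V)],
      reduceFn: (K, Seq[V]) => (K, V)): Map[K, V] =
    docs.flatMap(mapFn)                                    // map phase
      .groupBy(_._1)                                       // "shuffle": group by key
      .map { case (k, kvs) => reduceFn(k, kvs.map(_._2)) } // reduce phase

  def main(args: Array[String]): Unit = {
    val counts = mapReduce[String, Int](
      Seq("the quick brown fox", "the lazy dog"),
      doc => doc.split(" ").toSeq.map(w => (w, 1)),  // emit (word, 1) per word
      (word, ones) => (word, ones.sum))              // sum the ones per word
    println(counts)  // Map(the -> 2, quick -> 1, ...)
  }
}
```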
The MapReduce model is simple and powerful, and it quickly became very popular among developers. However, when developers start writing real-world applications, they often end up writing a lot of boilerplate and chaining many MapReduce stages together. Moreover, pipelining MapReduce jobs forces them to write additional coordination code; i.e., the development style goes backward from a simple logical computation abstraction to lower-level coordination management. As we will discuss in *Section 2, execution models*, MapReduce writes all intermediate data to disk after each stage, which causes severe delays. Programmers need to optimize manually to hit performance targets, and this again requires them to understand the underlying execution model. The whole process soon becomes cumbersome. The **FlumeJava** {% cite chambers2010flumejava --file big-data %} library intends to support the development of data-parallel pipelines by abstracting away the complexity involved in data representation and handling the optimizations implicitly. It defers evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MapReduce primitives. The optimized execution is comparable to hand-optimized pipelines, so there is little need to write raw MapReduce programs directly.
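The core ideas of deferred evaluation and plan optimization can be sketched in a few lines of Scala. This is a toy model, not FlumeJava's API: each operation merely records a node in a plan, and the optimizer fuses adjacent map-like stages into a single pass, in the spirit of FlumeJava's ParallelDo producer-consumer fusion, before anything executes.

```scala
// Toy deferred pipeline: building it only records a plan; nothing runs
// until run() is called, after adjacent map stages have been fused.
sealed trait Plan[A]
case class Source[A](data: Seq[A]) extends Plan[A]
case class ParallelDo[A, B](input: Plan[A], f: A => B) extends Plan[B]

object ToyFlume {
  // Fuse ParallelDo(ParallelDo(p, f), g) into ParallelDo(p, f andThen g),
  // so two logical stages execute as one pass over the data.
  def optimize[A](plan: Plan[A]): Plan[A] = plan match {
    case ParallelDo(ParallelDo(p, f), g) => optimize(ParallelDo(p, f.andThen(g)))
    case other                           => other
  }

  def run[A](plan: Plan[A]): Seq[A] = optimize(plan) match {
    case Source(data)     => data
    case ParallelDo(p, f) => run(p).map(f)
  }

  def main(args: Array[String]): Unit = {
    val plan = ParallelDo(ParallelDo(Source(Seq(1, 2, 3)), (x: Int) => x + 1),
                          (x: Int) => x * 2)
    println(run(plan))  // List(4, 6, 8): two logical stages, one fused pass
  }
}
```

A real FlumeJava plan has many more node types (GroupByKey, Flatten, Combine) and must also decide how to map the fused stages onto MapReduce jobs.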
After MapReduce, Microsoft proposed its own data-parallelism model, **Dryad** {% cite isard2007dryad --file big-data %}, which abstracts individual computational tasks as vertices and constructs a communication graph between those vertices. All programmers need to do is describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One advantage of Dryad over MapReduce is that Dryad vertices can process an arbitrary number of inputs and outputs, while MapReduce only supports a single input and a single output for each vertex. Besides this flexibility of computation, Dryad also supports different types of communication channels: files, TCP pipes, and shared-memory FIFOs. The programming model is less elegant than MapReduce's, and programmers are not meant to interact with the vertices directly. Instead, they are expected to use the high-level programming interface DryadLINQ {% cite yu2008dryadlinq --file big-data %}, which is more expressive and embeds well in the .NET framework.
Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g., iterative machine-learning algorithms. **Spark** {% cite zaharia2010spark --file big-data %} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce's model and builds upon the DAG construction and lazy evaluation ideas of DryadLINQ. Instead of writing data to disk for each job as MapReduce does, Spark can cache results across jobs. Spark explicitly caches computational data in memory through a specialized immutable data structure named Resilient Distributed Datasets (RDDs) and reuses the same dataset across multiple parallel operations. Spark builds fault tolerance upon RDDs by replaying the lineage information of a lost RDD, which incurs less overhead than the checkpoint-based fault tolerance of distributed shared memory systems. Moreover, Spark is the underlying framework upon which many very different systems are built, e.g., Spark SQL & DataFrames, GraphX, and Spark Streaming, which makes it easy to mix and match the use of these systems in the same application. These features make Spark a great fit for iterative jobs and interactive analytics, and also help it provide better performance.
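The log-mining example from the Spark paper illustrates the model (a sketch; the input path is a placeholder): transformations only record lineage, `cache()` pins the derived RDD in memory, and later actions reuse it instead of rereading from disk.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LogMining {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-mining"))

    // Transformations only record lineage; no data is read yet.
    val lines  = sc.textFile("hdfs://...")           // placeholder path
    val errors = lines.filter(_.startsWith("ERROR"))
    errors.cache()                                   // keep the RDD in memory after first use

    // The first action materializes `errors` and caches it.
    val total = errors.count()
    // Later actions reuse the cached partitions; lost ones are rebuilt from lineage.
    val mysql = errors.filter(_.contains("MySQL")).count()

    println(s"$total errors, $mysql mention MySQL")
    sc.stop()
  }
}
```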
MapReduce runs on hundreds or thousands of unreliable commodity machines, so the library must provide fault tolerance. The library assumes that the master node will not fail, and the master monitors worker failures. If no status update is received from a worker before a timeout, the master marks it as failed, and it may then reschedule the associated task to other workers depending on the task's type and status. The commits of *map* and *reduce* task outputs are atomic: an in-progress task writes data into private temporary files; once the task succeeds, it negotiates with the master and renames the files to complete the task. In the case of failure, the worker discards those temporary files. This guarantees that if the computation is deterministic, the distributed implementation produces the same output as a non-faulting sequential execution.
*Limitations*
Many analytics workloads like K-means and logistic regression, and graph-processing applications like PageRank and shortest paths via parallel breadth-first search, require multiple stages of MapReduce jobs. In a regular MapReduce framework like Hadoop, this requires the developer to manually handle the iterations in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back again at stage T+1, causing a performance bottleneck: network bandwidth and CPU resources are wasted, and above all the disk I/O operations are inherently slow. To address such challenges in iterative workloads on MapReduce, frameworks like HaLoop {% cite bu2010haloop --file big-data %}, Twister {% cite ekanayake2010twister --file big-data %}, and iMapReduce {% cite zhang2012imapreduce --file big-data %} adopt special techniques like caching the data between iterations and keeping the mappers and reducers alive across iterations.
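The manual driver pattern described above looks roughly like the following sketch, where `runMapReduceJob` is a hypothetical stand-in for configuring and submitting one Hadoop job and the paths are placeholders; the point is the per-iteration disk round trip.

```scala
// Sketch of a manual iteration driver over a MapReduce-style engine.
// `runMapReduceJob` is a hypothetical stand-in for configuring and
// submitting one Hadoop job.
def runMapReduceJob(inputPath: String, outputPath: String): Unit = ???

def iterativeDriver(iterations: Int): Unit = {
  var input = "hdfs://data/iter-0"   // placeholder path
  for (t <- 1 to iterations) {
    val output = s"hdfs://data/iter-$t"
    // Stage T writes its whole result to HDFS ...
    runMapReduceJob(input, output)
    // ... and stage T+1 must read it all back: the I/O that HaLoop,
    // Twister, and iMapReduce avoid by caching loop-invariant data.
    input = output
  }
}
```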
### 1.1.2 FlumeJava
RDDs by default are discarded after use. However, Spark provides two explicit operations, `persist` and `cache`, to keep them around.
*Why RDDs over Distributed Shared Memory (DSM)?*

RDDs are immutable and can only be created through coarse-grained transformations, while DSM allows fine-grained read and write operations to each memory location. Since RDDs are immutable, they do not require checkpointing at all and can be re-derived from their lineage; hence RDDs do not incur the checkpointing overhead that is present in DSM.
Also, in DSM, any failure requires the whole program to be restored. In the case of RDDs, only the lost RDD partitions need to be recovered, and this recovery happens in parallel on the affected nodes.
RDDs are immutable, and hence a straggler (slow node) can be replaced with a backup copy, as in MapReduce. This is hard to implement in DSM, as the two copies point to the same memory locations and can interfere with each other's updates.
- `Debugging and profiling`: Debugging tools are not available, and developers find it hard to tell whether a computation is concentrated on a single machine or whether the data structures they used were inefficient.
### 1.2 Querying: declarative interfaces
MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, etc. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and two-stage data flow. Several important patterns, like equi-joins and theta-joins {% cite okcan2011processing --file big-data %}, can be highly complex depending on the data and require programmers to implement them by hand. Hence, MapReduce lacks many such high-level abstractions, requiring programmers to be well versed in design patterns like map-side joins, reduce-side equi-joins, etc. Also, Java-based MapReduce code (as in the Hadoop framework) can become repetitive when the programmer wants to implement common operations like projection, filtering, etc. A simple word-count program, as shown below, can span up to 63 lines.
*Complete code for word count in Hadoop (the Java-based implementation of MapReduce)*
```java
import java.io.IOException;

public class WordCount {
    // ... the remaining ~60 lines of mapper, reducer, and driver
    // boilerplate are elided here ...
}
```
*Why SQL over MapReduce?*

SQL already provides several operations, like join, group-by, and sort, which can be mapped to the above-mentioned MapReduce operations. Also, by leveraging an SQL-like interface, it becomes easy for non-experts and non-programmers such as data scientists to focus more on logic than on hand-coding complex operations {% cite scaling-spark-in-real-world --file big-data %}. Such a high-level declarative language can easily express their task while leaving all of the execution and optimization details to the backend engine.
SQL also lessens the amount of code (code examples can be seen in each model's section) and significantly reduces development time.
Most importantly, as you will read further in this section, frameworks like Pig, Hive, and Spark SQL take advantage of these declarative queries by realizing them as a DAG on which the compiler can apply a transformation whenever an optimization rule is satisfied. Spark, which unlike MapReduce does provide a high-level abstraction, lacks this very optimization, resulting in several human errors, as discussed in Spark's data-parallelism section.
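To make the contrast with the 63-line Hadoop program above concrete, here is the same word count expressed declaratively through Spark SQL (a sketch; the input path is a placeholder). How the grouping executes is left to the query planner, not the programmer.

```scala
import org.apache.spark.sql.SparkSession

object SqlWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sql-word-count").getOrCreate()
    import spark.implicits._

    // One row per word, registered as a table for SQL access.
    spark.read.textFile("hdfs://...")   // placeholder path
      .flatMap(_.split("\\s+"))
      .toDF("word")
      .createOrReplaceTempView("words")

    // The declarative query; the grouping strategy is left to the optimizer.
    spark.sql("SELECT word, COUNT(*) AS n FROM words GROUP BY word").show()

    spark.stop()
  }
}
```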
Sawzall {% cite pike2005interpreting --file big-data %} is a programming language built on top of MapReduce. It consists of a *filter* phase (map) and an *aggregation* phase (reduce). A user program only needs to specify the filter function and emit the intermediate pairs to external pre-built aggregators. This largely eliminates the trouble programmers put into writing reducers: as the following example shows, programmers can use the built-in aggregators to do the reducing job. Data serialization uses Google's *protocol buffers*, which can produce a *metadata* file for the declared schema, but the schema is not used for any optimization purpose per se. Sawzall is good for most straightforward processing on large datasets, but it does not support more complex yet still common operations like *join*. The pre-built aggregators are limited, and it is non-trivial to add more.
The following subsections discuss Hive, Pig Latin, and SparkSQL in detail.
### 1.2.1 Hive/HiveQL
Hive {% cite thusoo2010hive --file big-data %} is a data-warehousing infrastructure built on top of the MapReduce framework Hadoop. The primary responsibility of Hive is to provide data summarization, query, and analysis. It supports the analysis of large datasets stored in Hadoop's HDFS {% cite shvachko2010hadoop --file big-data %}, offering SQL-like access to structured data, known as HiveQL (or HQL), as well as big-data analysis with the help of MapReduce. HiveQL queries are compiled into MapReduce jobs that are executed on Hadoop. This drastically brings down the development time for writing and maintaining Hadoop jobs.
Data in Hive is organized into three different formats: tables, partitions, and buckets.
### 1.3 Large-scale parallelism on graphs
MapReduce doesn't scale easily for iterative and graph algorithms like PageRank or machine-learning algorithms. Iterative algorithms require the programmer to explicitly handle the intermediate results (writing them to disk), resulting in a lot of boilerplate code. Every iteration then requires reading the input from disk and writing results back to disk, and the resulting disk I/O is a performance bottleneck for any batch-processing system.
Also, graph algorithms require an exchange of messages between vertices. In the case of PageRank, every vertex requires the contributions from all its adjacent nodes to calculate its score. MapReduce lacks this message-passing model, which makes graph algorithms complex to reason about. One model that is commonly employed for implementing distributed graph processing is the graph-parallel model.
In the graph-parallel abstraction, a user-defined vertex program is instantiated concurrently for each vertex and interacts with adjacent vertex programs through messages or shared state. Each vertex program can read and modify its vertex property and, in some cases, adjacent vertex properties. When all vertex programs vote to halt, the program terminates. The bulk-synchronous parallel (BSP) model {% cite valiant1990bridging --file big-data %} is one of the most commonly used graph-parallel models.
BSP was introduced in the 1980s to represent the hardware design features of parallel computers. It gained popularity as an alternative to MapReduce, since it addressed the above-mentioned issues with MapReduce.
The BSP model is a synchronous message-passing model where:
- Computation consists of several steps, called supersteps.
- At every superstep, a processor receives input at the beginning, performs computation, and produces output at the end.
- A processor at superstep S can send a message to another processor at superstep S+1, and can likewise receive a message from superstep S-1.
- Barrier synchronization syncs all the processors at the end of every superstep.
- A notable feature of the model is the complete control of data through communication between processors at every superstep. Though similar to the MapReduce model, BSP preserves data in memory across supersteps, which helps in reasoning about iterative graph algorithms (a toy sketch of supersteps follows this list).
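Here is the promised toy sketch of supersteps (plain Scala, single process; a real BSP runtime distributes the processors across machines): each superstep consumes the messages sent during the previous one, computes, and emits messages for the next, with the end of the loop body acting as the barrier. Every node simply propagates the largest value it has seen.

```scala
// Toy BSP: nodes hold values; edges carry messages between supersteps.
object ToyBsp {
  def main(args: Array[String]): Unit = {
    val neighbors = Map(1 -> Seq(2), 2 -> Seq(1, 3), 3 -> Seq(2))
    var value     = Map(1 -> 3, 2 -> 6, 3 -> 2)

    // Deliver each node's value to its neighbors: messages sent at
    // superstep S become input at superstep S+1.
    def send(vals: Map[Int, Int]): Map[Int, Seq[Int]] =
      neighbors.toSeq
        .flatMap { case (v, ns) => ns.map(n => (n, vals(v))) }
        .groupBy(_._1)
        .map { case (n, ms) => (n, ms.map(_._2)) }

    var inbox  = send(value)
    var halted = false
    while (!halted) {
      // Superstep: receive input at the beginning, compute, output at the end.
      val next = value.map { case (v, x) => (v, (x +: inbox.getOrElse(v, Seq())).max) }
      halted = next == value  // every node "votes to halt" when nothing changed
      inbox  = send(next)     // barrier: all sends complete before the next superstep
      value  = next
    }
    println(value)            // Map(1 -> 6, 2 -> 6, 3 -> 6)
  }
}
```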
Graph-parallel abstractions allow users to succinctly describe graph algorithms and provide a runtime engine to execute these algorithms in a distributed fashion. They simplify the design, implementation, and application of sophisticated graph algorithms to large-scale, real-world problems. Each of these frameworks presents a different view of graph computation, tailored to an originating domain or family of graph algorithms. However, these frameworks fail to address the problems of data preprocessing and graph construction, favor snapshot recovery over fault tolerance, and lack support from distributed data-flow frameworks. The data-parallel systems are well suited to the task of graph construction and are highly scalable, but they suffer from the very problems mentioned before, for which the graph-parallel systems came into existence. GraphX {% cite xin2013graphx --file big-data %} is a new computation system which builds upon Spark's Resilient Distributed Datasets (RDDs) to form a new abstraction, the Resilient Distributed Graph (RDG), representing records and their relations as vertices and edges respectively. RDGs leverage the RDDs' fault-tolerance mechanism and expressivity.
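GraphX exposes the vertex-program style of computation described above through its Pregel operator. The sketch below, adapted from the GraphX documentation with a small hand-built graph as a placeholder, computes single-source shortest paths: each vertex keeps its best-known distance, proposes improvements along out-edges, and the computation halts when no message changes anything.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

object ShortestPaths {
  def run(sc: SparkContext): Unit = {
    val sourceId = 1L
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
    val graph = Graph.fromEdges(edges, 0.0)
      .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = graph.pregel(Double.PositiveInfinity)(
      // Vertex program: keep the smaller of the current and incoming distance.
      (id, dist, newDist) => math.min(dist, newDist),
      // Send phase: propose a shorter path to the destination if one exists.
      triplet =>
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else Iterator.empty,
      // Merge phase: combine competing proposals by taking the minimum.
      (a, b) => math.min(a, b))

    sssp.vertices.collect.foreach(println) // (1,0.0), (2,1.0), (3,3.0)
  }
}
```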
The GraphX API provides the primitives below for graph transformations (from the GraphX website):
- `triplets: RDD[EdgeTriplet[VD, ED]]` - returns a collection of the form ((i, j), (PV(i), PE(i, j), PV(j))). The operator essentially requires a multiway join between the vertex and edge RDDs. This operation is optimized by shifting the site of the joins to the edges, using the routing table, so that only vertex data needs to be shuffled.
- `leftJoin` - given a collection of vertices and a graph, returns a new graph which incorporates the properties of matching vertices from the given collection without changing the underlying graph structure.
- `subgraph` - applies predicates to return a subgraph of the original graph, filtering out all the vertices and edges that don't satisfy the vertex and edge predicates respectively.
- `aggregateMessages` (previously `mapReduceTriplets`) - takes two functions, sendMsg and mergeMsg. The sendMsg function maps over every edge triplet in the graph, while mergeMsg acts like a reduce function in MapReduce, aggregating those messages at their destination vertex. This is an important function which supports analytics tasks and iterative graph algorithms (e.g., PageRank, shortest paths) where individual vertices rely upon the aggregated properties of their neighbors; see the sketch after this list.
- `filterVertices(f: (Id, V) => Bool): Graph[V, E]` - filters the vertices by applying the predicate function f, returning the filtered graph.
- `filterEdges(f: Edge[V, E] => Bool): Graph[V, E]` - filters the edges by applying the predicate function f, returning the filtered graph.
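As an example of the sendMsg/mergeMsg split, the sketch below counts each vertex's in-degree over an arbitrary existing graph: sendMsg emits a 1 to the destination of every edge, and mergeMsg sums the messages converging on each vertex.

```scala
import org.apache.spark.graphx.Graph

// Sketch: in-degree computed by hand via aggregateMessages.
def inDegrees[V, E](graph: Graph[V, E]) =
  graph.aggregateMessages[Int](
    ctx => ctx.sendToDst(1), // sendMsg: one message per edge, to its destination
    _ + _                    // mergeMsg: sum the messages arriving at each vertex
  )
```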
The Hive execution model {% cite thusoo2010hive --file big-data %} is composed of the important components below (as shown in the Hive architecture diagram):
- Driver: Similar to the drivers of Spark/MapReduce applications, the driver in Hive handles query submission and its flow across the system. It also manages the session and its statistics.
- Metastore: The Hive metastore stores all information about the tables, their partitions, schemas, columns and their types, etc., making the data format and its storage transparent to users. It, in turn, helps in data exploration, query compilation, and optimization. The criticality of the metastore for managing the structure of Hadoop files requires it to be updated on a regular basis.
Some of the important optimization techniques in Hive are:
- Map-side joins: smaller tables in the join operation can be replicated in all the mappers and the reducers.
- Join reordering: reduce the memory used by reducer-side joins by keeping only the smaller tables in memory; larger tables need not be kept in memory.
- Repartitioning data to handle skew in GROUP BY processing can be achieved by performing the GROUP BY in two MapReduce stages. In the first stage, data is distributed randomly to the reducers and partial aggregation is performed. In the second stage, these partial aggregations are distributed on the GROUP BY columns to different reducers.
- Similar to combiners in MapReduce, hash-based partial aggregations can be performed in the mappers to reduce the data that the mappers send to the reducers. This helps in reducing the amount of time spent in sorting and merging the resulting data; a Spark analogue is sketched below.
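The combiner idea carries over to other engines as well. As a sketch, Spark's `reduceByKey` (here `words` stands for any RDD of words) performs hash-based partial aggregation within each partition before the shuffle, so only one partial count per distinct word per partition crosses the network:

```scala
import org.apache.spark.rdd.RDD

// Sketch: map-side partial aggregation, the Spark analogue of a combiner.
def wordCounts(words: RDD[String]): RDD[(String, Int)] =
  words.map(w => (w, 1))
    // Each partition pre-sums its own (word, 1) pairs in a hash map
    // before the shuffle; the partial sums are then merged per key.
    .reduceByKey(_ + _)
```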
### 2.4 SparkSQL execution model
The SparkSQL {% cite armbrust2015spark --file big-data %} execution model leverages the Catalyst framework to optimize the SQL before submitting it to the Spark core engine for scheduling the job.
Catalyst is a query optimizer. Query optimizers for MapReduce frameworks can greatly improve the performance of the queries developers write and also significantly reduce development time. A good query optimizer should be able to optimize user queries, be extensible so users can provide information about the data, and even dynamically incorporate developer-defined rules.
Catalyst leverages Scala's functional-language features, like pattern matching and runtime metaprogramming, to allow developers to concisely specify complex relational optimizations.
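Catalyst's work can be observed directly. In the sketch below (the table is built inline), `explain(true)` prints the parsed, analyzed, optimized, and physical plans, making rewrites such as predicate pushdown visible:

```scala
import org.apache.spark.sql.SparkSession

object CatalystDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("catalyst-demo").getOrCreate()
    import spark.implicits._

    Seq(("alice", 34), ("bob", 17)).toDF("name", "age")
      .createOrReplaceTempView("people")

    // Catalyst rewrites this through rule-based optimization before execution;
    // explain(true) prints each stage: parsed, analyzed, optimized, physical.
    spark.sql("SELECT name FROM people WHERE age > 18").explain(true)

    spark.stop()
  }
}
```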