| field | value | |
|---|---|---|
| author | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-07 16:27:34 -0500 |
| committer | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-07 16:27:34 -0500 |
| commit | 8aa4751413dc4c8e8e83edf0087a604ffbc0ecb4 (patch) | |
| tree | 1696916186ca01e7bc8e4a9587e58af4fdf991d3 | |
| parent | e4a447b9a7c357696d6a1bd5e71d99043b3c57ab (diff) | |
add mr exec + re-order
-rw-r--r--  chapter/8/big-data.md  318
1 file changed, 159 insertions(+), 159 deletions(-)
diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md
index a75f9fc..5bafc4a 100644
--- a/chapter/8/big-data.md
+++ b/chapter/8/big-data.md

@@ -12,11 +12,7 @@ latency numbers that every programmer should know

    - 1.1.2 FlumeJava
    - 1.1.3 Dryad
    - 1.1.4 Spark

  - 1.2. Querying: we need more declarative interfaces, built on top of MR models.
    - Sawzall {%cite pike2005interpreting --file big-data %}: the first one proposed
    - Pig {% cite olston2008pig --file big-data %}: on top of Hadoop, independent of the execution platform, in theory could be compiled into DryadLINQ too; what is the performance gain/loss? Easier to debug?
    - Hive {%cite thusoo2009hive --file big-data %}

@@ -24,20 +20,17 @@ latency numbers that every programmer should know

      `Suggestion: Merge this with Dryad above?`
    - Dremel: queries natively, w/o translating into MR jobs
    - Spark SQL {%cite --file big-data %} - Limitations of relational-only models? How does the SparkSQL model overcome them? Goals of SparkSQL? How does it leverage the Spark programming model? What is a DataFrame and how is it different from an RDD? What operations does a DataFrame provide? How is in-memory caching different from Spark?
  - 1.3. Large-scale Parallelism on Graphs
    - Why a separate graph processing model? What is BSP? How does BSP work? Do not stress this too much, since it is not exactly a MapReduce world.
    - GraphX programming model - discuss the disadvantages of the graph-parallel model vs. the data-parallel model for large-scale graph processing; how GraphX combines the advantages of both models; the representation of a graph in GraphX; discuss the model, vertex-cut partitioning and its importance, and graph operations
- \2. Execution Models
  - 2.1 MapReduce (intermediate writes to disk): What is the sequence of actions when the MapReduce functions are called? How is write-to-disk good/bad (fault-tolerant/slow)? How is data transmitted across clusters efficiently (stored locally)? To shorten the total time of MR operations, it uses backup tasks. When MR jobs are pipelined, what optimizations can be performed by FlumeJava? In spite of optimizations and pipelining, what is the inherent limitation (no support for iterative algorithms)?
  - 2.2 Spark (all in memory): introduce the Spark architecture and its different layers. What happens when a Spark job is executed? What is the role of a driver/master/worker? How does the scheduler schedule tasks, and which performance measures are considered while scheduling? How does the scheduler manage node failures and missing partitions? How are user-defined transformations passed to the workers? How are RDDs stored, and what memory-management measures exist on the workers? Do we need checkpointing at all, given that RDDs leverage lineage for recovery? If so, why?
  - 2.3 Graphs:
    - Pregel: overview of Pregel, its implementation and working, its limitations. Do not stress this too much, since we have a better model, GraphX, that explains a lot.
    - GraphX: Working on this.
  - SparkSQL Catalyst & Spark execution model: discuss the Parser, LogicalPlan, Optimizer, PhysicalPlan, and Execution Plan. Why Catalyst? How does Catalyst help SparkSQL? Data flow: sql-core -> catalyst -> spark-core
- \3. Big Data Ecosystem
  Everything interoperates with GFS or HDFS, or makes use of stuff like protocol buffers, so systems like Pregel and MapReduce and even MillWheel...
  - GFS/HDFS for MapReduce/Hadoop: Machines are unreliable, so how do they provide fault-tolerance? How does GFS deal with a single point of failure (shadow masters)? How does the master manage partitioning and the transmission of data chunks? Which
  - Resource Management: Mesos. New frameworks keep emerging, and users have to use multiple different frameworks (MR, Spark, etc.) in the same clusters, so how should they share access to the large datasets instead of costly replication across clusters?

@@ -51,14 +44,14 @@ latency numbers that every programmer should know

<figure class="main-container">
  <img src="{{ site.baseurl }}/resources/img/data-parallelism.png" alt="Data Parallelism" />
</figure>

**MapReduce** {% cite dean2008mapreduce --file big-data %} is a programming model proposed by Google, initially to satisfy its demand for large-scale indexing for the web search service. It provides a simple user-program interface, the *map* and *reduce* functions, and automatically handles parallelization and distribution.

The MapReduce model is simple and powerful, and it quickly became very popular among developers. However, when developers write real-world applications, they often end up chaining together MapReduce stages. Pipelining MapReduce jobs forces programmers to write additional coordination code, i.e. the development style regresses from a simple logical computation abstraction to lower-level coordination management. Developers also mostly need to understand the execution model to do manual optimizations. The **FlumeJava** {%cite chambers2010flumejava --file big-data%} library intends to provide support for developing such data-parallel pipelines.
It defers evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MR primitives. The optimized execution is comparable with hand-optimized pipelines, so there is no need to write raw MR programs directly.

Microsoft's **Dryad** {% cite isard2007dryad --file big-data %} is designed differently from MapReduce and can support more general computations. It abstracts individual computation tasks as vertices and constructs a communication graph between those vertices. All programmers need to do is describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One advantage of Dryad over MapReduce is that Dryad vertices can process an arbitrary number of inputs and outputs, while MR only supports a single input and a single output for each vertex. Besides this flexibility of computation, Dryad also allows memory channels and TCP pipes, in addition to files, for communication between vertices.

Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g. iterative machine learning algorithms. **Spark** {% cite zaharia2010spark --file big-data%} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce; however, instead of writing data to disk for each job as MapReduce does, a user program in Spark can explicitly cache an RDD in memory and reuse the same dataset across multiple parallel operations. This feature makes Spark suitable for iterative jobs and interactive analytics.

Details about the programming models of MapReduce, FlumeJava, Dryad, and Spark are discussed in the following four sections.
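Before looking at each model in turn, a minimal word-count sketch of the map/reduce interface may help. This is our own illustration in Scala, not code from any of the cited papers; `mapFn`, `reduceFn`, and `runLocally` are hypothetical names standing in for the user-supplied functions and the distributed runtime:

```scala
// Word count in the MapReduce style: the user supplies only these two
// functions; the framework handles partitioning, shuffling, and faults.

// map: for each input record, emit intermediate (key, value) pairs
def mapFn(docName: String, contents: String): Seq[(String, Int)] =
  contents.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toSeq

// reduce: combine all values that share the same intermediate key
def reduceFn(word: String, counts: Seq[Int]): (String, Int) =
  (word, counts.sum)

// A single-machine stand-in for the distributed runtime, to show data flow:
def runLocally(docs: Seq[(String, String)]): Map[String, Int] =
  docs.flatMap { case (name, text) => mapFn(name, text) }      // map phase
      .groupBy(_._1)                                           // shuffle: group by key
      .map { case (w, pairs) => reduceFn(w, pairs.map(_._2)) } // reduce phase
```

The point of the interface is exactly this narrowness: everything outside `mapFn` and `reduceFn` belongs to the framework, which is what the pipelining systems below then build on.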
@@ -137,7 +130,7 @@ PCollection<String> words =

*Optimizer*
`parallelDo Fusion; MSCR; overall goal to produce the fewest, most efficient MSCR operations in the final optimized plan`

### 1.1.3 Dryad

@@ -147,7 +140,7 @@ Dryad is a more general and flexible execution engine that execute subroutines a

Spark is a fast, in-memory data processing engine with an elegant and expressive development interface that enables developers to efficiently execute machine learning, SQL, or streaming workloads requiring fast iterative access to datasets. It offers a functional-style programming model (similar to DryadLINQ) in which a developer creates acyclic data-flow graphs and transforms a set of input data through map/reduce-like operators. Spark provides two main abstractions - distributed in-memory storage (RDDs) and parallel operations on datasets (based on Scala's collection API) - for high-performance processing, scalability, and fault tolerance.

*Distributed in-memory storage - Resilient Distributed Datasets:*

An RDD is a partitioned, read-only collection of objects that can be created from data in stable storage or by transforming another RDD. It can be distributed across multiple nodes of a cluster (parallelized) and is fault-tolerant (resilient): if a node fails, an RDD can always be recovered using its lineage graph (the information on how it was derived from a dataset). An RDD is stored in memory (as much of it as fits; the rest is spilled to disk) and is immutable - it can only be transformed into a new RDD. Transformations are lazy and are applied only when an action is performed on the RDD; hence, an RDD need not be materialized at all times.

@@ -183,7 +176,7 @@ Spark API provide two kinds of operations on a RDD:

RDDs are by default discarded after use. However, Spark provides two explicit operations, persist() and cache(), to keep an RDD in memory once it has been computed for the first time.

*Why RDDs over Distributed Shared Memory (DSM)?*

RDDs are immutable and can only be created through coarse-grained transformations, while DSM allows fine-grained read and write operations to each memory location. RDDs therefore do not incur the checkpointing overhead present in DSM and can be recovered using their lineage. Also, because RDDs are immutable, a straggler (slow node) can be replaced with a backup copy, as in MapReduce; this is hard to implement in DSM, where two copies point to the same location and can interfere with each other's updates.

Other benefits include the scheduling of tasks based on data locality to improve performance, and the ability of RDDs to degrade gracefully in case of memory shortage: partitions that do not fit in RAM get spilled to disk (performance then equals that of any data-parallel system).

@@ -195,69 +188,15 @@ Other benefits include the scheduling of tasks based on data locality to improve

- `Debugging and profiling`: No debugging tools are available, and developers find it hard to tell whether a computation is concentrated on a single machine or whether the data structures they used were inefficient.
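As a small illustration of the RDD API described above - lazy transformations, eager actions, and explicit caching - consider the following log-mining sketch against Spark's Scala API. It mirrors the kind of interactive workload the Spark paper targets; the HDFS path is a placeholder:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LogMining {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-mining"))

    // Transformations (lazy): nothing runs yet, only lineage is recorded.
    val lines  = sc.textFile("hdfs://...")            // base RDD from stable storage
    val errors = lines.filter(_.startsWith("ERROR"))  // derived RDD

    // Keep this RDD in memory after it is first computed, so the second
    // query below avoids re-reading and re-filtering the input from HDFS.
    errors.cache()

    // Actions (eager): each one triggers a distributed computation.
    val total    = errors.count()
    val withHdfs = errors.filter(_.contains("HDFS")).count()

    println(s"errors=$total, hdfs-related=$withHdfs")
    sc.stop()
  }
}
```

If the worker holding the cached partitions fails, Spark recomputes just the lost partitions from the lineage (`textFile` then `filter`), which is the recovery story contrasted with DSM above.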
### 1.2 Querying: declarative interfaces
MapReduce provides only two high-level primitives - map and reduce - that programmers have to worry about; the framework takes care of all the processing over a cluster, failure and recovery, data partitioning, and so on. However, the framework still suffers from rigidity with respect to its one-input data format (key/value pairs) and its two-stage data flow. Several important patterns like joins (which can be highly complex depending on the data) are extremely hard to implement and reason about for a programmer, and the code can become repetitive when the programmer wants to implement common operations like projection and filtering.

Non-programmers like data scientists would strongly prefer a SQL-like interface over a cumbersome and rigid framework. Such a high-level declarative language can easily express their task while leaving all of the execution optimization details to the backend engine. These kinds of abstractions also provide ample opportunities for query optimization.

**Introduce Sawzall** (it is no longer used, but it was one of the first ideas): Parallel analysis with Sawzall. Scientific Programming, 13(4):277-298, 2005

### 1.2.x Hive/HiveQL

Hive is a data-warehousing infrastructure built on top of the MapReduce framework Hadoop. The primary responsibility of Hive is to provide data summarization, querying, and analysis. It supports the analysis of large datasets stored in Hadoop's HDFS and offers SQL-like access to structured data, known as HiveQL (or HQL), as well as big-data analysis with the help of MapReduce. These SQL queries are compiled into MapReduce jobs that are executed on Hadoop, which drastically brings down the development time for writing and maintaining Hadoop jobs.

@@ -271,7 +210,7 @@ Data in Hive is organized into three different formats :

***HiveQL***: The Hive query language (HiveQL) consists of a subset of SQL along with some extensions. The language is very SQL-like and supports features like subqueries, joins, cartesian products, group-by, aggregation, describe, and more. MapReduce programs can also be used in Hive queries; a sample query using MapReduce would look like this:

FROM (
  MAP inputdata USING 'python mapper.py' AS (word, count)
  FROM inputtable
  CLUSTER BY word

@@ -282,9 +221,9 @@ INSERT INTO, UPDATE, and DELETE are not supported which makes it easier to handl

***Serialization/Deserialization***
Hive implements LazySerDe as the default SerDe: it deserializes rows into internal objects lazily, so that the cost of deserializing a column is incurred only when it is needed. Hive also provides a RegexSerDe, which allows the use of regular expressions to parse columns out of a row, and supports various formats like TextInputFormat, SequenceFileInputFormat, and RCFileInputFormat.

### 1.3.x SparkSQL - Where Relational meets Procedural

A relational interface to big data is good; however, it does not cater to users who want to perform:

- ETL to and from various semi-structured or unstructured data sources.

@@ -320,70 +259,89 @@ Winding up - we can compare SQL vs Dataframe vs Dataset as below :

<figure class="main-container">
  <img src="./sql-vs-dataframes-vs-datasets.png" alt="SQL vs Dataframe vs Dataset" />
</figure>

### 1.3 Large-scale Parallelism on Graphs

MapReduce does not scale easily and is highly inefficient for iterative and graph algorithms, like PageRank and machine learning algorithms. Iterative algorithms require the programmer to explicitly handle the intermediate results (writing them to disk); hence, every iteration requires reading the input file and writing the results to disk, resulting in high disk I/O, which is a performance bottleneck for any batch processing system.

Graph algorithms also require the exchange of messages between vertices: in the case of PageRank, every vertex requires the contributions of all its adjacent nodes to calculate its score. MapReduce currently lacks this model of message passing, which makes it complex to reason about graph algorithms. One model that is commonly employed for implementing distributed graph processing is the graph-parallel model.

In the graph-parallel abstraction, a user-defined vertex program is instantiated concurrently for each vertex and interacts with adjacent vertex programs through messages or shared state.
Each vertex program can read and modify its vertex property and, in some cases, adjacent vertex properties. The program terminates when all vertex programs vote to halt. Most systems adopt the bulk synchronous parallel (BSP) model.

The BSP model was introduced in the 1980s to represent the hardware design features of parallel computers. It gained popularity as an alternative to MapReduce since it addresses the issues with MapReduce mentioned above.<br />
BSP is a synchronous message-passing model in which:

 - Computation consists of a series of steps called supersteps.
 - The processors involved have their own local memory, and every processor is connected to the others via point-to-point communication.
 - In every superstep, a processor receives input at the beginning, performs computation, and produces output at the end.
 - A processor at superstep S can send messages to processors at superstep S+1, and can likewise receive messages from superstep S-1.
 - Barrier synchronization syncs all the processors at the end of every superstep.

A notable feature of the model is the complete control over data through communication between the processors at every superstep. Though similar to the MapReduce model, BSP preserves data in memory across supersteps, which helps in reasoning about iterative graph algorithms. A minimal sketch of a BSP-style vertex program is shown below.
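The sketch is a single-machine illustration of the model just described, not the API of any real system: vertices run in supersteps, messages emitted in superstep S are delivered in superstep S+1, and a vertex votes to halt until a message reactivates it. The PageRank-style update rule is only an example payload:

```scala
object BspSketch {
  final case class Vertex(id: Int, var value: Double, out: Seq[Int],
                          var active: Boolean = true)

  // One superstep: every vertex that is active (or has mail) consumes the
  // messages sent to it in superstep S-1 and emits messages for S+1.
  def superstep(vs: Map[Int, Vertex],
                inbox: Map[Int, Seq[Double]]): Map[Int, Seq[Double]] = {
    val outbox = scala.collection.mutable.Map.empty[Int, List[Double]]
                   .withDefaultValue(Nil)
    for (v <- vs.values) {
      val msgs = inbox.getOrElse(v.id, Nil)
      if (v.active || msgs.nonEmpty) {
        v.value = 0.15 + 0.85 * msgs.sum          // PageRank-style update
        for (dst <- v.out)                        // contribution to each neighbor
          outbox(dst) = (v.value / v.out.size) :: outbox(dst)
        v.active = false                          // vote to halt; mail reactivates
      }
    }
    outbox.toMap                                  // delivered after the barrier
  }

  def run(vs: Map[Int, Vertex], steps: Int): Unit = {
    var inbox: Map[Int, Seq[Double]] = vs.keys.map(_ -> Seq(1.0 / vs.size)).toMap
    for (_ <- 1 to steps) inbox = superstep(vs, inbox) // barrier between supersteps
  }
}
```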
Graph-parallel abstractions allow users to succinctly describe graph algorithms and provide a runtime engine to execute these algorithms in a distributed fashion. They simplify the design, implementation, and application of sophisticated graph algorithms to large-scale real-world problems. Each of these frameworks presents a different view of graph computation, tailored to an originating domain or family of graph algorithms. However, these frameworks fail to address the problems of data preprocessing and construction, favor snapshot recovery over fault tolerance, and lack support from distributed data-flow frameworks. Data-parallel systems, in contrast, are well suited to the task of graph construction and are highly scalable, but they suffer from the very problems, mentioned before, for which graph-parallel systems came into existence.

GraphX is a new computation system which builds upon Spark's Resilient Distributed Dataset (RDD) to form a new abstraction, the Resilient Distributed Graph (RDG), representing records and their relations as vertices and edges respectively. RDGs leverage the RDDs' fault-tolerance mechanism and expressivity.

How does GraphX improve over the existing graph-parallel and data-flow models?
The RDGs in GraphX provide a set of elegant and expressive computational primitives through which many graph-parallel systems, like Pregel and PowerGraph, can be expressed in minimal lines of code. GraphX simplifies the process of graph ETL and analysis through new operations like filter, view, and graph transformations, and it minimizes communication and storage overhead.

Similar to the data-flow model, GraphX moves away from the vertex-centric view and adopts transformations on graphs, each yielding a new graph.

***Why is partitioning important in graph computation systems?***
Graph-parallel computation requires every vertex or edge to be processed in the context of its neighborhood. Each transformation depends on the result of distributed joins between vertices and edges. This means that graph computation systems rely on graph partitioning (edge-cuts in most systems) and efficient storage, both to minimize communication and storage overhead and to ensure balanced computation.

<figure class="main-container">
  <img src="./edge-cuts.png" alt="edge cuts" />
</figure>

***Why are edge-cuts expensive?***
Partitioning by edge-cuts requires random assignment of vertices and edges across all the machines. Thus the communication and storage overhead is proportional to the number of edges cut, and this makes balancing the number of cuts a priority. For most real-world graphs, constructing an optimal edge-cut is cost prohibitive, and most systems use random edge-cuts, which achieve appropriate work balance but nearly worst-case communication overhead.

<figure class="main-container">
  <img src="./vertex-cuts.png" alt="Vertex cuts" />
</figure>

***Vertex-cuts - GraphX's solution to effective partitioning***: An alternative approach which does the opposite of an edge-cut — evenly assign edges to machines, but allow vertices to span multiple machines. The communication and storage overhead of a vertex-cut is directly proportional to the sum of the number of machines spanned by each vertex. Therefore, we can reduce communication overhead and ensure balanced computation by evenly assigning edges to machines in a way that minimizes the number of machines spanned by each vertex.
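To make that cost model concrete, here is a small sketch of our own (not from the paper) that scores an edge-to-machine assignment by the quantity just described: the sum, over all vertices, of the number of machines each vertex spans. `placement` is a hypothetical assignment function, e.g. a hash of the edge for a random, evenly balanced vertex-cut:

```scala
// Cost of a vertex-cut: for each vertex, count the machines holding at least
// one of its edges; the total is the replication factor described above.
def vertexCutCost(edges: Seq[(Int, Int)], placement: ((Int, Int)) => Int): Int =
  edges
    .flatMap { case e @ (src, dst) => Seq(src -> placement(e), dst -> placement(e)) }
    .distinct            // one (vertex, machine) replica per machine spanned
    .groupBy(_._1)       // group replicas by vertex
    .values.map(_.size)  // machines spanned by each vertex
    .sum

// Example: random (hash-based) edge placement over 4 machines.
val cost = vertexCutCost(Seq((1, 2), (2, 3), (1, 3)), e => e.hashCode.abs % 4)
```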
The GraphX RDG structure implements a vertex-cut representation of a graph using three unordered, horizontally partitioned RDD tables. These three tables are covered in more detail in the paper, but their general purposes are as follows:

- `EdgeTable(pid, src, dst, data)`: Stores the adjacency structure and edge data.
- `VertexDataTable(id, data)`: Stores vertex data, i.e. the state associated with vertices that changes in the course of a graph computation.
- `VertexMap(id, pid)`: Maps vertex ids to the partitions that contain their adjacent edges. It remains static as long as the graph structure does not change.

A three-way relational join is used to bring together source vertex data, edge data, and target vertex data. The join is straightforward and takes advantage of a partitioner to ensure the join site is local to the edge table; this means GraphX only has to shuffle vertex data.

***Operators in GraphX***
Other than standard data-parallel operators like filter, map, leftJoin, and reduceByKey, GraphX supports the following graph-parallel operators (a short usage sketch follows the list):

- graph - constructs a property graph given a collection of edges and vertices.
- vertices, edges - decompose the graph into a collection of vertices or edges by extracting the vertex or edge RDDs.
- mapV, mapE - transform the vertex or edge collection.
- triplets - returns a collection of the form ((i, j), (PV(i), PE(i, j), PV(j))). The operator essentially requires a multiway join between the vertex and edge RDDs; this operation is optimized by shifting the site of the joins to the edges, using the routing table, so that only vertex data needs to be shuffled.
- leftJoin - given a collection of vertices and a graph, returns a new graph which incorporates the properties of matching vertices from the given collection into the given graph, without changing the underlying graph structure.
- subgraph - returns a subgraph of the original graph by applying predicates on edges and vertices.
- mrTriplets (MapReduce triplets) - a logical composition of triplets followed by map and reduceByKey. It is the building block of graph-parallel algorithms.
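The sketch below uses the open-source GraphX API that ships with Spark, whose operator names differ slightly from the paper's RDG primitives (`mapVertices`/`mapEdges` correspond to mapV/mapE, and `aggregateMessages` plays the role of mrTriplets). It computes in-degrees by sending a message along each triplet and folds the result back with a join:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph, VertexRDD}
import org.apache.spark.rdd.RDD

object InDegreeExample {
  def run(sc: SparkContext): Unit = {
    // `graph` operator: build a tiny property graph from two collections.
    val vertices: RDD[(Long, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(1L, 3L, 1), Edge(2L, 3L, 1)))
    val graph = Graph(vertices, edges)

    // mrTriplets-style: map over each (src, edge, dst) triplet to emit a
    // message to the destination vertex, then reduce messages by key.
    val inDegrees: VertexRDD[Int] =
      graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)

    // leftJoin-style: fold the computed values back into the graph without
    // changing its structure.
    val annotated = graph.outerJoinVertices(inDegrees) {
      (_, name, deg) => (name, deg.getOrElse(0))
    }
    annotated.vertices.collect().foreach(println)
  }
}
```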
The workers also notifies the master of the vertices that are active for the next superstep. +//At high level, when the user program calls *MapReduce* function, the input files are split into *M* pieces and it runs *map* function on corresponding splits; then intermediate key space are partitioned into *R* pieces using a partitioning function; After the reduce functions all successfully complete, the output is available in *R* files. The sequences of actions are shown in the figure below. We can see from label (4) and (5) that the intermediate key/value pairs are written/read into disks, this is a key to fault-tolerance in MapReduce model and also a bottleneck for more complex computation algorithms. -Pregel works good for sparse graphs. However, dense graph could cause communication overhead resulting in system to break. Also, the entire computation state resides in the main memory and hence constrained by the size of main memory. +<figure class="main-container"> + <img src="{{ site.baseurl }}/resources/img/mapreduce-execution.png" alt="MapReduce Execution Overview" /> -Apache Giraph is an open source implementation of Pregel in which new features like master computation, sharded aggregators, edge-oriented input, out-of-core computation are added making it more efficient. The most high performance graph processing framework is GraphLab which is developed at Carnegie Melon University and uses the BSP model and executes on MPI. +</figure> +<p>Figure 2.1.1 Execution overview<label for="sn-proprietary-monotype-bembo" class="margin-toggle sidenote-number"></label><input type="checkbox" id="sn-proprietary-monotype-bembo" class="margin-toggle"/><span class="sidenote">See Tufte’s comment in the <a href="http://www.edwardtufte.com/bboard/q-and-a-fetch-msg?msg_id=0000Vt">Tufte book fonts</a> thread.</span></p> -**Spark execution model** +### 2.2 Spark execution model <figure class="main-container"> <img src="./cluster-overview.png" alt="MapReduce Execution Overview" /> @@ -400,6 +358,37 @@ A Spark worker executes the business logic submitted by the Spark driver. Spark Persistent RDDs are stored in memory as java objects (for performance) or in memory as serialized data (for less memory usage at cost of performance) or on disk. If the worker runs out of memory upon creation of a new RDD, LRU policy is applied to evict the least recently accessed RDD unless its same as the new RDD. In that case, the old RDD is excluded from eviction given the fact that it may be reused again in future. Long lineage chains involving wide dependencies are checkpointed to reduce the time in recovering a RDD. However, since RDDs are read-only, checkpointing is still ok since consistency is not a concern and there is no overhead to manage the consistency as is seen in distributed shared memory. +### 2.3 Hive execution model + + +<figure class="main-container"> + <img src="./Hive-architecture.png" alt="Hive architecture" /> +</figure> + +The query is submitted via CLI/web UI/any other interface. This query goes to the compiler and undergoes parse, type-check and semantic analysis phases using the metadata from Metastore. The compiler generates a logical plan which is optimized by the rule-based optimizer and an optimized plan in the form of DAG of MapReduce and hdfs tasks is generated. The execution engine executes these tasks in the correct order using Hadoop. + +***Metastore*** +It stores all information about the tables, their partitions, schemas, columns and their types, etc. 
### 2.3 Hive execution model

<figure class="main-container">
  <img src="./Hive-architecture.png" alt="Hive architecture" />
</figure>

A query is submitted via the CLI, the web UI, or any other interface. It goes to the compiler and undergoes parse, type-check, and semantic-analysis phases, using the metadata from the Metastore. The compiler generates a logical plan, which is optimized by the rule-based optimizer, and an optimized plan in the form of a DAG of MapReduce and HDFS tasks is produced. The execution engine then executes these tasks in the correct order using Hadoop.

***Metastore***
The Metastore stores all information about the tables, their partitions, schemas, columns and their types, and so on. It runs on a traditional RDBMS (so that the latency for metadata queries is very small), uses an open-source ORM layer called DataNucleus, and is backed up regularly. To make sure that the system scales with the number of queries, no metadata queries are made from the mappers or reducers of a job; any metadata needed by a mapper or reducer is passed through XML plan files that are generated by the compiler.

***Query Compiler***
The Hive query compiler works like traditional database compilers. Antlr is used to generate the abstract syntax tree (AST) of the query, and a logical plan is created using information from the Metastore. An intermediate representation called the query block (QB) tree is used when transforming the AST into the operator DAG; nested queries define the parent-child relationships in the QB tree.
The optimization logic consists of a chain of transformations such that the output of one transformation is the input of the next. Each transformation comprises a walk over the operator DAG, and each visited node is tested against different rules; if a rule is satisfied, its corresponding processor is invoked. The Dispatcher maintains the mapping from rules to processors and does the rule matching, while the GraphWalker manages the overall traversal. The logical plan generated in the previous step is then split into multiple MapReduce and HDFS tasks; nodes in the plan correspond to physical operators, and edges represent the flow of data between operators.

***Optimisations of Hive***

- Column Pruning - only the columns needed in the query processing are projected.
- Predicate Pushdown - predicates are pushed down to the scan, so that rows are filtered as early as possible.
- Partition Pruning - predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate.
- Map-Side Joins - in case some tables involved in a join are very small, those tables are replicated in all the mappers and the reducers.
- Join Reordering - large tables are streamed, rather than materialized in memory in the reducer, to reduce memory requirements.

Some optimizations are not enabled by default but can be activated by setting certain flags. These include:

- Repartitioning data to handle skew in GROUP BY processing. This is achieved by performing the GROUP BY in two MapReduce stages: in the first, data is distributed randomly to the reducers and partial aggregations are computed; in the second, these partial aggregations are distributed on the GROUP BY columns to different reducers.
- Hash-based partial aggregations in the mappers, to reduce the data sent by the mappers to the reducers, which helps in reducing the amount of time spent sorting and merging the resulting data.

***Execution Engine***

The execution engine executes the tasks in the order of their dependencies. A MapReduce task first serializes its part of the plan into a plan.xml file; this file is then added to the job cache, and mappers and reducers are spawned to execute the relevant sections of the operator DAG. The final results are stored in a temporary location and then moved to the final destination (in the case of, say, an INSERT INTO query).

**SparkSQL execution model**

The SparkSQL execution model leverages the Catalyst framework to optimize a SQL query before submitting it to the Spark core engine, which schedules the job.
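To see Catalyst's pipeline from the user's side, here is a sketch using Spark's DataFrame API (SparkSession is the post-2.0 entry point; the JSON path is a placeholder). `explain(true)` prints the parsed logical plan, the analyzed and optimized logical plans, and the chosen physical plan discussed in this section:

```scala
import org.apache.spark.sql.SparkSession

object CatalystSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("catalyst-sketch").getOrCreate()
    import spark.implicits._

    val people = spark.read.json("hdfs://.../people.json")

    // A declarative query: Catalyst, not the user, decides e.g. to push the
    // filter below the projection before the job reaches the Spark core engine.
    val adults = people.filter($"age" >= 21).select($"name")

    // Parsed, analyzed, and optimized logical plans plus the physical plan.
    adults.explain(true)
    adults.show()
    spark.stop()
  }
}
```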
@@ -439,61 +428,72 @@ Overall, the performance is very good for conceptually unrelated computations.

{% bibliography --file big-data %}

## Trash

### Pregel Execution (suggestion: delete)

Pregel is Google's implementation of the classic BSP model, built exclusively to analyze large graphs (e.g. for PageRank). It was followed by the open-source implementations Apache Giraph and Hama, which are BSP models built on top of Hadoop.

Pregel is highly scalable and fault-tolerant and can successfully represent large, complex graphs. Google claims the API becomes easy once a developer adopts the "think like a vertex" mode.

Pregel's computation is iterative, and every iteration is called a superstep. The system takes as input a directed graph with properties assigned to both vertices and edges. At each superstep, all vertices execute, in parallel, a user-defined function that represents the behavior of the vertex. The function has access to the messages sent to its vertex in the previous superstep S-1 and can update the state of the vertex, its edges, and the graph, and even send messages to other vertices, which receive them in the next superstep S+1. Synchronization happens only between two supersteps. Every vertex is either active or inactive at any superstep, and the iteration stops when all vertices are inactive; a vertex deactivates itself by voting to halt and becomes active again if it receives a message. This asynchronous message-passing feature eliminates the shared memory, remote reads, and latency of the MapReduce model.

Pregel's API provides:

- a compute() method for the user to implement the logic that changes the state of the graph/vertex at every superstep; message delivery is guaranteed through an iterator at every superstep.
- user-defined handlers for issues like a missing destination vertex.
- combiners, which reduce the number of messages passed from multiple vertices to the same destination vertex.
- aggregators, which capture the global state of the graph: a reduce operation combines the values given by every vertex to the aggregator, and the combined/aggregated value is passed on to all the vertices in the next superstep.
- fault tolerance, achieved through checkpointing and instructing the workers to save the state of nodes to persistent storage; when a machine fails, all workers restart execution from the state of their most recent checkpoint.
- master and worker implementations: the master partitions the graph into sets of vertices (hash on vertex ID mod number of partitions) and their outgoing edges. Each partition is assigned to a worker, which manages the state of all its vertices by executing the compute() method and coordinating the message communication.
  The workers also notify the master of the vertices that are active for the next superstep.

Pregel works well for sparse graphs; a dense graph can cause communication overhead that brings the system down. Also, the entire computation state resides in main memory, so the system is constrained by the size of main memory.

Apache Giraph is an open-source implementation of Pregel in which new features like master computation, sharded aggregators, edge-oriented input, and out-of-core computation have been added, making it more efficient. The highest-performance graph processing framework is GraphLab, which was developed at Carnegie Mellon University, uses the BSP model, and executes on MPI.

//[`COMMENT: move this to introducing DryadLINQ`] Like MR, writing raw Dryad is hard, programmers need to understand system resources and other lower-level details. This motivates a more declarative programming model: DryadLINQ as a querying language.

## Outline

- 1. Programming Models
  - 1.1. Data parallelism: what is data parallelism and how do the following models relate to each other?
    - 1.1.1 MapReduce
    - 1.1.2 FlumeJava
    - 1.1.3 Dryad
    - 1.1.4 Spark
  - 1.2. Querying: we need more declarative interfaces, built on top of MR models.
    - Sawzall {%cite pike2005interpreting --file big-data %}: the first one proposed
    - Pig {% cite olston2008pig --file big-data %}: on top of Hadoop, independent of the execution platform, in theory could be compiled into DryadLINQ too; what is the performance gain/loss? Easier to debug?
    - Hive {%cite thusoo2009hive --file big-data %}
    - DryadLINQ: SQL-like, uses Dryad as execution engine; `Suggestion: Merge this with Dryad above?`
    - Dremel: queries natively, w/o translating into MR jobs
    - Spark SQL {%cite --file big-data %} - Limitations of relational-only models? How does the SparkSQL model overcome them? Goals of SparkSQL? How does it leverage the Spark programming model? What is a DataFrame and how is it different from an RDD? What operations does a DataFrame provide? How is in-memory caching different from Spark?
  - 1.3. Large-scale Parallelism on Graphs
    - Why a separate graph processing model? What is BSP? How does BSP work? Do not stress this too much, since it is not exactly a MapReduce world.
    - GraphX programming model - discuss the disadvantages of the graph-parallel model vs. the data-parallel model for large-scale graph processing; how GraphX combines the advantages of both models; the representation of a graph in GraphX; discuss the model, vertex-cut partitioning and its importance, and graph operations
- 2. Execution Models
  - 2.1 MapReduce (intermediate writes to disk): What is the sequence of actions when the MapReduce functions are called? How is write-to-disk good/bad (fault-tolerant/slow)? How is data transmitted across clusters efficiently (stored locally)? To shorten the total time of MR operations, it uses backup tasks. When MR jobs are pipelined, what optimizations can be performed by FlumeJava? In spite of optimizations and pipelining, what is the inherent limitation (no support for iterative algorithms)?
  - 2.2 Spark (all in memory): introduce the Spark architecture and its different layers. What happens when a Spark job is executed? What is the role of a driver/master/worker? How does the scheduler schedule tasks, and which performance measures are considered while scheduling? How does the scheduler manage node failures and missing partitions? How are user-defined transformations passed to the workers? How are RDDs stored, and what memory-management measures exist on the workers? Do we need checkpointing at all, given that RDDs leverage lineage for recovery? If so, why?
  - 2.3 Graphs:
    - Pregel: overview of Pregel, its implementation and working, its limitations. Do not stress this too much, since we have a better model, GraphX, that explains a lot.
    - GraphX: Working on this.
  - SparkSQL Catalyst & Spark execution model: discuss the Parser, LogicalPlan, Optimizer, PhysicalPlan, and Execution Plan. Why Catalyst? How does Catalyst help SparkSQL? Data flow: sql-core -> catalyst -> spark-core
- 3. Evaluation: Given the same algorithm, what are the performance differences between Hadoop, Spark, and Dryad?
  There is no direct comparison of all those models, so we may want to compare separately:
  - Hadoop vs. Spark
  - Spark vs. SparkSQL, from the SparkSQL paper
- 4. Big Data Ecosystem
  Everything interoperates with GFS or HDFS, or makes use of stuff like protocol buffers, so systems like Pregel and MapReduce and even MillWheel...
  - GFS/HDFS for MapReduce/Hadoop: Machines are unreliable, so how do they provide fault-tolerance? How does GFS deal with a single point of failure (shadow masters)? How does the master manage partitioning and the transmission of data chunks? Which
  - Resource Management: Mesos. New frameworks keep emerging, and users have to use multiple different frameworks (MR, Spark, etc.) in the same clusters, so how should they share access to the large datasets instead of costly replication across clusters?
  - Introducing streaming: what happens when data cannot be complete? How do different programming models adapt? windowing `todo: more`
