From 26f84b0695d691e84ca120cce74ed96ac1886bdb Mon Sep 17 00:00:00 2001
From: msabhi
Date: Fri, 2 Dec 2016 05:16:04 -0500
Subject: Update big-data.md

---
 chapter/8/big-data.md | 97 +++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 70 insertions(+), 27 deletions(-)
(limited to 'chapter/8/big-data.md')

diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md
index d49d5a1..ec5edf6 100644
--- a/chapter/8/big-data.md
+++ b/chapter/8/big-data.md
@@ -82,6 +82,28 @@ In the paper, the authors measure the performance of MapReduce on two computatio
 Overall, the performance is very good for conceptually unrelated computations.
 
+## Iterative processing in MapReduce:
+
+Many analytics workloads, such as K-means, logistic regression, and graph computations like PageRank or shortest paths via parallel breadth-first search, require multiple stages of MapReduce jobs. In a plain MapReduce framework such as Hadoop, the developer must manage these iterations manually in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back at stage T+1, which creates a performance bottleneck: network bandwidth and CPU cycles are wasted, and the inherently slow disk I/O dominates the runtime. To address these challenges for iterative workloads on MapReduce, frameworks such as HaLoop, Twister, and iMapReduce adopt techniques like caching data between iterations and keeping the mappers and reducers alive across iterations.
+
+HaLoop : HaLoop: Efficient Iterative Data Processing on Large Clusters.
+
+iMapReduce : iMapReduce: A Distributed Computing Framework for Iterative Computation.
+
+Twister : Twister: A Runtime for Iterative MapReduce.
+
+## MapReduce inspired other large-scale data processing systems:
+
+Dryad/DryadLinq :
+
+Spark (big one) : content is ready, need to format a bit and paste
+
+## Declarative interfaces for the MapReduce framework:
+MapReduce exposes only two high-level primitives, map and reduce, that the programmer has to think about; the framework takes care of distributing the processing over a cluster, failure and recovery, data partitioning, and so on. However, it remains rigid with respect to its single input format (key/value pairs) and its two-stage data flow. Several important patterns such as joins (which can be highly complex depending on the data) are hard to implement and reason about, and code becomes repetitive when the programmer implements common operations such as projection and filtering.
+Non-programmers such as data scientists strongly prefer a SQL-like interface over a cumbersome and rigid framework. A high-level declarative language lets them express a task easily while leaving all of the execution and optimization details to the backend engine. These abstractions also provide ample opportunities for query optimization.
+
+Introduce Sawzall (it is no longer used, but it was one of the first such ideas) : Parallel Analysis with Sawzall. Scientific Programming, 13(4):277–298, 2005.
+
 ## FlumeJava (2010)
 
 Many real-world computations involve a pipeline of MapReduces, and this motivates additional management to chain together those separate MapReduce stages in an efficient way. FlumeJava {% cite chambers2010flumejava --file big-data %} can help build those pipelines and keep computations modular. At its core, FlumeJava is a couple of classes that represent immutable parallel collections.
It defers evaluation and optimization by internally constructing an execution plan dataflow graph.
@@ -100,33 +122,54 @@ Many real-world computations involves a pipeline of MapReduces, and this motivat
 **Optimizer**
 `(JJ: placeholder) parallelDo Fusion; MSCR; overall goal to produce the fewest, most efficient MSCR operations in the final optimized plan`
-# Graph paralleling
-
-Though highly efficient and one of the first major programming models for distributed batch processing, MapReduce has a few limitations.
-It does not extend easily to iterative or graph algorithms such as PageRank, or to many machine learning algorithms, and is highly inefficient for them. Iterative algorithms require the programmer to explicitly manage intermediate results by writing them to disk: every iteration reads its input from disk and writes its results back, and the resulting disk I/O is a performance bottleneck for any batch processing system.
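To make that per-iteration overhead concrete, here is a minimal sketch of the hand-written driver loop that iterative jobs typically need (it mirrors the iterative-processing discussion earlier in this chapter). It is illustrative only: `runMapReduceJob`, `converged`, and the HDFS paths are hypothetical placeholders, not the API of Hadoop, HaLoop, Twister, or iMapReduce.

```scala
// Hypothetical driver for an iterative MapReduce workload such as PageRank.
// Each iteration re-reads the previous result from HDFS and writes a new copy
// back; this repeated disk and network traffic is exactly the bottleneck that
// HaLoop, Twister and iMapReduce attack by caching data and keeping workers alive.
object IterativeDriver {
  // Hypothetical stand-in for submitting one MapReduce job (input path -> output path).
  def runMapReduceJob(inputPath: String, outputPath: String): Unit = ???
  // Hypothetical convergence test comparing two iterations' outputs.
  def converged(previousPath: String, currentPath: String): Boolean = ???

  def main(args: Array[String]): Unit = {
    var iteration = 0
    var current = "hdfs:///pagerank/iter-0"   // initial ranks, already on HDFS
    var done = false
    while (!done && iteration < 30) {
      val next = s"hdfs:///pagerank/iter-${iteration + 1}"
      runMapReduceJob(current, next)          // stage T writes its result to HDFS...
      done = converged(current, next)         // ...and stage T+1 reads it back
      current = next
      iteration += 1
    }
  }
}
```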
-Graph algorithms also require the exchange of messages between vertices. In PageRank, for example, every vertex needs the contributions of all its adjacent vertices to calculate its score. MapReduce lacks this message-passing model, which makes graph algorithms awkward to express and reason about.
-## Bulk synchronous parallel model
-The bulk synchronous parallel (BSP) model was developed in the 1980s as a bridging model that reflects the hardware design of parallel computers. It gained popularity as an alternative to MapReduce because it addresses the issues mentioned above to a large extent.
-In the BSP model:
-+ Computation consists of a series of steps called supersteps.
-+ Each processor has its own local memory, and every processor is connected to the others via point-to-point communication.
-+ In every superstep, a processor receives input at the beginning, performs local computation, and produces output at the end.
-+ Barrier synchronization brings all the processors together at the end of every superstep.
-A notable feature of the model is the explicit control over data movement through the communication between processors at every superstep. BSP keeps data in memory across supersteps, which makes iterative graph algorithms much easier to express and reason about.
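The superstep-and-barrier structure can be illustrated with a toy, single-machine sketch; real BSP systems run workers on separate machines and add message routing, partitioning, and fault tolerance. Everything below (the worker count, the trivial local computation) is invented for illustration, and the `Runnable` lambda assumes Scala 2.12 or later.

```scala
import java.util.concurrent.CyclicBarrier

// Toy illustration of the BSP pattern: local computation per worker, then a
// barrier that ends the superstep before the next one begins.
object BspSketch {
  val numWorkers = 4
  val numSupersteps = 3
  val barrier = new CyclicBarrier(numWorkers)   // all workers must arrive before any proceeds

  def worker(id: Int): Thread = new Thread(() => {
    var localValue = id                         // each worker's own local memory
    for (superstep <- 1 to numSupersteps) {
      localValue += 1                           // local computation for this superstep
      // (point-to-point message exchange with other workers would happen here)
      barrier.await()                           // barrier synchronization ends the superstep
      println(s"worker $id finished superstep $superstep with value $localValue")
    }
  })

  def main(args: Array[String]): Unit = {
    val workers = (0 until numWorkers).map(worker)
    workers.foreach(_.start())
    workers.foreach(_.join())
  }
}
```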
-`Pregel` is Google's implementation of the classic BSP model, built specifically to analyze large graphs, with PageRank as its canonical example. It was followed by open-source implementations, Apache Giraph and Hama, which are BSP systems built on top of Hadoop.
-Pregel is highly scalable and fault-tolerant and can represent large, complex graphs. Google claims the API becomes natural once a developer adopts the "think like a vertex" mindset.
-Pregel's computation is iterative, and every iteration is called a superstep. The system takes as input a directed graph with properties attached to both vertices and edges. At each superstep, every vertex executes, in parallel, a user-defined function that represents the behavior of that vertex. The function has access to the messages sent to its vertex in the previous superstep S-1 and can update the state of the vertex and its edges, mutate the graph, and send messages to other vertices, which receive them in the next superstep S+1. Synchronization happens only between supersteps. Every vertex is either active or inactive at any superstep, and the iteration stops when all vertices are inactive. A vertex deactivates itself by voting to halt and becomes active again if it receives a message. This message-passing model eliminates the shared memory and remote reads of other approaches and avoids the per-iteration disk I/O of the MapReduce model.
-Pregel’s API provides
-+ A compute() method in which the user implements the logic that changes the state of the vertex (and of the graph) at every superstep. Message delivery is guaranteed, and the messages are made available through an iterator at every superstep. (A single-machine sketch of this vertex-centric model follows this list.)
-+ User-defined handlers for issues such as a missing destination vertex.
-+ Combiners, which reduce the number of messages passed from multiple vertices to the same destination vertex.
-+ Aggregators, which capture the global state of the graph: a reduce operation combines the values contributed by every vertex, and the aggregated value is passed on to all vertices in the next superstep.
-+ Fault tolerance, achieved through checkpointing: workers are instructed to save the state of their vertices to persistent storage, and when a machine fails all workers restart execution from the most recent checkpoint.
-+ A master/worker implementation: the master partitions the graph into sets of vertices (hash of vertex ID mod the number of partitions) together with their outgoing edges. Each partition is assigned to a worker, which manages the state of its vertices by executing compute() and coordinating message communication. Workers also notify the master of the vertices that are active for the next superstep.
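To see how these pieces fit together, below is a hedged, single-machine sketch of the vertex-centric model: a driver loop over supersteps, a per-vertex update playing the role of compute(), message delivery between supersteps, and vote-to-halt, with PageRank as the vertex program. The graph, the data structures, and the driver loop are invented for illustration; this is not the interface of Pregel, Giraph, or any other real system.

```scala
import scala.collection.mutable

// Single-machine sketch of Pregel-style execution with PageRank as the vertex program.
object PregelSketch {
  val damping = 0.85
  val maxSupersteps = 20

  case class VertexState(var rank: Double, var active: Boolean)

  def main(args: Array[String]): Unit = {
    // A tiny directed graph: vertex id -> out-neighbours.
    val outEdges = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L))
    val vertices = mutable.Map(outEdges.keys.map(v => v -> VertexState(1.0, active = true)).toSeq: _*)

    // Messages produced in superstep S are delivered in superstep S + 1.
    var inbox = Map.empty[Long, List[Double]].withDefaultValue(Nil)
    var superstep = 0

    while (superstep < maxSupersteps && vertices.values.exists(_.active)) {
      val outbox = mutable.Map.empty[Long, List[Double]].withDefaultValue(Nil)

      for ((id, state) <- vertices) {
        val messages = inbox(id)
        // An inactive vertex is woken up again when it receives messages.
        if (state.active || messages.nonEmpty) {
          // The body of "compute()": fold incoming contributions into the rank...
          if (superstep > 0) state.rank = (1 - damping) + damping * messages.sum
          // ...and send the new contribution to every out-neighbour.
          for (dest <- outEdges(id))
            outbox(dest) = (state.rank / outEdges(id).size) :: outbox(dest)
          // Vote to halt on the last superstep; with no new messages, the run ends.
          state.active = superstep < maxSupersteps - 1
        }
      }

      inbox = outbox.toMap.withDefaultValue(Nil)   // the barrier between supersteps
      superstep += 1
    }

    vertices.toSeq.sortBy(_._1).foreach { case (id, s) => println(f"vertex $id rank ${s.rank}%.3f") }
  }
}
```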
-Pregel works well for sparse graphs; on dense graphs, however, the communication overhead can grow large enough to overwhelm the system. In addition, the entire computation state must reside in main memory.
-Apache Giraph is an open-source implementation of Pregel that adds features such as master computation, sharded aggregators, edge-oriented input, and out-of-core computation, making it more efficient. Another high-performance graph processing framework is GraphLab, developed at Carnegie Mellon University, which runs over MPI.
+
+Pig Latin : Pig Latin: A Not-So-Foreign Language for Data Processing. In SIGMOD, pages 1099–1110, 2008.
+
+Hive :
+
+Dremel :
+
+
+## Where Relational meets Procedural:
+A relational interface to big data is valuable, but it does not cater to users who want to perform
+1> ETL to and from various semi-structured or unstructured data sources, or
+2> advanced analytics such as machine learning or graph processing.
+These tasks need the best of both worlds: relational queries and procedural algorithms. Spark SQL bridges this gap by letting users seamlessly intermix the relational and procedural APIs. Its major contributions are the DataFrame API and the Catalyst optimizer.
+Spark SQL aims to provide relational processing over native RDDs and over several external data sources through a programmer-friendly API, high performance through established DBMS techniques, support for semi-structured data and external databases, and support for advanced analytics such as machine learning and graph processing.
+Programming API:
+Spark SQL runs on top of Spark and provides SQL interfaces. A user can interact with it through JDBC/ODBC, the command line, or the DataFrame API.
+The DataFrame API lets users intermix relational and procedural code with ease. A DataFrame is a collection of rows with a schema and named columns, on which relational operations can be performed with optimized execution. Unlike an RDD, a DataFrame gives the data an explicit structure; it is comparable to a table in a relational database or to a data frame in R or Python. DataFrames can be constructed from tables in external sources or from existing native RDDs. A DataFrame is lazy: each object represents a logical plan that is not executed until an output operation such as save or count is performed.
+Spark SQL supports all the major SQL data types, including complex types such as arrays, maps, and unions.
+DataFrame operations include projection (select), filtering (where), joins (join), and aggregations (groupBy).
+Illustrated below is an example of relational operations on an employees DataFrame that computes the number of female employees in each department.

```scala
employees
  .join(dept, employees("deptId") === dept("id"))
  .where(employees("gender") === "female")
  .groupBy(dept("id"), dept("name"))
  .agg(count("name"))
```

+Operators such as === (equality test), > (greater than), the arithmetic operators (+, -, etc.), and the aggregators build up an abstract syntax tree of the expression, which can then be passed to Catalyst for optimization.
+A cache() call on a DataFrame lets Spark SQL keep the data in memory so that it can be reused by iterative algorithms and interactive queries. The memory footprint is considerably smaller than for raw objects because Spark SQL applies columnar compression schemes such as dictionary encoding and run-length encoding.
+MORE EXPLANATION NEEDED...
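As a complement to the query above, here is a small end-to-end sketch of the relational/procedural mix this section describes, written against the Spark 1.x-era API that the Spark SQL paper targets. The schema, the sample rows, and the application name are invented for illustration (the employee column is called empName to avoid a name clash with the department's name column); only the general shape of the API matters.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.count

// Invented schema for illustration only.
case class Employee(empName: String, gender: String, deptId: Int)
case class Dept(id: Int, name: String)

object SparkSqlSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Procedural side: ordinary RDDs built with plain Scala code...
    val employees = sc.parallelize(Seq(
      Employee("Ana", "female", 1), Employee("Bo", "male", 1), Employee("Cy", "female", 2)
    )).toDF()
    val dept = sc.parallelize(Seq(Dept(1, "Eng"), Dept(2, "Sales"))).toDF()

    // ...relational side: the DataFrame query shown earlier in this section.
    val femalePerDept = employees
      .join(dept, employees("deptId") === dept("id"))
      .where(employees("gender") === "female")
      .groupBy(dept("id"), dept("name"))
      .agg(count("name"))

    femalePerDept.cache()   // kept in memory in Spark SQL's compressed columnar format
    femalePerDept.show()

    // Back to procedural code: drop down to an RDD of Rows and use plain Scala.
    val summaries = femalePerDept.rdd.map(row => s"${row.getString(1)}: ${row.getLong(2)}")
    summaries.collect().foreach(println)
    sc.stop()
  }
}
```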
+
+
+
+## Optimizers are the way to go:
+It is hard for a developer who has just started writing Spark applications to understand the internals of the framework. With relational code it becomes harder still, because one has to keep in mind the rules for an efficient query: correctly ordered joins, early filtering of data, and use of available indexes. Even a programmer who knows these rules is prone to human error, which can lead to applications with much longer runtimes. Query optimizers for MapReduce-style frameworks can therefore greatly improve the performance of the queries developers write and significantly reduce development time. A good query optimizer should optimize user queries automatically, be extensible so that users can supply information about their data, and even allow developer-defined rules to be added dynamically.
+Catalyst is one such framework. It leverages Scala's functional language features, such as pattern matching and runtime metaprogramming, to let developers concisely specify complex relational optimizations. Much of the power of Spark SQL comes from this optimizer.
+
+Catalyst includes both rule-based and cost-based optimization. It is extensible, so new optimization techniques and features can be added to Spark SQL, and developers can provide data-source-specific rules.
+Catalyst applies its rules to its core data type, a Tree: a composition of node objects, where each node has a node type (a subclass of the TreeNode class in Scala) and zero or more children. Node objects are immutable and are manipulated with functional transformations that produce new trees. The transform method of a Tree uses pattern matching to select the subset of input trees to which an optimization rule applies.
+In Spark SQL, transformation happens in four phases:
+Analyzing a logical plan to resolve references: In the analysis phase, a relation, either from the abstract syntax tree (AST) returned by the SQL parser or from a DataFrame, is turned into a logical plan that is still unresolved (the columns referred to may not exist or may have the wrong data type). The logical plan is resolved using Catalyst's Catalog object, which tracks the tables in all data sources, by mapping named attributes to the provided input, looking up relations by name in the catalog, and propagating and coercing types through expressions.
+
+Logical plan optimization: In this phase, rules such as constant folding, predicate pushdown, projection pruning, null propagation, and Boolean expression simplification are applied to the logical plan.
+
+Physical planning: In this phase, Spark generates multiple physical plans from the input logical plan and chooses among them using a cost model. The physical planner also performs rule-based physical optimizations, such as pipelining projections or filters into one Spark map operation. In addition, it can push operations from the logical plan into data sources that support predicate or projection pushdown.
+
+Code generation: The final phase generates the Java bytecode that runs on each machine. Catalyst transforms a tree representing an SQL expression into an AST for Scala code that evaluates the expression, and then compiles and runs the generated code. A special Scala feature, quasiquotes, aids in the construction of these ASTs.
+
+
+
 ## References
-- 
cgit v1.2.3