| author | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-07 21:27:11 -0500 |
|---|---|---|
| committer | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-07 21:27:11 -0500 |
| commit | 298a2fd7ef76d22ace7ea63db021aae505541710 | (patch) |
| tree | f3f8449007d3fca69dac6c20fb9000314534d1e0 | /chapter |
| parent | 107ee3b1676a997a1fa86e846c46c530cbd7a817 | (diff) |
Diffstat (limited to 'chapter')
| -rw-r--r-- | chapter/8/big-data.md | 86 |
1 files changed, 64 insertions, 22 deletions
---
layout: page
title: "Large Scale Parallel Data Processing"
by: "Jingjing and Abhilash"
---

## Outline
- 1. Programming Models
  - 1.1. Data parallelism: what is data parallelism and how do the following models relate to each other?
  - 1.1.4 Spark
  - 1.2. Querying: we need more declarative interfaces, built on top of MR models.
    - Pig {% cite olston2008pig --file big-data %}
    - Hive {% cite thusoo2009hive --file big-data %}
    - Spark SQL {% cite --file big-data %}
    - Limitations of relational-alone models? How does the SparkSQL model overcome them? Goals of SparkSQL? How does it leverage the Spark programming model? What is a DataFrame and how is it different from an RDD? What operations does a DataFrame provide? How is in-memory caching different from Spark?
  - 1.3. Large-scale Parallelism on Graphs
    - Why a separate graph processing model? What is BSP and how does it work? Do not stress this too much since it is not exactly a MapReduce world.

The MapReduce model is simple and powerful, and it quickly became very popular among developers. However, when developers start writing real-world applications, they often end up chaining together MapReduce stages. Such a pipeline of MapReduce jobs forces programmers to write additional coordination code, i.e. the development style moves backward from a simple logical computation abstraction to lower-level coordination management. Besides, developers mostly need to understand the execution model to do manual optimizations. The **FlumeJava** {% cite chambers2010flumejava --file big-data %} library is intended to provide support for developing such data-parallel pipelines. It defers evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MR primitives. The optimized execution is comparable with hand-optimized pipelines, so there is no need to write raw MR programs directly.

Microsoft's **Dryad** {% cite isard2007dryad --file big-data %} is designed differently from MapReduce and can support more general computations. It abstracts individual computation tasks as vertices and constructs a communication graph between those vertices. All programmers need to do is describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One advantage of Dryad over MapReduce is that Dryad vertices can process an arbitrary number of inputs and outputs, while MR supports only a single input and a single output for each vertex. Besides this flexibility of computation, Dryad also supports different types of communication channels: files, TCP pipes, and shared-memory FIFOs.

Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g. iterative machine learning algorithms. **Spark** {% cite zaharia2010spark --file big-data %} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce; however, instead of writing data to disk after each job as MapReduce does, a Spark program can explicitly cache an RDD in memory and reuse the same dataset across multiple parallel operations. This makes Spark suitable for iterative jobs and interactive analytics, and also yields better performance.

The following four sections discuss the programming models of MapReduce, FlumeJava, Dryad, and Spark.
### 1.1.1 MapReduce

In this model, parallelizable computations are abstracted into map and reduce functions. The computation accepts a set of key/value pairs as input and produces a set of key/value pairs as output. The process involves two phases:

- *Map*, written by the user, accepts a set of key/value pairs ("records") as input, applies the *map* operation to each record, and computes a set of intermediate key/value pairs as output.
- *Reduce*, also written by the user, accepts an intermediate key and the set of values associated with that key, operates on them, and produces zero or one output value.

Note: there is a *Shuffle* phase between *Map* and *Reduce*, provided by the MapReduce library, which groups all the intermediate values with the same key together and passes them to the *Reduce* function. We will discuss this further in Section 2, Execution Models.
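To make the two user-written phases concrete, the sketch below shows the canonical word-count example written against the Hadoop MapReduce Java API (a common open-source implementation of this model); the class names are illustrative and the job-configuration boilerplate is omitted. The *Map* function emits a `(word, 1)` pair for every word, and the *Reduce* function sums the counts delivered for each word after the shuffle.

```
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: one input record (byte offset, line of text) -> intermediate (word, 1) pairs.
class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    for (String token : line.toString().split("\\s+")) {
      if (token.isEmpty()) continue;
      word.set(token);
      context.write(word, ONE);               // emit an intermediate (word, 1) pair
    }
  }
}

// Reduce: receives (word, [1, 1, ...]) after the shuffle and sums the counts.
class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable c : counts) {
      sum += c.get();
    }
    context.write(word, new IntWritable(sum)); // one output value per key
  }
}
```

The grouping of all `(word, 1)` pairs by key between these two classes is exactly the *Shuffle* phase described above; the user never writes that part.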
Conceptually, the map and reduce functions have associated **types**:

MapReduce runs on hundreds or thousands of unreliable commodity machines, so …

- Writing a raw MR program still requires plentiful effort from programmers, especially when real applications require a pipeline of MapReduce jobs and programmers have to write coordination code to chain together those MR stages.

### 1.1.2 FlumeJava

FlumeJava was introduced to make it easy to develop, test, and run efficient data-parallel pipelines. FlumeJava represents each dataset as an object, and transformations are invoked by applying methods on these objects. It constructs an efficient internal execution plan from a pipeline of MapReduce jobs, uses deferred evaluation, and optimizes based on the plan structure. The debugging ability allows programmers to run on the local machine first and then deploy to large clusters.

*Core Abstraction*
- `PCollection<T>`, an immutable bag of elements of type `T`
- `combineValues()`, semantically a special case of `parallelDo()`, a combination of a MapReduce combiner and a MapReduce reducer, which is more efficient than doing all the combining in the reducer.

*Deferred Evaluation*
The state of each `PCollection` object is either *deferred* (not yet computed) or *materialized* (computed). When the program invokes a parallel operation, it does not actually run the operation immediately.

*Example*
`TODO: example and explain the execution plan`
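Since the example above is still a TODO, here is a rough sketch of a word-count pipeline, adapted from the style of the FlumeJava paper. FlumeJava is a Java library used internally at Google, so treat this as illustrative rather than directly runnable; helpers such as `readTextFileCollection`, `splitIntoWords`, and the file paths are assumptions for the example.

```
// Read input lines into a deferred PCollection; nothing executes yet.
PCollection<String> lines =
    readTextFileCollection("/gfs/data/shakes/hamlet.txt");

// parallelDo: split each line into words (still deferred, recorded in the execution plan).
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
  public void process(String line, EmitFn<String> emitFn) {
    for (String word : splitIntoWords(line)) {
      emitFn.emit(word);
    }
  }
}, collectionOf(strings()));

// count() is derived from parallelDo(), groupByKey(), and combineValues().
PTable<String, Integer> wordCounts = words.count();

// Writing the result and calling run() forces evaluation: FlumeJava optimizes
// the deferred plan and then executes it as a sequence of MapReduce jobs.
writeToRecordFileCollection(wordCounts, "/gfs/data/shakes/hamlet-counts.records");
FlumeJava.run();
```

Until `run()` is called, every `PCollection` in this pipeline stays in the *deferred* state described above.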
*Optimizer*
`TODO: parallelDo fusion; MSCR; the overall goal is to produce the fewest, most efficient MSCR operations in the final optimized plan`

### 1.1.3 Dryad

### 1.2 Querying: declarative interfaces

MapReduce provides only two high-level primitives, map and reduce, that programmers have to worry about. MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, etc. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and its two-stage data flow. Several important patterns like joins (which can be highly complex depending on the data) are extremely hard to implement and reason about for a programmer. The code can also become repetitive when the programmer wants to implement common operations like projection, filtering, etc.

Non-programmers like data scientists would highly prefer a SQL-like interface over a cumbersome and rigid framework. Such a high-level declarative language can easily express their tasks while leaving all of the execution and optimization details to the backend engine. These kinds of abstractions also provide ample opportunities for query optimization.

Sawzall {% cite pike2005interpreting --file big-data %} is a programming language built on top of MapReduce. It consists of a *filter* phase (map) and an *aggregation* phase (reduce). The user program specifies the filter function and emits intermediate pairs to external pre-built aggregators.

Hive {% cite thusoo2009hive --file big-data %} was built by Facebook to organize datasets in structured formats while still utilizing the benefits of the MapReduce framework. It has its own SQL-like language, HiveQL, which is easy to pick up for anyone who understands SQL. It has a component called the *metastore* that holds table metadata, created once and reused each time the table is referenced by HiveQL, much the way traditional warehousing solutions do.

Pig Latin {% cite olston2008pig --file big-data %} aims at a sweet spot between declarative and procedural programming. For advanced programmers, SQL is an unnatural way to express program logic, and Pig Latin instead disassembles a data transformation into an explicit sequence of steps.

The following subsections discuss Hive, Pig Latin, and SparkSQL in detail.

### 1.2.x Hive/HiveQL

Data in Hive is organized into three different formats:

`Tables`: Like RDBMS tables, Hive tables contain rows and columns, and every table can be mapped to an HDFS directory. All the data in the table is serialized and stored in files under the corresponding directory. Hive is extensible to accept user-defined data formats and custom serialization/deserialization methods. It also supports external tables stored in other native file systems like HDFS, NFS, or local directories.

`Partitions`: The distribution of data into subdirectories of the table directory is determined by one or more partitions. A table can be further partitioned on columns.

`Buckets`: Data in each partition can be further divided into buckets based on the hash of a column in the table. Each bucket is stored as a file in the partition directory.
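To make this layout concrete: a table is a directory under the Hive warehouse, each partition is a subdirectory, and each bucket is a file inside its partition directory. The sketch below walks such a layout with the standard Hadoop `FileSystem` API; the warehouse path and table name are hypothetical.

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowHiveLayout {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical table directory: one subdirectory per partition value.
    Path table = new Path("/user/hive/warehouse/clicks");
    for (FileStatus partition : fs.listStatus(table)) {
      System.out.println("partition: " + partition.getPath());      // e.g. a date partition
      for (FileStatus bucket : fs.listStatus(partition.getPath())) {
        System.out.println("  bucket file: " + bucket.getPath());   // one file per bucket
      }
    }
  }
}
```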
***HiveQL***: The Hive query language consists of a subset of SQL along with some extensions. The language is very SQL-like and supports features like subqueries, joins, cartesian products, group by, aggregation, describe, and more. MapReduce programs can also be used in Hive queries. A sample query using MapReduce would look like this:

```
FROM (
  MAP inputdata USING 'python mapper.py' AS (word, count)
  FROM inputtable
  CLUSTER BY word
) REDUCE word, count USING 'python reduce.py';
```

This query uses mapper.py to transform inputdata into (word, count) pairs, distributes data to reducers by hashing on the word column (given by CLUSTER BY), and aggregates the pairs with reduce.py.

INSERT INTO, UPDATE, and DELETE are not supported, which makes it easier to handle reader and writer concurrency.

***Serialization/Deserialization*** Hive implements LazySerDe as the default SerDe. It deserializes rows into internal objects lazily, so that the cost of deserializing a column is incurred only when it is needed. Hive also provides a RegexSerDe, which allows the use of regular expressions to parse columns out of a row. Hive also supports various file formats like TextInputFormat, SequenceFileInputFormat, and RCFileInputFormat.

### 1.2.x Pig Latin

The goal of Pig Latin is to attract experienced programmers to perform ad-hoc analysis on big data. Parallel database products provide a simple SQL query interface, which is good for non-programmers and simple tasks, but it is not the style in which experienced programmers approach a problem. Such programmers instead prefer to specify the computation as a sequence of explicit steps.

For example, suppose we have a table urls: `(url, category, pagerank)`. The following is a simple SQL query that finds, for each sufficiently large category, the average pagerank of high-pagerank urls in that category.

```
SELECT category, AVG(pagerank)
FROM urls WHERE pagerank > 0.2
GROUP BY category HAVING COUNT(*) > 10^6
```

Pig Latin would express the same computation in the following way:

```
good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
output = FOREACH big_groups GENERATE
         category, AVG(good_urls.pagerank);
```

*Interoperability* Pig Latin is designed to support ad-hoc data analysis, which means the input only requires a function to parse the content of files into tuples. This saves a time-consuming import step. As for output, Pig gives users the freedom to convert tuples into a byte sequence in a format they define.

*Nested Data Model* Pig Latin has a flexible, fully nested data model, and allows complex, non-atomic data types such as set, map, and tuple to occur as fields of a table. The benefits include: it is closer to how programmers think; data can be stored in the same nested fashion, saving recombining time; it admits an algebraic language; and it allows rich user-defined functions.

*UDFs as First-Class Citizens* Pig Latin supports user-defined functions (UDFs) for customized grouping, filtering, or per-tuple processing; a sketch of a Java UDF follows below.

*Debugging Environment* Pig Latin has a novel interactive debugging environment that can generate a concise example data table to illustrate the output of each step.
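As an illustration of the UDF support mentioned above, here is a minimal sketch of a per-tuple eval function written in Java against Apache Pig's `EvalFunc` API; the class name and the assumption that the first field is a string are made up for the example.

```
import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// A per-tuple UDF: returns the first field of the input tuple upper-cased.
public class ToUpper extends EvalFunc<String> {
  @Override
  public String exec(Tuple input) throws IOException {
    if (input == null || input.size() == 0 || input.get(0) == null) {
      return null;    // Pig treats null as "no result" for this tuple
    }
    return input.get(0).toString().toUpperCase();
  }
}
```

In a script the UDF would be registered from a jar and then applied inside a `FOREACH ... GENERATE`, just like a built-in function.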
### 1.2.x SparkSQL - Where Relational meets Procedural

A relational interface to big data is good; however, it does not cater to users who want to perform:
- ETL to and from various semi-structured or unstructured data sources

Hence, in Spark SQL, transformation of user queries happens in four phases:

***Code Generation:*** The final phase generates the Java bytecode that should run on each machine. Catalyst transforms the Tree, which is an expression in SQL, into an AST for Scala code to evaluate, then compiles and runs the generated code. A special Scala feature, quasiquotes, aids in the construction of the abstract syntax tree (AST).

## 3. Big Data Ecosystem

## References

{% bibliography --file big-data %}

Apache Giraph is an open source implementation of Pregel in which new features …

- GFS/HDFS for MapReduce/Hadoop: Machines are unreliable, how do they provide fault-tolerance? How does GFS deal with a single point of failure (shadow masters)? How does the master manage partitioning and transmission of data chunks? Which
- Resource Management: Mesos. New frameworks keep emerging and users have to use multiple different frameworks (MR, Spark, etc.) in the same clusters, so how should they share access to the large datasets instead of costly replication across clusters?
- Introducing streaming: what happens when data cannot be complete? How do different programming models adapt? windowing `todo: more`

2015 NSDI Ousterhout

latency numbers that every programmer should know
