| author | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-16 13:49:23 -0500 |
|---|---|---|
| committer | Jingjing Ren <renjj@ccs.neu.edu> | 2016-12-16 13:49:23 -0500 |
| commit | 9d61da27a7187eab000907d20a7bddcfa4b8275c (patch) | |
| tree | e0994731bf4f116f54ecddc249c21c03ae9f2bb4 | |
| parent | 9970cbba68263523bad2d28e9d009ef6fa16537f (diff) | |
update
| -rw-r--r-- | chapter/8/big-data.md | 64 |
1 files changed, 45 insertions, 19 deletions
The MapReduce model is simple and powerful and quickly became very popular among developers. However, when developers start writing real-world applications, they often end up writing a lot of boilerplate and chaining these stages together. Moreover, the MapReduce pipeline forces them to write additional coordination code, i.e., the development style goes backward from a simple logical computation abstraction to lower-level coordination management. As we will discuss in *section 2 execution model*, MapReduce writes all data to disk after each stage, which causes severe delays. Programmers need to perform manual optimizations to reach their performance targets, and this again requires them to understand the underlying execution model. The whole process soon becomes cumbersome. The **FlumeJava** {% cite chambers2010flumejava --file big-data %} library intends to support the development of data-parallel pipelines by abstracting away the complexity involved in data representation and handling the optimizations implicitly. It defers evaluation, constructs an execution plan from parallel collections, optimizes the plan, and then executes the underlying MR primitives. The optimized execution is comparable with hand-optimized pipelines, so there is little need to write raw MR programs directly.

After MapReduce, Microsoft proposed its counterpart data-parallelism model: **Dryad** {% cite isard2007dryad --file big-data %}, which abstracts individual computational tasks as vertices and constructs a communication graph between those vertices. What programmers need to do is describe this DAG and let the Dryad execution engine construct the execution plan and manage scheduling and optimization. One advantage of Dryad over MapReduce is that Dryad vertices can process an arbitrary number of inputs and outputs, while MR supports only a single input and a single output for each vertex. Besides this flexibility, Dryad also supports different types of communication channels: files, TCP pipes, and shared-memory FIFOs. The programming model is less elegant than MapReduce, and programmers are not meant to interact with it directly. Instead, they are expected to use the high-level programming interface DryadLINQ {% cite yu2008dryadlinq --file big-data %}, which is more expressive and well embedded in the .NET framework. We can see some examples at the end of *section 1.1.3 Dryad*.

Dryad expresses computation as acyclic data flows, which might be too expensive for some complex applications, e.g., iterative machine learning algorithms. **Spark** {% cite zaharia2010spark --file big-data %} is a framework that uses functional programming and pipelining to provide such support. It is largely inspired by MapReduce's model and builds upon the DAG and lazy-evaluation ideas of DryadLINQ. Instead of writing data to disk after each job as MapReduce does, Spark can cache results across jobs. Spark explicitly caches computational data in memory through a specialized immutable data structure named Resilient Distributed Datasets (RDDs) and reuses the same dataset across multiple parallel operations. Spark builds upon RDDs to achieve fault tolerance by replaying the lineage information of a lost RDD, which incurs less overhead than the checkpoint-based fault tolerance used in distributed shared memory systems. Moreover, Spark is the underlying framework upon which many very different systems are built, e.g., Spark SQL & DataFrames, GraphX, and Spark Streaming, which makes it easy to mix and match these systems in the same application. These features make Spark the best fit for iterative jobs and interactive analytics and also help it deliver better performance.
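To make the RDD idea concrete, here is a minimal sketch in Spark's Scala API (not taken from the chapter; the file path and the log-filtering logic are placeholders). Transformations such as `filter` only record lineage, nothing runs until an action such as `count` is called, and `cache()` keeps the intermediate result in memory so later actions reuse it instead of rereading from disk.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

    // Transformations are lazy: they only extend the lineage graph.
    val lines  = sc.textFile("hdfs:///logs/app.log")        // placeholder path
    val errors = lines.filter(_.contains("ERROR")).cache()  // keep in memory once computed

    // First action materializes `errors` and caches it.
    println(errors.count())
    // Later actions reuse the cached partitions; lost partitions
    // would be rebuilt from the recorded lineage, not from a checkpoint.
    println(errors.filter(_.contains("timeout")).count())

    sc.stop()
  }
}
```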
```c#
// ... closing lines of the Dryad GraphBuilder example
GraphBuilder HOutputs = HSet >= output;
GraphBuilder final = XInputs || YInputs || XToY || YToH || HOutputs;
```

In fact, developers are not expected to write raw Dryad programs as complex as the one above. Instead, Microsoft introduced a querying model, DryadLINQ {% cite yu2008dryadlinq --file big-data %}, which is more declarative. We will discuss querying models and their power to express complex operations like joins in *section 1.2 Querying*. Here we just show a glimpse of a DryadLINQ query (which is compiled into Dryad jobs and executed by the Dryad execution engine):

```c#
// SQL-style syntax to join two input sets:
// scoreTriples and staticRank
var adjustedScoreTriples =
    from d in scoreTriples
    join r in staticRank on d.docID equals r.key
    select new QueryScoreDocIDTriple(d, r);
var rankedQueries =
    from s in adjustedScoreTriples
    group s by s.query into g
    select TakeTopQueryResults(g);

// Object-oriented syntax for the above join
var adjustedScoreTriples =
    scoreTriples.Join(staticRank,
        d => d.docID, r => r.key,
        (d, r) => new QueryScoreDocIDTriple(d, r));
var groupedQueries =
    adjustedScoreTriples.GroupBy(s => s.query);
var rankedQueries =
    groupedQueries.Select(
        g => TakeTopQueryResults(g));
```
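For comparison, the same join-then-group shape can be sketched with Spark's Scala RDD API. The record types and the `takeTopQueryResults` helper below are hypothetical stand-ins, not part of the chapter:

```scala
import org.apache.spark.rdd.RDD

object DryadLinqStylePipeline {
  // Hypothetical record types standing in for the inputs of the query above.
  case class ScoreTriple(query: String, docID: Long, score: Double)
  case class Rank(key: Long, rank: Double)
  case class QueryScoreDocIDTriple(query: String, docID: Long, score: Double, rank: Double)

  // Hypothetical helper: keep the best-ranked results for one query.
  def takeTopQueryResults(results: Iterable[QueryScoreDocIDTriple]): Seq[QueryScoreDocIDTriple] =
    results.toSeq.sortBy(t => -(t.score * t.rank)).take(10)

  def rankQueries(scoreTriples: RDD[ScoreTriple],
                  staticRank: RDD[Rank]): RDD[Seq[QueryScoreDocIDTriple]] = {
    // Join on docID, mirroring `join r in staticRank on d.docID equals r.key`.
    val adjusted = scoreTriples.keyBy(_.docID)
      .join(staticRank.keyBy(_.key))
      .map { case (_, (d, r)) => QueryScoreDocIDTriple(d.query, d.docID, d.score, r.rank) }

    // Group by query and keep the top results per group,
    // mirroring `group s by s.query into g`.
    adjusted.groupBy(_.query).map { case (_, g) => takeTopQueryResults(g) }
  }
}
```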
*Fault tolerance policy* The communication graph is acyclic, so given immutable inputs, the computation result should remain the same regardless of the sequence of failures. When a vertex fails, the job manager either gets notified or observes a heartbeat timeout, and it then immediately schedules the vertex for re-execution.

RDDs are immutable and can only be created through coarse-grained transformations.

***Challenges in Spark***

- *Functional API semantics* The `groupByKey` operator is costly in terms of performance, in that all the values for a key are shipped across the cluster as a (key, list of values) pair and only then aggregated on a single machine, resulting in heavy data movement. Spark does provide the `reduceByKey` operator, which performs a partial aggregation on individual worker nodes before returning the distributed collection, but developers who are not aware of this functionality can unintentionally choose `groupByKey` (see the sketch after this list). The reason is that functional programmers (Scala developers) tend to think declaratively about the problem and only see the end result of the `groupByKey` operator; they are not necessarily trained in how `groupByKey` is implemented on top of the cluster. Therefore, to use Spark, unlike ordinary functional programming, one needs to understand how the underlying cluster is going to execute the code. The burden of preserving performance is left to the programmer, who is expected to understand Spark's execution model and to know when to use `reduceByKey` over `groupByKey`.

- *Debugging and profiling* Debugging tools are not available, and developers find it hard to tell whether a computation is concentrated on a single machine or whether the data structures they used were inefficient.
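A minimal sketch of the difference, assuming a local `SparkContext` and made-up data: both pipelines compute per-key sums, but the first ships every individual value through the shuffle before summing, while the second combines values on each worker first.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object GroupVsReduce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("group-vs-reduce").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 1, "a" -> 1, "c" -> 1, "a" -> 1))

    // Shuffles every (key, value) pair, then sums the grouped values:
    // all values for a key travel across the network.
    val viaGroup = pairs.groupByKey().mapValues(_.sum)

    // Pre-aggregates on each worker (like a combiner), so only partial
    // sums are shuffled. Same result, far less data movement.
    val viaReduce = pairs.reduceByKey(_ + _)

    println(viaGroup.collect().toMap)   // Map(a -> 3, b -> 1, c -> 1)
    println(viaReduce.collect().toMap)  // Map(a -> 3, b -> 1, c -> 1)

    sc.stop()
  }
}
```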
### 1.2 Querying: declarative interfaces

MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, etc. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and its two-stage data flow. Several important patterns, such as equi-joins and theta-joins {% cite okcan2011processing --file big-data %}, which can be highly complex depending on the data, require programmers to implement them by hand. Hence, MapReduce lacks many such high-level abstractions, requiring programmers to be well versed in design patterns like map-side joins and reduce-side equi-joins.
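To give a feel for what "implementing a join by hand" means, here is a rough single-machine sketch of the reduce-side equi-join pattern in plain Scala, mimicking the map, shuffle, and reduce phases with ordinary collections. The record types are made up for illustration; a real MapReduce job would also have to deal with partitioning, sorting, and serialization.

```scala
object ReduceSideJoinSketch {
  // Two illustrative inputs that share a join key (userId).
  case class User(userId: Int, name: String)
  case class Order(userId: Int, amount: Double)

  // "Map" phase tags each record with its source and emits (joinKey, taggedRecord).
  sealed trait Tagged
  case class L(u: User)  extends Tagged
  case class R(o: Order) extends Tagged

  def join(users: Seq[User], orders: Seq[Order]): Seq[(String, Double)] = {
    val mapped: Seq[(Int, Tagged)] =
      users.map(u => u.userId -> (L(u): Tagged)) ++
      orders.map(o => o.userId -> (R(o): Tagged))

    // "Shuffle": group all tagged records by the join key.
    val shuffled: Map[Int, Seq[Tagged]] =
      mapped.groupBy(_._1).mapValues(_.map(_._2)).toMap

    // "Reduce" phase: within each key, separate the two sides and emit the matches.
    shuffled.values.toSeq.flatMap { group =>
      val lefts  = group.collect { case L(u) => u }
      val rights = group.collect { case R(o) => o }
      for (u <- lefts; o <- rights) yield (u.name, o.amount)
    }
  }

  def main(args: Array[String]): Unit = {
    val users  = Seq(User(1, "ada"), User(2, "bob"))
    val orders = Seq(Order(1, 9.99), Order(1, 5.00), Order(3, 1.50))
    join(users, orders).foreach(println)  // (ada,9.99) and (ada,5.0); user 3 has no match
  }
}
```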
Also, Java-based code (as in the Hadoop framework) can become repetitive when the programmer wants to implement common operations like projection and filtering. A simple word count program, as shown below, can span up to 63 lines.

*Complete code for Word count in Hadoop (Java based implementation of MapReduce)*

The following subsections will discuss Hive, Pig Latin, and Spark SQL in detail.

### 1.2.1 Hive/HiveQL

Hive {% cite thusoo2010hive --file big-data %} is a data-warehousing infrastructure built on top of the MapReduce framework Hadoop. The primary responsibility of Hive is to provide data summarization, query, and analysis. It supports the analysis of large datasets stored in Hadoop's HDFS {% cite shvachko2010hadoop --file big-data %}. It offers SQL-like access to structured data, known as HiveQL (or HQL), as well as big data analysis with the help of MapReduce. HiveQL queries are compiled into MapReduce jobs that are executed on Hadoop, which drastically brings down the development time for writing and maintaining Hadoop jobs.

Data in Hive is organized into three different formats:

`Tables`: Like RDBMS tables, Hive tables consist of rows and columns, and every table can be mapped to an HDFS directory. All the data in a table is serialized and stored in files under the corresponding directory. Hive is extensible to accept user-defined data formats and customized serialization and deserialization methods. It also supports external tables stored in file systems such as HDFS, NFS, or local directories.

`Partitions`: The distribution of data into subdirectories of the table directory is determined by one or more partitions. A table can be further partitioned on columns.

`Buckets`: Data in each partition can be further divided into buckets based on the hash of a column of the table. Each bucket is stored as a file in the partition directory.

Hive implements the LazySerDe as the default SerDe interface. A SerDe is a combination of a serializer and a deserializer.

### 1.2.2 Pig Latin

Pig Latin {% cite olston2008pig --file big-data %} is a programming model built on top of MapReduce to provide a declarative description of data analysis programs. Unlike Hive, which has SQL-like syntax, the goal of Pig Latin is to attract experienced programmers to perform ad-hoc analysis on big data and to let them write execution logic as a sequence of steps. For example, suppose we have a table of URLs: `(url, category, pagerank)`. The following is a simple SQL query that finds, for each sufficiently large category, the average pagerank of high-pagerank URLs in that category.

```
SELECT category, AVG(pagerank)
FROM urls WHERE pagerank > 0.2
GROUP BY category HAVING COUNT(*) > 10^6
```

And Pig Latin provides an alternative way to carry out the same operations, in a style programmers can reason about more easily:

```
good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);
```
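Read purely as data-flow steps, the script above maps almost one-to-one onto ordinary collection operations. Below is a small in-memory Scala analogue, just to show the stepwise shape; it is not how Pig executes on a cluster, the sample data is made up, and the size threshold is lowered so the toy example produces output.

```scala
object PigStyleSteps {
  case class UrlRecord(url: String, category: String, pagerank: Double)

  def main(args: Array[String]): Unit = {
    val urls = Seq(
      UrlRecord("a.com", "news",   0.9),
      UrlRecord("b.com", "news",   0.4),
      UrlRecord("c.com", "sports", 0.1))

    // good_urls = FILTER urls BY pagerank > 0.2;
    val goodUrls = urls.filter(_.pagerank > 0.2)
    // groups = GROUP good_urls BY category;
    val groups = goodUrls.groupBy(_.category)
    // big_groups = FILTER groups BY COUNT(good_urls) > 10^6;  (threshold lowered here)
    val bigGroups = groups.filter { case (_, g) => g.size > 1 }
    // output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);
    val output = bigGroups.map { case (cat, g) => cat -> g.map(_.pagerank).sum / g.size }

    output.foreach(println)  // news -> ~0.65
  }
}
```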
*UDFs as First-Class Citizens* Pig Latin supports user-defined functions (UDFs) for customized grouping, filtering, or per-tuple processing, which makes Pig Latin more declarative.

*Debugging Environment* Pig Latin has a novel interactive debugging environment that can generate a concise example data table to illustrate the output of each step.

*Limitations* The procedural design gives users more control over execution, but at the same time the data schema is not enforced explicitly, so it is much harder to apply database-style optimizations. Pig Latin has no control structures such as loops or conditionals; if they are needed, one has to embed Pig Latin in Java, JDBC-style, which can easily fail without static syntax checking. It is also not easy to debug.

### 1.2.3 Spark SQL

***Programming API***

Spark SQL runs on top of Spark and provides SQL interfaces. A user can interact with these interfaces through JDBC/ODBC, the command line, or the DataFrame API. The DataFrame API lets users intermix relational and procedural code with ease. A DataFrame is a collection of schema-based rows with named columns, on which relational operations can be performed with optimized execution. Unlike an RDD, a DataFrame lets developers define a structure for the data, and it is analogous to a table in a relational database or to a data frame in R/Python. A DataFrame can be constructed from tables of external sources or from existing native RDDs. DataFrames are lazily evaluated: each DataFrame object represents a logical plan that is not executed until an output operation such as `save` or `count` is performed. Spark SQL supports all the major SQL data types, including complex data types like arrays, maps, and unions.

Some of the DataFrame operations include projection (`select`), filtering (`where`), joins, and aggregations (`groupBy`). Illustrated below is an example of relational operations on an `employees` data frame to compute the number of female employees in each department.

```scala
employees.join(dept, employees("deptId") === dept("id"))
  .where(employees("gender") === "female")
  .groupBy(dept("id"), dept("name"))
  .agg(count("name"))
```

Several of these operators, such as `===` for equality tests, `>` for greater-than, the arithmetic ones (`+`, `-`, etc.), and the aggregators, build an abstract syntax tree of the expression, which can be passed to Catalyst for optimization. A `cache()` operation on the data frame lets Spark SQL store the data in memory so it can be used in iterative algorithms and for interactive queries. In the case of Spark SQL, the memory footprint is considerably smaller because it applies columnar compression schemes like dictionary encoding and run-length encoding.

The DataFrame API also supports inline UDF definitions without complicated packaging and registration. Because UDFs and queries are both expressed in the same general-purpose language (Python or Scala), users can use standard debugging tools.
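A minimal sketch of such an inline UDF, assuming the `employees` DataFrame from the example above and hypothetical `salary` and `years` columns (neither is from the chapter):

```scala
import org.apache.spark.sql.functions.udf

// Defined inline in ordinary Scala; no separate packaging or registration step.
val bonusEligible = udf((salary: Double, years: Int) => salary < 100000.0 && years >= 3)

// Used directly inside a relational query, mixed with the other DataFrame operators.
// Because the UDF body is plain Scala, it can be unit-tested and debugged like any function.
val eligible = employees.where(bonusEligible(employees("salary"), employees("years")))
```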
However, a DataFrame lacks type safety. In the example above, attributes are referred to by string names, so it is not possible for the compiler to catch errors; if an attribute name is incorrect, the error is only detected at runtime, when the query plan is created.

Also, a DataFrame is both brittle and verbose, because the user has to cast each row and column to a specific type before doing anything with them. Naturally, this is error-prone: one could accidentally choose the wrong index for a row or column and end up with a `ClassCastException`.

Spark introduced an extension to DataFrame called ***Dataset*** to provide this compile-time type safety. It embraces an object-oriented programming style and adds a feature termed Encoders. Encoders translate between JVM representations (objects) and Spark's internal binary format. Spark's built-in encoders are very advanced in that they generate bytecode to interact with off-heap data and provide on-demand access to individual attributes without having to deserialize an entire object.
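A small sketch of the Dataset style, assuming a local `SparkSession`; the `Employee` case class is illustrative. The lambda below works on typed `Employee` objects, so a misspelled field name is a compile error rather than a runtime failure, and the implicit `Encoder` derived for the case class handles the conversion to Spark's binary format.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Case class at top level so Spark can derive an Encoder for it.
case class Employee(name: String, deptId: Int, gender: String)

object DatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dataset-sketch").master("local[*]").getOrCreate()
    import spark.implicits._  // brings the implicit Encoder[Employee] into scope

    val employees: Dataset[Employee] =
      Seq(Employee("Ada", 1, "female"), Employee("Bob", 2, "male")).toDS()

    // Typed operations: the compiler checks field names and types,
    // unlike string-based DataFrame columns.
    val women = employees.filter(e => e.gender == "female")
    // employees.filter(e => e.gendr == "female")   // misspelling: does not compile

    women.show()
    spark.stop()
  }
}
```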
