### 1.2.1 Hive/HiveQL

Hive {% cite thusoo2010hive --file big-data%} is a data-warehousing infrastructure built on top of the MapReduce framework Hadoop. The primary responsibility of Hive is to provide data summarization, query, and analysis. It supports analysis of large datasets stored in Hadoop’s HDFS {% cite shvachko2010hadoop --file big-data%}. It provides SQL-like access to structured data via a language known as HiveQL (or HQL), as well as big data analysis with the help of MapReduce. HiveQL queries are compiled into MapReduce jobs that are executed on Hadoop, which drastically brings down the development time for writing and maintaining Hadoop jobs.
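To make the idea of compiling HiveQL into MapReduce jobs concrete, the following is a minimal sketch, not Hive's actual generated code, of how a HiveQL-style aggregation such as `SELECT page, COUNT(*) FROM views GROUP BY page` could be decomposed into a map function and a reduce function. The table, column, and function names here are illustrative only:

```python
# Illustrative only: the shape of a HiveQL GROUP BY after translation into
# map/reduce functions. Hive's real compiler emits an operator DAG, not this code.
from collections import defaultdict

def map_phase(rows):
    """For each input row, emit (group-by key, 1)."""
    for row in rows:                       # each row parsed from a file in HDFS
        yield (row["page"], 1)

def reduce_phase(key, values):
    """Sum all the counts that the shuffle grouped under one key."""
    return (key, sum(values))

def run(rows):
    """Tiny in-process stand-in for the shuffle between map and reduce."""
    groups = defaultdict(list)
    for key, value in map_phase(rows):
        groups[key].append(value)
    return [reduce_phase(k, vs) for k, vs in groups.items()]

print(run([{"page": "/home"}, {"page": "/docs"}, {"page": "/home"}]))
# [('/home', 2), ('/docs', 1)]
```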
Data in Hive is organized into three different formats:

## 2 Execution Models

There are many possible implementations of these programming models. In this section, we discuss a few different execution models, how the programming interfaces above exploit them, and the benefits and limitations of each design. At a very high level, MapReduce, its variants, and Spark all adopt the master/workers model, where the master (or driver in Spark) is responsible for managing data and dynamically scheduling tasks to workers. The master monitors the workers' status, and when a failure happens, the master reschedules the task to another idle worker. However, data in MapReduce (section 2.1) is distributed over the cluster and has to be moved in and out of disk, whereas Spark (section 2.2) takes an in-memory processing approach. This saves significant I/O and makes Spark much faster than MapReduce. As for fault tolerance, MapReduce relies on data persistence, while Spark achieves it through lineage (recomputation of failed tasks).

As for the more declarative querying models, the execution engine needs to take care of query compilation and, in doing so, gets the opportunity to optimize. For example, Hive (section 2.3) not only needs a driver the way MapReduce and Spark do, but also has to manage the metastore and to take advantage of optimizations borrowed from traditional database design. SparkSQL (section 2.4) adopts the Catalyst framework for SQL optimization, which is both rule-based and cost-based.

### 2.1 MapReduce execution model

The original MapReduce model is implemented and deployed on Google infrastructure. As described in section 1.1.1, the user program defines map and reduce functions, and the underlying system manages data partitioning and schedules jobs across different nodes. Figure 2.1.1 shows the overall flow when the user program calls the MapReduce function:

1. Split data. The input files are split into *M* pieces;
2. Copy processes. The user program creates a master process and the workers. The master picks idle workers to do either a map or a reduce task;
3. Map. The map worker reads the corresponding splits and passes them to the map function. The generated intermediate key/value pairs are buffered in memory;
4. Partition. The buffered pairs are periodically written to local disk and partitioned into *R* regions. The locations are then passed back to the master;
5. Shuffle. The reduce worker reads from the local disks and groups together all occurrences of the same key;

<p>Figure 2.1.1 Execution overview<label for="sn-proprietary-monotype-bembo" class="margin-toggle sidenote-number"></label><input type="checkbox" id="sn-proprietary-monotype-bembo" class="margin-toggle"/><span class="sidenote">from original MapReduce paper {%cite dean2008mapreduce --file big-data%}</span></p>

At steps 4 and 5, the intermediate dataset is written to disk by the map worker and then read from disk by the reduce worker. Transferring big data chunks over the network is expensive, so the data is stored on the local disks of the cluster, and the master tries to schedule each map task on the machine that contains its input data, or on a nearby machine, to minimize network operations.
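The routing in steps 4 and 5 is a simple deterministic function of the intermediate key; the MapReduce paper's default is hashing. The sketch below shows that rule; the number of regions and the keys are made up for illustration:

```python
# Default-style partitioning: hash(key) mod R decides which of the R regions
# (and therefore which reduce worker) an intermediate pair is written to.
R = 4  # number of reduce regions, chosen by the user program

def partition(key, num_regions=R):
    return hash(key) % num_regions   # Python's hash() is process-seeded,
                                     # but stable within one run

# Every occurrence of a key lands in the same region, so a single reduce
# worker sees all values for that key during the shuffle (step 5).
for key, value in [("cat", 1), ("dog", 1), ("cat", 1)]:
    print(key, "-> region", partition(key))
```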
### 2.2 Spark execution model

<figure class="main-container">
  <img src="./cluster-overview.png" alt="Spark Cluster Overview" />
</figure>
*Figure & information (this section) from the website: http://spark.apache.org/docs/latest/cluster-overview.html*

The Spark driver defines the SparkContext, which is the entry point for any job; it defines the environment/configuration and the dependencies of the submitted job. It connects to the cluster manager and requests resources for further execution of the jobs.

The cluster manager manages and allocates the required system resources to the Spark jobs. Furthermore, it coordinates and keeps track of the live/dead nodes in the cluster. It enables the execution of jobs submitted by the driver on the worker nodes (also called Spark workers) and finally tracks and shows the status of the various jobs running on the worker nodes.

A Spark worker executes the business logic submitted by the user by way of the Spark driver. Spark workers are abstracted and are allocated dynamically by the cluster manager to the Spark driver for the execution of submitted jobs. The driver listens for and accepts incoming connections from its executors throughout its lifetime.

***Job scheduler optimization:*** Spark’s job scheduler tracks the persistent RDDs saved in memory. When an action (such as count or collect) is performed on an RDD, the scheduler first analyzes the lineage graph to build a DAG of stages to execute. Each stage contains only transformations with narrow dependencies; wide dependencies fall on stage boundaries, where the scheduler has to fetch the missing partitions from other workers in order to build the target RDD. The job scheduler is highly performant: it assigns tasks to machines based on data locality, or to the machines preferred by the target RDD. If a task fails, the scheduler re-runs it on another node and also recomputes the stage’s parent if it is missing.
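To see these ideas in one place, here is a minimal PySpark sketch (assuming a local Spark installation; the application name and data are illustrative). The narrow transformations stay within one stage, `reduceByKey` introduces a wide dependency and hence a shuffle boundary, and nothing actually executes until an action such as `collect()`:

```python
# Minimal PySpark sketch of lineage, stages, and lazy execution.
from pyspark import SparkContext

sc = SparkContext("local[2]", "lineage-demo")

lines = sc.parallelize(["a b a", "b c"])            # base RDD
words = lines.flatMap(lambda l: l.split())          # narrow dependency
pairs = words.map(lambda w: (w, 1))                 # narrow dependency
counts = pairs.reduceByKey(lambda x, y: x + y)      # wide dependency -> stage boundary

counts.persist()                                    # keep the result in memory for reuse
print(counts.collect())                             # action: scheduler builds and runs the DAG
print(counts.filter(lambda kv: kv[1] > 1).count())  # reuses the persisted RDD

sc.stop()
```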
***How is the memory of persistent RDDs managed?***
Persistent RDDs are stored in memory as Java objects (for performance), in memory as serialized data (for lower memory usage at the cost of performance), or on disk. If a worker runs out of memory upon creation of a new RDD, a Least Recently Used (LRU) policy is applied to evict the least recently accessed RDD, unless it is the same RDD as the new one. In that case, the old RDD is excluded from eviction, since it is likely to be reused again. Long lineage chains involving wide dependencies are checkpointed to reduce the time needed to recover an RDD. Because RDDs are read-only, checkpointing is unproblematic: consistency is not a concern, so there is none of the consistency-management overhead seen in distributed shared memory.
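The storage choices and checkpointing described above are exposed directly in the RDD API. A minimal PySpark sketch follows (the checkpoint directory path is an illustrative assumption):

```python
# Sketch of persistence levels and checkpointing in PySpark.
from pyspark import SparkContext, StorageLevel

sc = SparkContext("local[2]", "persistence-demo")
sc.setCheckpointDir("/tmp/spark-checkpoints")        # durable location for checkpoints

pairs = sc.parallelize(range(100000)).map(lambda x: (x % 10, 1))

pairs.persist(StorageLevel.MEMORY_AND_DISK)          # spill to disk instead of recomputing
pairs.checkpoint()                                   # truncate the lineage chain
print(pairs.reduceByKey(lambda a, b: a + b).collect())

sc.stop()
```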
### 2.3 Hive execution model

The Hive execution model {% cite thusoo2010hive --file big-data%} is composed of the following important components (shown in the Hive architecture diagram below):

- Driver: Similar to the drivers of Spark/MapReduce applications, the driver in Hive handles query submission and its flow across the system. It also manages the session and its statistics.

- Metastore: The Hive metastore stores all information about the tables, their partitions, schemas, columns and their types, etc., making the data format and its storage transparent to users. It, in turn, helps in data exploration, query compilation, and optimization. Because the metastore is critical for managing the structure of Hadoop files, it needs to be kept up to date.

- Query Compiler: The Hive query compiler is similar to traditional database compilers. It processes the query in three steps:
  - Parse: In this phase, it uses Antlr (a parser generator tool) to generate the abstract syntax tree (AST) of the query.
  - Transformation of the AST into a DAG (directed acyclic graph): In this phase, it generates a logical plan and performs compile-time type checking. The logical plan is generated using the metadata (stored in the metastore) of the required tables. Errors are flagged if any issues are found during type checking.
  - Optimization: Optimization forms the core of any declarative interface. In the case of Hive, optimization happens through chains of transformations of the DAG. A transformation can even include a user-defined optimization, and it applies an action on the DAG only if a rule is satisfied. Every node in the DAG implements a special Node interface, which makes it easy to manipulate the operator DAG through other interfaces such as GraphWalker, Dispatcher, Rule, and Processor. Hence, by transformation we mean walking through the DAG and, for every Node we encounter, performing a Rule satisfiability check. If a Rule is satisfied, the corresponding Processor is invoked. A Dispatcher maintains the list of Rule-to-Processor mappings. (A minimal sketch of this walk appears right after this component list.)

  <figure class="main-container" align="center">
    <img src="./Hive-transformation.png" alt="Hive transformation" />
  </figure>
*Figure to depict the transformation flow during optimization, from:* {%cite thusoo2010hive --file big-data %}

- Execution Engine: The execution engine finally executes the tasks in order of their dependencies. A MapReduce task first serializes its part of the plan into a plan.xml file. This file is then added to the job cache, and mappers and reducers are spawned to execute the relevant sections of the operator DAG. The final results are stored in a temporary location and then moved to the final destination (in the case of, say, an INSERT INTO query).
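To make the Rule/Processor machinery above more tangible, here is a minimal sketch of a rule-driven DAG walk in the spirit of Hive's GraphWalker/Dispatcher/Rule/Processor interfaces. It is not Hive's actual (Java) API; the node types and the single rule shown are invented for illustration:

```python
# Illustrative only: a rule-driven walk over an operator DAG, mimicking the
# GraphWalker -> Dispatcher -> Rule -> Processor structure described above.

class Node:
    """Stand-in for Hive's Node interface: an operator with children."""
    def __init__(self, op, children=()):
        self.op = op                      # e.g. "FILTER", "SCAN", "SELECT"
        self.children = list(children)

def rule_filter_over_scan(node):
    """Rule: matches a FILTER whose only child is a SCAN (a pushdown candidate)."""
    return node.op == "FILTER" and len(node.children) == 1 and node.children[0].op == "SCAN"

def processor_push_filter(node):
    """Processor: here we just annotate the scan; a real one would rewrite the DAG."""
    node.children[0].op = "SCAN+FILTER"

DISPATCH_TABLE = [(rule_filter_over_scan, processor_push_filter)]  # Dispatcher's mappings

def walk(node):
    """GraphWalker: visit children first, then dispatch on the current node."""
    for child in node.children:
        walk(child)
    for rule, processor in DISPATCH_TABLE:
        if rule(node):                    # Rule satisfiability check
            processor(node)               # invoke the corresponding Processor

dag = Node("SELECT", [Node("FILTER", [Node("SCAN")])])
walk(dag)
print(dag.children[0].children[0].op)     # SCAN+FILTER
```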
***Summarizing the flow***

The query is first submitted via the CLI, the web UI, or any other interface. It goes through all the compiler phases explained above to form an optimized DAG of MapReduce and HDFS tasks, which the execution engine then executes in the correct order using Hadoop.

Some of the important optimization techniques in Hive are:

- Column Pruning - Consider only the columns the query actually needs for projection during processing.
- Predicate Pushdown - Filter rows as early as possible by pushing down the predicates. It is important that unnecessary records are filtered out first and transformations are applied only to the needed ones.
- Partition Pruning - Predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate.
- Map Side Joins - Smaller tables in a join can be replicated to all the mappers and the reducers.
- Join Reordering - Reduce the memory needed for reduce-side joins by keeping only the smaller tables in memory; larger tables need not be kept in memory.
- Repartitioning data to handle skew in GROUP BY processing can be achieved by performing the GROUP BY in two MapReduce stages: in the first stage, data is distributed randomly to the reducers and partial aggregation is performed; in the second stage, these partial aggregations are distributed on the GROUP BY columns to different reducers (a sketch of this follows the list).
- Similar to combiners in MapReduce, hash-based partial aggregation in the mappers can be performed to reduce the data sent from the mappers to the reducers. This helps reduce the amount of time spent sorting and merging the resulting data.
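As an illustration of the two-stage GROUP BY for skewed keys, here is a small plain-Python sketch (the key distribution and the number of reducers are made up; Hive performs this inside generated MapReduce jobs, not in driver code like this):

```python
# Illustrative two-stage GROUP BY for skew handling: stage 1 aggregates partial
# counts on randomly chosen reducers, stage 2 merges the partials by the real key.
import random
from collections import defaultdict

NUM_REDUCERS = 3
rows = [("popular_key", 1)] * 10 + [("rare_key", 1)] * 2   # skewed input

# Stage 1: route each record to a random reducer and partially aggregate there.
stage1 = [defaultdict(int) for _ in range(NUM_REDUCERS)]
for key, value in rows:
    stage1[random.randrange(NUM_REDUCERS)][key] += value

# Stage 2: route the partial aggregates by their GROUP BY key and merge them.
final = defaultdict(int)
for reducer in stage1:
    for key, partial in reducer.items():
        final[key] += partial               # partials for a key are always merged together

print(dict(final))                          # {'popular_key': 10, 'rare_key': 2}
```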
