 chapter/8/big-data.md | 130
 1 file changed, 66 insertions(+), 64 deletions(-)
diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md
index a705b33..5203e5a 100644
--- a/chapter/8/big-data.md
+++ b/chapter/8/big-data.md
@@ -69,69 +69,6 @@ MapReduce runs on hundreds or thousands of unreliable commodity machines, so the
 *Limitations*
 
 Many analytics workloads, such as K-means and logistic regression, and graph processing applications, such as PageRank and shortest path via parallel breadth-first search, require multiple stages of MapReduce jobs. In a plain MapReduce framework like Hadoop, the developer has to handle these iterations manually in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back again at stage T+1, causing a performance bottleneck: network bandwidth and CPU resources are wasted, and, above all, the disk I/O operations are inherently slow. To address these challenges of iterative workloads on MapReduce, frameworks like HaLoop {% cite bu2010haloop --file big-data %}, Twister {% cite ekanayake2010twister --file big-data %} and iMapReduce {% cite zhang2012imapreduce --file big-data %} adopt special techniques such as caching data between iterations and keeping the mappers and reducers alive across iterations.
-*Complete code for Word count in Hadoop (a Java-based implementation of MapReduce)*
-
-```java
-import java.io.IOException;
-import java.util.*;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-
-public class WordCount
-{
-    // Mapper: emits (word, 1) for every token in the input
-    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
-    {
-        private final static IntWritable one = new IntWritable(1);
-        private Text word = new Text();
-
-        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
-        {
-            String line = value.toString();
-            StringTokenizer tokenizer = new StringTokenizer(line);
-            while (tokenizer.hasMoreTokens())
-            {
-                word.set(tokenizer.nextToken());
-                context.write(word, one);
-            }
-        }
-    }
-
-    // Reducer: sums the counts emitted for each word
-    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
-    {
-        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
-        {
-            int sum = 0;
-            for (IntWritable val : values)
-            {
-                sum += val.get();
-            }
-            context.write(key, new IntWritable(sum));
-        }
-    }
-
-    public static void main(String[] args) throws Exception
-    {
-        Configuration conf = new Configuration();
-        Job job = new Job(conf, "wordcount");
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(IntWritable.class);
-        job.setMapperClass(Map.class);
-        job.setReducerClass(Reduce.class);
-        job.setInputFormatClass(TextInputFormat.class);
-        job.setOutputFormatClass(TextOutputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-        FileOutputFormat.setOutputPath(job, new Path(args[1]));
-        job.waitForCompletion(true);
-    }
-}
-```
-
 ### 1.1.2 FlumeJava
 
 FlumeJava {% cite chambers2010flumejava --file big-data %} was introduced to make it easy to develop, test, and run efficient data-parallel pipelines. FlumeJava represents each dataset as an object, and transformations are invoked by applying methods on these objects. It constructs an efficient internal execution plan from a pipeline of MapReduce jobs, uses deferred evaluation, and optimizes based on the plan structure. Its debugging support allows programmers to run a pipeline on the local machine first and then deploy it to large clusters.
@@ -280,7 +217,70 @@ RDDs are immutable and hence a straggler (slow node) can be replaced with a back
 - `Debugging and profiling`: Debugging tools are not available, and developers find it hard to tell whether a computation is concentrated on a single machine or whether the data structures they used were inefficient.
 
 ### 1.2 Querying: declarative interfaces
-MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, and so on. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and two-stage data flow. Several important patterns like equi-joins and theta-joins {% cite okcan2011processing --file big-data %}, which can be highly complex depending on the data, must be implemented by hand. Hence, MapReduce lacks many such high-level abstractions, requiring programmers to be well versed in design patterns like map-side joins, reduce-side equi-joins, etc. Also, Java-based MapReduce code (as in the Hadoop framework) can become repetitive when the programmer implements common operations like projection and filtering. A simple word count program, as shown in Figure X, can span up to 63 lines.
+MapReduce takes care of all the processing over a cluster: failure and recovery, data partitioning, and so on. However, the framework suffers from rigidity with respect to its one-input data format (key/value pairs) and two-stage data flow. Several important patterns like equi-joins and theta-joins {% cite okcan2011processing --file big-data %}, which can be highly complex depending on the data, must be implemented by hand. Hence, MapReduce lacks many such high-level abstractions, requiring programmers to be well versed in design patterns like map-side joins, reduce-side equi-joins, etc. Also, Java-based MapReduce code (as in the Hadoop framework) can become repetitive when the programmer implements common operations like projection and filtering. A simple word count program, as shown below, can span up to 63 lines.
+
+*Complete code for Word count in Hadoop (a Java-based implementation of MapReduce)*
+
+```java
+import java.io.IOException;
+import java.util.*;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class WordCount
+{
+    // Mapper: emits (word, 1) for every token in the input
+    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>
+    {
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
+        {
+            String line = value.toString();
+            StringTokenizer tokenizer = new StringTokenizer(line);
+            while (tokenizer.hasMoreTokens())
+            {
+                word.set(tokenizer.nextToken());
+                context.write(word, one);
+            }
+        }
+    }
+
+    // Reducer: sums the counts emitted for each word
+    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
+    {
+        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
+        {
+            int sum = 0;
+            for (IntWritable val : values)
+            {
+                sum += val.get();
+            }
+            context.write(key, new IntWritable(sum));
+        }
+    }
+
+    public static void main(String[] args) throws Exception
+    {
+        Configuration conf = new Configuration();
+        Job job = new Job(conf, "wordcount");
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(IntWritable.class);
+        job.setMapperClass(Map.class);
+        job.setReducerClass(Reduce.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        FileInputFormat.addInputPath(job, new Path(args[0]));
+        FileOutputFormat.setOutputPath(job, new Path(args[1]));
+        job.waitForCompletion(true);
+    }
+}
+```
 
 *Why SQL over MapReduce?*
 
@@ -367,6 +367,8 @@ FROM (
 ) REDUCE word, count USING 'python reduce.py';
 ```
+*Example from {% cite thusoo2010hive --file big-data %}*
+
 This query uses mapper.py to transform the input data into (word, count) pairs, distributes the data to reducers by hashing on the word column (given by CLUSTER BY), and uses reduce.py to aggregate the counts.
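The limitations paragraph in the first hunk above describes how iterative algorithms such as K-means or PageRank force the developer to manage the loop in driver code, with every stage writing its result to HDFS and the next stage loading it back. The driver below is a minimal, hypothetical sketch of that pattern against the plain Hadoop MapReduce API used elsewhere in the chapter; the identity `Mapper`/`Reducer` classes and the output-path naming are placeholders for an algorithm's real per-iteration logic.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IterativeDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String input = args[0];                        // initial input directory on HDFS
        int iterations = Integer.parseInt(args[1]);    // number of iterations to run

        for (int i = 0; i < iterations; i++)
        {
            // Each iteration is launched as a brand-new MapReduce job.
            String output = args[0] + "-iter-" + (i + 1);
            Job job = new Job(conf, "iteration-" + i);
            job.setJarByClass(IterativeDriver.class);
            // The identity Mapper/Reducer stand in for the real per-iteration logic
            // (e.g. recomputing K-means centroids or PageRank contributions).
            job.setMapperClass(Mapper.class);
            job.setReducerClass(Reducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));

            // Stage T materializes its result on HDFS before the loop can continue.
            if (!job.waitForCompletion(true))
            {
                System.exit(1);
            }
            // Stage T+1 then loads that output back as its input, which is the
            // disk and network overhead the section describes.
            input = output;
        }
    }
}
```

HaLoop, Twister and iMapReduce target exactly this per-iteration overhead by caching data between iterations and keeping mappers and reducers alive instead of launching a fresh job and a fresh HDFS round trip each time.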

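By contrast, the FlumeJava section in the same diff notes that pipelines are built by calling methods on dataset objects and are only optimized and executed when the pipeline is run. The snippet below is a sketch of word count following the example in the FlumeJava paper {% cite chambers2010flumejava --file big-data %}; since FlumeJava is an internal Google library, `PCollection`, `PTable`, `parallelDo`, `DoFn`, `EmitFn`, `count`, `FlumeJava.run` and the reader/writer helpers are the API as described in that paper rather than a publicly available package, and `splitIntoWords` is a user-supplied helper.

```java
// Word count in the style of the FlumeJava paper; file paths are illustrative.
PCollection<String> lines =
    readTextFileCollection("/gfs/data/shakes/hamlet.txt");

// parallelDo applies a DoFn to every element; here it splits each line into words.
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
    public void process(String line, EmitFn<String> emitFn) {
        for (String word : splitIntoWords(line)) {
            emitFn.emit(word);
        }
    }
}, collectionOf(strings()));

// count() groups equal words and sums their occurrences.
PTable<String, Integer> wordCounts = words.count();
writeToRecordFileTable(wordCounts, "/gfs/data/shakes/hamlet-counts.records");

// Nothing has executed yet: the calls above only build a deferred execution plan.
// run() optimizes that plan and executes it as a pipeline of MapReduce jobs.
FlumeJava.run();
```

Compared with the 63-line Hadoop program moved by this commit, the deferred, object-based interface keeps the whole pipeline to a handful of lines while still compiling down to MapReduce.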