From de5eb93117c9729df46d0085ef89662f67a62d9c Mon Sep 17 00:00:00 2001
From: msabhi
Date: Thu, 15 Dec 2016 02:40:29 -0500
Subject: Added word count for map reduce

---
 chapter/8/big-data.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

(limited to 'chapter/8')

diff --git a/chapter/8/big-data.md b/chapter/8/big-data.md
index 511c7dd..915325e 100644
--- a/chapter/8/big-data.md
+++ b/chapter/8/big-data.md
@@ -69,6 +69,73 @@ MapReduce runs on hundreds or thousands of unreliable commodity machines, so the

*Limitations*

Many analytics workloads, such as K-means and logistic regression, and graph-processing applications such as PageRank and shortest paths via parallel breadth-first search, require multiple stages of MapReduce jobs. In a standard MapReduce framework like Hadoop, the developer must handle these iterations manually in the driver code. At every iteration, the result of stage T is written to HDFS and loaded back at stage T+1, which creates a performance bottleneck: network bandwidth and CPU resources are wasted, and, above all, the disk I/O involved is inherently slow. To address these challenges for iterative workloads on MapReduce, frameworks like HaLoop {% cite bu2010haloop --file big-data %}, Twister {% cite ekanayake2010twister --file big-data %} and iMapReduce {% cite zhang2012imapreduce --file big-data %} adopt special techniques such as caching data between iterations and keeping the mappers and reducers alive across iterations. A sketch of the kind of hand-written driver loop these frameworks eliminate appears after the word-count listing below.

*Complete code for word count in Hadoop (a Java-based implementation of MapReduce)*

```
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount
{
    // Mapper: tokenizes each input line and emits (word, 1) for every token.
    public static class Map extends
        Mapper<LongWritable, Text, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reducer: sums the counts emitted for each word.
    public static class Reduce
        extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
            {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Driver: configures the job and blocks until it completes.
    // args[0] is the HDFS input path, args[1] the output path.
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
```
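To make the iteration problem from the *Limitations* paragraph concrete, the fragment below sketches the driver loop a developer would write by hand on plain Hadoop, using only the standard job-submission APIs already shown in the word-count driver. It is a sketch, not a complete program: `IterMapper`, `IterReducer`, `maxIterations`, the `/data/iter-*` HDFS paths and the `converged(...)` check are hypothetical placeholders for whatever an iterative algorithm such as PageRank or K-means would actually supply.

```
// Sketch of a hand-written iterative driver on plain Hadoop MapReduce.
// Each pass launches a fresh job and round-trips its state through HDFS,
// which is exactly the overhead HaLoop, Twister and iMapReduce try to avoid.
Configuration conf = new Configuration();
Path current = new Path("/data/iter-0");            // hypothetical initial input
for (int i = 0; i < maxIterations; i++)
{
    Path next = new Path("/data/iter-" + (i + 1));   // output of this pass

    Job job = new Job(conf, "iteration-" + i);
    job.setMapperClass(IterMapper.class);            // hypothetical algorithm mapper
    job.setReducerClass(IterReducer.class);          // hypothetical algorithm reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, current);      // reload previous result from HDFS
    FileOutputFormat.setOutputPath(job, next);       // write this result back to HDFS
    job.waitForCompletion(true);

    if (converged(conf, next))                       // hypothetical convergence test
        break;
    current = next;
}
```

Nothing in this loop is cached or kept alive between passes, so the per-iteration HDFS writes, re-reads and job-startup costs dominate for algorithms that need tens of iterations.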
### 1.1.2 FlumeJava

FlumeJava {% cite chambers2010flumejava --file big-data %} was introduced to make it easy to develop, test, and run efficient data-parallel pipelines. FlumeJava represents each dataset as an object, and transformations are invoked by calling methods on these objects. It uses deferred evaluation to build an internal execution plan, optimizes that plan based on its structure, and then executes it as a pipeline of MapReduce jobs. Its debugging support lets programmers run a pipeline on the local machine first and then deploy it to large clusters.
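To make this programming model concrete, the sketch below expresses word count with the primitives described in the paper {% cite chambers2010flumejava --file big-data %}: `PCollection`, `parallelDo` with a `DoFn`, the derived `count()` operation, and `FlumeJava.run()`. FlumeJava itself is internal to Google, so this is an illustrative sketch of the published API rather than compilable code; the input path and the `splitIntoWords` tokenizer stand in for whatever the pipeline would actually use.

```
// Word count as a FlumeJava pipeline (sketch based on the published API).
PCollection<String> lines =
    readTextFileCollection("/gfs/data/shakes/hamlet.txt");    // assumed input

// parallelDo records a deferred element-wise transformation; nothing runs yet.
PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
    void process(String line, EmitFn<String> emitFn) {
        for (String word : splitIntoWords(line)) {             // assumed tokenizer
            emitFn.emit(word);
        }
    }
}, collectionOf(strings()));

// count() is a derived operation (parallelDo + groupByKey + combineValues)
// that yields a deferred table of (word, number of occurrences).
PTable<String, Long> wordCounts = words.count();

// Only here is the deferred execution plan optimized and executed, typically as
// a small pipeline of MapReduce jobs, or in-process for local debugging.
FlumeJava.run();
```

Compared with the Hadoop word count above, the whole pipeline is assembled by calling methods on dataset objects, and the optimizer, not the programmer, decides how many MapReduce stages actually get materialized.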