Question: Page Rank algorithm Hadoop implementation Add the final MapReduce stage so the whole PageRank calculation job can be submitted and run correctly. In HadoopPageRank.java, look

Page Rank algorithm Hadoop implementation

Add the final MapReduce stage so that the whole PageRank calculation job can be submitted and run correctly. In HadoopPageRank.java, look for the string "place holder:" — this is where you should add the code that configures and starts the final MR stage. For the final MR stage, you will need to add two new classes, HadoopPageRankResultMapper and HadoopPageRankResultReducer, in the files HadoopPageRankResultMapper.java and HadoopPageRankResultReducer.java.

--------------------------------------------------------------

HadoopPageRank.java

import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

public class HadoopPageRank extends Configured implements Tool { public int run(String[] args) throws Exception { // house keeping int iteration = 0; Path inputPath = new Path(args[0]); Path basePath = new Path(args[1]); FileSystem fs = FileSystem.get(getConf()); fs.delete(basePath, true); fs.mkdirs(basePath); // configure initial job Job initJob = Job.getInstance(getConf(),"pageRank"); initJob.setJarByClass(HadoopPageRank.class); initJob.setMapperClass(HadoopPageRankInitMapper.class); initJob.setReducerClass(HadoopPageRankInitReducer.class); initJob.setOutputKeyClass(Text.class); initJob.setOutputValueClass(Text.class); initJob.setInputFormatClass(TextInputFormat.class); Path outputPath = new Path(basePath, "iteration_" + iteration); FileInputFormat.addInputPath(initJob, inputPath); FileOutputFormat.setOutputPath(initJob, outputPath); // let initJob run and wait for finish if ( !initJob.waitForCompletion(true) ) { return -1; } // calculate the page ranks int totalIterations = Integer.parseInt(args[2]); while ( iteration iteration ++; inputPath = outputPath; // new input is the old output outputPath = new Path(basePath, "iteration_" + iteration); Job mainJob = Job.getInstance(getConf(),"Iteration " + iteration); mainJob.setJarByClass(HadoopPageRank.class); mainJob.setMapperClass(HadoopPageRankMainJobMapper.class); mainJob.setReducerClass(HadoopPageRankMainJobReducer.class); mainJob.setOutputKeyClass(Text.class); mainJob.setOutputValueClass(Text.class); mainJob.setInputFormatClass(TextInputFormat.class); FileInputFormat.setInputPaths(mainJob, inputPath); FileOutputFormat.setOutputPath(mainJob, outputPath); if ( !mainJob.waitForCompletion(true) ) { return -1; } } // collect the result, highest rank first - you will need to finish this up Job resultJob = Job.getInstance(getConf(),"final result"); resultJob.setJarByClass(HadoopPageRank.class);

/* * place holder: * here is the place you will need to add a final Map/Reduce code */ FileInputFormat.setInputPaths(resultJob, outputPath); FileOutputFormat.setOutputPath(resultJob,new Path(basePath, "result")); if ( !resultJob.waitForCompletion(true) ) { return -1; } return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HadoopPageRank(), args); System.exit(exitCode); } }

-----------------------------------------------------------------------------------------------

HadoopPageRankInitMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

public class HadoopPageRankInitMapper extends Mapper {,>

@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.charAt(0) == '#' ) { return; } int tabIndex = value.find("\t"); String nodeA = Text.decode(value.getBytes(), 0, tabIndex); String nodeB = Text.decode(value.getBytes(), tabIndex + 1, value.getLength() - (tabIndex + 1)); context.write(new Text(nodeA), new Text(nodeB)); } }

______________________________________________________________________________

HadoopPageRankInitReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

public class HadoopPageRankInitReducer extends Reducer {,>

public static final long TOTAL_WEB_PAGES = 4; @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { boolean first = true; String links = (1.0 / TOTAL_WEB_PAGES) + "\t";

for (Text value : values) { if (!first) links += ","; links += value.toString(); first = false; } context.write(key, new Text(links)); } }

_______________________________________

HadoopPageRankMainJobMapper.java

import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HadoopPageRankMainJobMapper extends Mapper {,>

@Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if ( value == null || value.getLength() == 0 ) { return; } int tabIdx1 = value.find("\t"); int tabIdx2 = value.find("\t", tabIdx1 + 1); // extract tokens from the current line String page = Text.decode(value.getBytes(), 0, tabIdx1); String pageRank = Text.decode(value.getBytes(), tabIdx1 + 1, tabIdx2 - (tabIdx1 + 1)); String outlinks = Text.decode(value.getBytes(), tabIdx2 + 1, value.getLength() - (tabIdx2 + 1)); // calculate contribution to each target page String[] allNextPages = outlinks.split(","); if ( allNextPages == null || allNextPages.length == 0 ) { return; } double currentPR = Double.parseDouble(pageRank.toString()); int totalNumOfNextPages = allNextPages.length; for (String nextPage : allNextPages) { Text rankContribution = new Text(currentPR/totalNumOfNextPages + ""); context.write(new Text(nextPage), rankContribution); } // put the original links so the reducer is able to produce the correct output context.write(new Text(page), new Text("|" + outlinks)); } }

______________________________________________________________

HadoopPageRankMainJobReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;

public class HadoopPageRankMainJobReducer extends Reducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { if ( values == null ) { return; } String links = ""; double receivedContribution = 0.0; for (Text value : values) { String content = value.toString(); if (content.startsWith("|")) { links += content.substring("|".length()); } else { receivedContribution += Double.parseDouble(content); },>

} double newPageRank = receivedContribution; context.write(key, new Text(newPageRank + "\t" + links)); }

}

Step by Step Solution

There are 3 Steps involved in it

1 Expert Approved Answer
Step: 1 Unlock blur-text-image
Question Has Been Solved by an Expert!

Get step-by-step solutions from verified subject matter experts

Step: 2 Unlock
Step: 3 Unlock

Students Have Also Explored These Related Programming Questions!