Please answer the question and meet all of the requirements! Please provide the source code that you used!
ID: 3745981 • Letter: P
Question
Please answer the question and meet all of the requirements! Please provide the source code that you used! I have attached the Java code, which I was provided, at the end of this question. Here is the question:
Modify the code for the weather data (discussed in the class), instead of calculating the maximum temperature for 1901 – 1905 (for station 029070), calculate the average temperature over the year, for each year. In addition to changing the code to calculate the average temperature over the year, you are also asked to add some logging statements. Logging statements are important since they provide the status of the job and other critical information. You will start by adding the following: import org.apache.log4j.Logger; The specific logging information added is up to you, but they should at least include the following:
the name of the job how many reducer tasks are used by Hadoop framework for your submitted job what is the partitioner class that is being used for your submitted job (hint: something like job.getPartitionerClass().getName())
You will need to submit the following:
1. your source code (40 points) 2. a screen shot similar to slide 46 (BDP-mapReduce-2), showing the command you use to start the job (as seen in the white box) (5 points) 3. a screen shot showing the added logging information and the successful completion of the job, similar to slide 47 (BDP-mapReduce-2), but with the added logging information (10 points) 4. a screen shot similar to slide 48 (BDP-mapReduce-2), showing the result of your submitted job (5 points) Modify the code for the weather data (discussed in the class), instead of calculating the maximum temperature for 1901 – 1905 (for station 029070), calculate the average temperature over the year, for each year. In addition to changing the code to calculate the average temperature over the year, you are also asked to add some logging statements. Logging statements are important since they provide the status of the job and other critical information. You will start by adding the following: import org.apache.log4j.Logger; The specific logging information added is up to you, but they should at least include the following:
the name of the job how many reducer tasks are used by Hadoop framework for your submitted job what is the partitioner class that is being used for your submitted job (hint: something like job.getPartitionerClass().getName())
You will need to submit the following:
1. your source code (40 points) 2. a screen shot similar to slide 46 (BDP-mapReduce-2), showing the command you use to start the job (as seen in the white box) (5 points) 3. a screen shot showing the added logging information and the successful completion of the job, similar to slide 47 (BDP-mapReduce-2), but with the added logging information (10 points) 4. a screen shot similar to slide 48 (BDP-mapReduce-2), showing the result of your submitted job (5 points)
Here is the provided Java code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HadoopWeatherMaxTemperature { public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } } public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "weather example"); job.setJarByClass(HadoopWeatherMaxTemperature.class); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HadoopWeatherMaxTemperature { public static class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final int MISSING = 9999; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } } public static class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "weather example"); job.setJarByClass(HadoopWeatherMaxTemperature.class); job.setMapperClass(MaxTemperatureMapper.class); job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Question 1 (60 points) Modify the code for the weather data (discussed in the class), instead of calculating the maximum temperature for 1901 - 1905 (for station 029070), calculate the average temperature over the year, for each year. You can download the datasets as described by the course PPT slides. In addition to changing the code to calculate the average temperature over the year, you are also asked to add some logging statements. Logging statements are important since they provide the status of the job and other critical information. You will start by adding the following: import org.apache.log4j.Logger; The specific logging information added is up to you, but it should at least include the following: the name of the job, how many reducer tasks are used by the Hadoop framework for your submitted job, and what partitioner class is being used for your submitted job (hint: something like job.getPartitionerClass().getName())
Explanation / Answer
Hi!
Please check the modified code.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
public class HadoopWeatherAvgTemperature {
public static class AvgTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
final static Logger log = Logger.getLogger(HadoopWeatherAvgTemperature.class);
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
public static class AvgTemperatureReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
double avgValue = 0;
int count=0;
int sum=0;
for (IntWritable value : values) {
sum=sum+value.get();
count++;
}
avgValue=sum/count;
context.write(key, new IntWritable(avgValue));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
log.debug("Debug Log message Started");
Job job = Job.getInstance(conf, "weather example");
job.setJobName("Job 1");
log.debug("Job Name"+job.getJobName());
job.setJarByClass(HadoopWeatherAvgTemperature.class);
job.setMapperClass(AvgTemperatureMapper.class);
job.setReducerClass(AvgTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
log.debug("Partitioner Class Name"+ job.getPartitionerClass().getName()));
log.debug("No Of Jobs"+ job.getCounters() );
log.debug("The progress of the job's reduce-tasks"+ job.reduceProgress() );
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Thanks