Input data file avgtemperature.txt:

The DATE, HOUR, COND, PRES, HUM, TMP, AQI, PM2.5 and PM10 20160602, 00, haze, 1984130,9,390,348,300 20160802,01, haze, 1163,81,8,393,368,302 20160706,02, Haze,1079,108,17,360,394,306 20160706,03, Haze,1116,79,6,339,387,303 20160502,04, Haze,1198,98,16,357,325,307 20160602,05, Haze,1762,126,9,324,316,301 20160408,06, Haze,1996,131,3,349,344,301 20160604,07, Haze,1952,119,26,347,300,309 20160105,08, Haze,1410,81,8,350,395,307 20160104,09, Haze,1718,130,4,352,335,308 20160501,10, Haze,1714,119,27,310,336,307 20160601,11, HAZE,1660,130,23,311,364,302 20160606,12, HAZE,1598,96,12,369,346,309 20160602,13, HAZE,1673,127,2,343,346,303 20160706,14, HAZE,1578,122,8,360,323,307 20160707,15, HAZE,1237,118,12,384,384,301 20160205,16, HAZE,1231,78,9,361,357,302 20160605,17, HAZE,1166,86,30,350,388,307 20160506,18, HAZE,1426,94,2,378,372,305 20160805,19, HAZE,1874,144,20,376,327,302 20160405,20, HAZE,1778,94,22,360,335,304, 20160104,21, HAZE,1055,64,22,376,361,305, 20160304,22, HAZE,1349,78,15,367,384,308 20160203,23, Haze,2004,110,2,359,371,304, 20160603,24, Haze,1375,115,19,308,301,308, 20160402,25, Haze,1201,69,5,387,342,305 20160707,26, Haze,1272,112,23,348,333,307 20160702,27, Haze,1738,60,12, 339,300,303 20160301,28, Haze,1752,107,12,364,331,301 20160704, haze, 1442,65,9,332,369,308

Problem 1: write a program that computes the monthly mean temperature

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class AvgTemperature { public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> { private IntWritable intValue = new IntWritable(); private Text dateKey = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] items = value.toString().split(","); String date = items[0]; String tmp = items[5]; if(!" DATE".equals(date) && !" N/A".equals(TMP)){dateKey.set(date.substring(0, 6)); intValue.set(Integer.parseInt(tmp)); context.write(dateKey, intValue); } } } public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int tmp_sum = 0; int count = 0; for(IntWritable val : values){ tmp_sum += val.get(); count++; } int tmp_avg = tmp_sum/count; result.set(tmp_avg); context.write(key, result); } } public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AvgTemperature"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(AvgTemperature.class); job.setMapperClass(StatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(StatReducer.class); 
job.setPartitionerClass(HashPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); TextInputFormat.setInputPaths(job, args[0]); job.setNumReduceTasks(Integer.parseInt(args[2])); System.exit(job.waitForCompletion(true) ? 0:1); }}

Running results:

201601    11
201602    5
201603    13
201604    10
201605    15
201606    16
201607    12
201608    14

Problem 2: write a program that computes the monthly average air quality (AQI) — note the output is keyed by month (yyyyMM), as the results below show

import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import java.io.IOException; /** * @Author Natasha * @Description * @Date 2020/10/30 20:37 **/ public class AirQuality { public static class AirQualityMapprer extends Mapper<Object, Text, Text, IntWritable>{ private Text text = new Text(); private IntWritable intWritable = new IntWritable(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] item = value.toString().split(","); String date = item[0]; String kongqi = item[6]; if(!" DATE".equals(date) && !" N/A".equals(KongQi)){// set(date.substring(0, 6)); intWritable.set(Integer.parseInt(kongqi)); context.write(text, intWritable); } } } public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable res = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException { int aqi = 0; int cnt = 0; for(IntWritable iw : value){ aqi += iw.get(); cnt++; } int aqi_avg = aqi/cnt; res.set(aqi_avg); context.write(key, res); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AirQuality"); job.setJarByClass(AirQuality.class); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(AirQualityMapprer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setPartitionerClass(HashPartitioner.class); 
job.setReducerClass(AirQualityReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(Integer.parseInt(args[2])); TextInputFormat.setInputPaths(job, args[0]); TextOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0:1); }}

Running results:

201601    359
201602    360
201603    365
201604    365
201605    348
201606    342
201607    359
201608    384