package com.zkpk.us;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Mapper that emits {@code (second column, 1)} for every input line that
 * splits into exactly six tab-separated columns. All other lines are skipped.
 */
class UserMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Number of tab-separated columns a line must have to be counted. */
    private static final int EXPECTED_COLUMNS = 6;

    /** Zero-based index of the column used as the output key. */
    private static final int UID_COLUMN = 1;

    /** Constant count emitted for every accepted line. */
    private static final IntWritable ONE = new IntWritable(1);

    /** Reused output key; avoids allocating a new Text for every record. */
    private final Text uid = new Text();

    /**
     * Splits the line on tab characters and, when it has exactly six columns,
     * writes the second column with a count of one.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input text
     * @param context sink for the emitted (uid, 1) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // String.split discards trailing empty strings, so a line whose last
        // column(s) are empty yields fewer than six parts and is skipped.
        String[] columns = value.toString().split("\t");
        if (columns.length == EXPECTED_COLUMNS) {
            uid.set(columns[UID_COLUMN]);
            context.write(uid, ONE);
        }
    }
}

/**
 * Reducer that sums all counts received for a key and emits (key, total).
 */
class UserReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Reused output value; avoids allocating a new IntWritable per key. */
    private final IntWritable total = new IntWritable();

    /**
     * Adds up every value for the given key and writes the sum.
     *
     * @param key     the key whose values are being aggregated
     * @param values  the counts emitted for this key
     * @param context sink for the emitted (key, sum) pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, total);
    }
}

/**
 * Driver that wires {@link UserMapper} and {@link UserReducer} into a single
 * job named "UserUid" and runs it.
 */
public class UserCount {

    /**
     * Configures and runs the job, then exits with a status reflecting the
     * outcome: 0 when the job succeeds, 1 when it fails, 2 on bad usage.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the
     *             output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: UserCount <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "UserUid");
        job.setJarByClass(UserCount.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(UserMapper.class);
        job.setReducerClass(UserReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Propagate job failure to the caller through the process exit status.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3 回答

趙強老師
TA貢獻10條經驗 獲得超14個贊
這是MapReduce程序,是Hadoop處理數據的核心。程序由三部分組成:
第一部分:Map。表示將每行數據按列進行拆分處理,分隔符是制表符(Tab)。
第二部分:Reduce。將Map的輸出進行匯總,得到最后的輸出。
第三部分:主程序。將Map和Reduce組成一個任務job來執行,數據的輸入和輸出都來自于HDFS。
有問題,可以再問我。呵呵
- 3 回答
- 0 關注
- 1433 瀏覽
添加回答
舉報
0/150
提交
取消