经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » Hadoop » 查看文章
【Hadoop】:手动实现WordCount案例
来源:cnblogs  作者:Geeksongs  时间:2021/1/18 16:03:56  对本文有异议

一.实现案例

实现WorldCount的流程如下:

备注:其中输入的数据是一个txt文件,里面有各种单词,每一行中用空格进行空行

 

一.Mapper的编写

我们在IDEA是使用“ctrl+alt+鼠标左键点击”的方式来查看源码,我们首先查看mapper 类的源码,同时源码我已经使用了,如下所示:

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5.  
  6. package org.apache.hadoop.mapreduce;
  7. import java.io.IOException;
  8. import org.apache.hadoop.classification.InterfaceAudience.Public;
  9. import org.apache.hadoop.classification.InterfaceStability.Stable;
  10. @Public
  11. @Stable
  12. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  13. public Mapper() {
  14. }

  15. //在任务开始之前,setup必然被调用一次
  16. protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  17. }

  18. //在input split的时候,对每一个key/value的pair都call once.大多数程序都会overide这个方法
  19. protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  20. context.write(key, value);
  21. }
  22. //在at the end of the task,这个方法被调用一次
  23. protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  24. }
  25. //把整个程序,里面的所有方法串连起来
  26. public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  27. this.setup(context);
  28. try {
  29. while(context.nextKeyValue()) {//每次仅读取一行数据
  30. this.map(context.getCurrentKey(), context.getCurrentValue(), context);
  31. }
  32. } finally {
  33. this.cleanup(context);
  34. }
  35. }

  36. //上下文,封装了程序当中大量的分析方法
  37. public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  38. public Context() {
  39. }
  40. }
  41. }

因此我们根据里面的源码,编写wordcount所需要的mapper的代码,如下所示:

  1. //现在我们开始编写wordcount的示例
  2. public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
  3. //mapper后面的参数:
  4. // 1.输入数据的key类型
  5. // 2.输入数据的value类型
  6. // 3.输出数据的key类型
  7. // 4.输出数据的value的类型
  8.  
  9. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  10. //1.首先获取一行
  11. String line=value.toString();
  12. //2.将获取后的单词进行分割,按照空格进行分割
  13. String[] words=line.split(" ");
  14. //3.循环输出(不是输出到控制台上面,是输出到reducer里进行处理)
  15. for(String word:words)
  16. {
  17. Text k=new Text();//定义我们输出的类型,肯定是Text,和整个类extends的顺序对应
  18. k.set(word);
  19. IntWritable v=new IntWritable();
  20. v.set(1);//将value设置为1
  21. context.write(k,v);
  22. }
  23. }
  24. }

 

二.Reducer的编写

reducer的源码如下,和mapper的源码非常相似,其实也就是对reducer的方法进行了封装,并没有方法体:

  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import org.apache.hadoop.classification.InterfaceAudience.Public;
  4. import org.apache.hadoop.classification.InterfaceStability.Stable;
  5. import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
  6. import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
  7. @Checkpointable
  8. @Public
  9. @Stable
  10. public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  11. public Reducer() {
  12. }
  13. protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  14. }
  15. protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  16. Iterator i$ = values.iterator();
  17. while(i$.hasNext()) {
  18. VALUEIN value = i$.next();
  19. context.write(key, value);
  20. }
  21. }
  22. protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  23. }
  24. public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
  25. this.setup(context);
  26. try {
  27. while(context.nextKey()) {
  28. this.reduce(context.getCurrentKey(), context.getValues(), context);
  29. Iterator<VALUEIN> iter = context.getValues().iterator();
  30. if (iter instanceof ValueIterator) {
  31. ((ValueIterator)iter).resetBackupStore();
  32. }
  33. }
  34. } finally {
  35. this.cleanup(context);
  36. }
  37. }
  38. public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  39. public Context() {
  40. }
  41. }
  42. }

代码如下:

  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.mapreduce.Reducer;
  3. import javax.xml.soap.Text;
  4. import java.io.IOException;
  5. public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
  6. @Override
  7. protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  8. super.reduce(key, values, context);
  9. //在reduce里拿到的是mapper已经map好的数据
  10. //现在数据的形式是这样的:
  11. //atguigu(key),1(value)
  12. //atguigu(key),1(value)
  13.  
  14. int sum=0;
  15. //累计求和
  16. for(IntWritable value: values)
  17. {
  18. sum+=value.get();//将intwrite对象转化为int对象
  19. }
  20. IntWritable v=new IntWritable();
  21. v.set(sum);
  22. //2.写出 atguigu 2
  23. context.write(key,v);
  24. //总结,这个程序看起来并没有起到分开不同单词,并对同一单词的value进行相加的作用啊
  25. //唯一的功能则是统计仅有一个单词的字符之和,这有啥用......
  26. }
  27. }

三.Driver程序编写,让mapreduce动起来!

代码如下:

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Job;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. public class wordcoundDriver {
  8. //将mapper和reducer进行启动的类
  9. //driver是完全格式固定的
  10. public static void main(String[] args) throws Exception {
  11. Configuration conf=new Configuration();
  12. //1.获取Job对象
  13. Job job=Job.getInstance(conf);
  14. //2.设置jar储存位置
  15. job.setJarByClass(wordcoundDriver.class);
  16. //3.关联map和reduce类
  17. job.setMapperClass(WordcountMapper.class);
  18. job.setReducerClass(WordCountReducer.class);
  19. //4.设置mapper阶段输出数据的key和value类型
  20. job.setMapOutputKeyClass(Text.class);
  21. job.setMapOutputValueClass(IntWritable.class);
  22. //5.设置最终数据输出的key和value类型
  23. job.setOutputKeyClass(Text.class);
  24. job.setOutputValueClass(IntWritable.class);
  25. //6.设置输入路径和输出路径
  26. FileInputFormat.setInputPaths(job,new Path(args[0]));
  27. FileInputFormat.setInputPaths(job,new Path(args[1]));
  28. //7.提交Job
  29. job.submit();
  30. job.waitForCompletion(true);
  31. }
  32. }

这样就可以运行起来了!大家可以尝试在分布式集群上实现wordcount统计这个功能,只需要将这些代码进行打成jar包,这样就可以放到linux操作系统上去运行了!最后运行的时候,路径写的是HDFS上的路径哦!

原文链接:http://www.cnblogs.com/geeksongs/p/14261619.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号