经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » MapReduce » 查看文章
【Hadoop】:MapReduce实现序列化
来源:cnblogs  作者:Geeksongs  时间:2021/1/25 10:53:04  对本文有异议

一.序列化简介

什么是序列化呢?

 

 

序列化:对象———》字节序列

反序列化:字节序列——》对象

备注:对象在内存(RAM)当中

字节序列:可以在磁盘(ROM)当中,也可以在网络当中进行传输

序列化的根本缘故:将对象从RAM里的数据 转化成ROM里的数据

二.序列化案例

我们这里将要编写的序列化的程序的流程如下图所示,是一个统计手机耗费总流量的case:

 

 对于这个案例而言。为什么需要进行序列化呢?

因为在第三阶段,我们将手机的上行流量和下行流量都分别封装进了一个对象当中,一个手机号对应两个流量。因此一个bean对象(就是一个普通的对象,拥有方法,属性等)当中具有多个数据,因此需要进行序列化。

三.编写Bean类

现在我们开始封装这个手机的数据,代码如下所示。代码主要是为了封装value,也就是手机的上行流量以及下行流量,这里不处理手机号,不将手机号进行封装。因为手机号在我们的mapper阶段,我们将其视为Key同时注意要想把结果显示在文件中,需要重写toString(),且用"\t"分开,方便后续用。

  1. import org.apache.hadoop.io.Writable;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. //建立每一个手机号所对应的对象
  6. public class FlowBean implements Writable {
  7. private long upFlow;//上行流量
  8. private long downFlow;//下行流量
  9. private long sumFlow;//总流量
  10. //空参构造,为了后续能够反射
  11. public FlowBean()
  12. {
  13. super();
  14. }
  15. public FlowBean(long upFlow,long downFlow)
  16. {
  17. super();
  18. this.upFlow=upFlow;
  19. this.downFlow=downFlow;
  20. sumFlow=upFlow+upFlow;
  21. }
  22. //序列化方法,这样这个对象就可以很方便地进行序列化和反序列化了!
  23. //序列化的方法必须和反序列化相同
  24. @Override
  25. public void write(DataOutput dataOutput) throws IOException {
  26. dataOutput.writeLong(upFlow);
  27. dataOutput.writeLong(downFlow);
  28. dataOutput.writeLong(sumFlow);
  29. }
  30. //反序列方法
  31. @Override
  32. public void readFields(DataInput dataInput) throws IOException {
  33. //必须要求和序列化要求顺序一致,顺序一致就可以进行接收
  34. upFlow=dataInput.readLong();
  35. downFlow= dataInput.readLong();
  36. sumFlow=dataInput.readLong();
  37. }
  38. @Override
  39. public String toString() {
  40. return upFlow + "\t" + downFlow + "\t" + sumFlow;
  41. }
  42. public long getUpFlow() {
  43. return upFlow;
  44. }
  45. public void setUpFlow(long upFlow) {
  46. this.upFlow = upFlow;
  47. }
  48. public long getDownFlow() {
  49. return downFlow;
  50. }
  51. public void setDownFlow(long downFlow) {
  52. this.downFlow = downFlow;
  53. }
  54. public long getSumFlow() {
  55. return sumFlow;
  56. }
  57. public void setSumFlow(long sumFlow) {
  58. this.sumFlow = sumFlow;
  59. }
  60. public void set(long upFlow,long downFlow)
  61. {
  62. upFlow=upFlow;
  63. downFlow=downFlow;
  64. sumFlow=upFlow+downFlow;
  65. }
  66. }

在编写这个bean类当中,我们拥有了一个bean类所有的特征,比如get/set方法,以及需要拥有的序列化以及反序列化方法,可以在后续调用这个bean对象的时候,更加方便地进行序列化和反序列化。

四.编写Mapper类

将获得的手机号码设定为key,手机号的上行流量和下行流量分别设定为value,value使用bean类FlowBean来表示。(因为有两个value,而在mapper里面又只能够有一个输出,因此只能使用一个类来代表mapper的输出(valueout)了)

  1. import org.apache.hadoop.io.LongWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Mapper;
  4. import java.io.IOException;
  5. public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
  6. @Override
  7. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  8. Text k=new Text();
  9. FlowBean v=new FlowBean();
  10. //1.获取一行,tostring方法已经被改写,因此
  11. String line=value.toString();
  12. //2.切割
  13. String[] fields=line.split("\t");
  14. //3.封装对象
  15. k.set(fields[1]);//封装手机号
  16. //封装
  17. long upFlow= Long.parseLong(fields[fields.length-3]);
  18. long downFlow=Long.parseLong(fields[fields.length-2]);
  19. v.setUpFlow(upFlow);
  20. v.setUpFlow(downFlow);
  21. //v.set();
  22. //4.写出
  23. context.write(k,v);
  24. }
  25. }

五.编写Reducer类

  1. import org.apache.hadoop.mapreduce.Reducer;
  2. import javax.xml.soap.Text;
  3. import java.io.IOException;
  4. public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
  5. FlowBean v=new FlowBean();
  6. @Override
  7. protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
  8. long sum_upFlow=0;
  9. long sum_downflow=0;
  10. //1.累加求和
  11. for (FlowBean flowBean:values)
  12. {
  13. sum_upFlow+=flowBean.getUpFlow();
  14. sum_downflow+=flowBean.getDownFlow();
  15. }
  16. v.set(sum_upFlow,sum_downflow);
  17. //2.写出
  18. context.write(key,v);
  19. }
  20. }

六.编写Driver类

编写driver类是一个固定的步骤,可以直接根据注释当中的步骤进行编写即可,代码如下所示:

  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.mapreduce.Job;
  4. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  5. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  6. import javax.xml.soap.Text;
  7. import java.io.IOException;
  8. public class FlowsumDriver {
  9. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  10. Configuration configuration=new Configuration();
  11. //1.获取job对象
  12. Job job=Job.getInstance(configuration);
  13. //2.设置jar路径
  14. job.setJarByClass(FlowsumDriver.class);
  15. //3.关联mapper和reducer
  16. job.setMapperClass(FlowCountMapper.class);
  17. job.setReducerClass(FlowCountReducer.class);
  18. //4.设置mapper输出的key和value类型
  19. job.setMapOutputKeyClass(Text.class);
  20. job.setMapOutputKeyClass(FlowBean.class);
  21. //5.设置最终输出的key和value类型
  22. job.setOutputKeyClass(Text.class);
  23. job.setOutputValueClass(FlowBean.class);
  24. //6.设置输出路径
  25. FileInputFormat.setInputPaths(job, new Path(args[0]));
  26. FileOutputFormat.setOutputPath(job,new Path(args[1]));
  27. //7.提交job
  28.  
  29. boolean result=job.waitForCompletion(true);
  30. System.out.println(result);
  31. }
  32. }

这样我们就可以完成这个任务了!

 

原文链接:http://www.cnblogs.com/geeksongs/p/14286807.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号