一.序列化简介
什么是序列化呢?

序列化:对象———》字节序列
反序列化:字节序列——》对象
备注:对象在内存(RAM)当中
字节序列:可以在磁盘(ROM)当中,也可以在网络当中进行传输
序列化的根本缘故:将对象从RAM里的数据 转化成ROM里的数据
二.序列化案例
我们这里将要编写的序列化的程序的流程如下图所示,是一个统计手机耗费总流量的case:

对于这个案例而言。为什么需要进行序列化呢?
因为在第三阶段,我们将手机的上行流量和下行流量都分别封装进了一个对象当中,一个手机号对应两个流量。因此一个bean对象(就是一个普通的对象,拥有方法,属性等)当中具有多个数据,因此需要进行序列化。
三.编写Bean类
现在我们开始封装这个手机的数据,代码如下所示。代码主要是为了封装value,也就是手机的上行流量以及下行流量,这里不处理手机号,不将手机号进行封装。因为手机号在我们的mapper阶段,我们将其视为Key同时注意要想把结果显示在文件中,需要重写toString(),且用"\t"分开,方便后续用。
- import org.apache.hadoop.io.Writable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- //建立每一个手机号所对应的对象
- public class FlowBean implements Writable {
- private long upFlow;//上行流量
- private long downFlow;//下行流量
- private long sumFlow;//总流量
- //空参构造,为了后续能够反射
- public FlowBean()
- {
- super();
- }
- public FlowBean(long upFlow,long downFlow)
- {
- super();
- this.upFlow=upFlow;
- this.downFlow=downFlow;
- sumFlow=upFlow+upFlow;
- }
- //序列化方法,这样这个对象就可以很方便地进行序列化和反序列化了!
- //序列化的方法必须和反序列化相同
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeLong(upFlow);
- dataOutput.writeLong(downFlow);
- dataOutput.writeLong(sumFlow);
- }
- //反序列方法
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- //必须要求和序列化要求顺序一致,顺序一致就可以进行接收
- upFlow=dataInput.readLong();
- downFlow= dataInput.readLong();
- sumFlow=dataInput.readLong();
- }
- @Override
- public String toString() {
- return upFlow + "\t" + downFlow + "\t" + sumFlow;
- }
- public long getUpFlow() {
- return upFlow;
- }
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
- public long getDownFlow() {
- return downFlow;
- }
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
- public long getSumFlow() {
- return sumFlow;
- }
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
- public void set(long upFlow,long downFlow)
- {
- upFlow=upFlow;
- downFlow=downFlow;
- sumFlow=upFlow+downFlow;
- }
- }
在编写这个bean类当中,我们拥有了一个bean类所有的特征,比如get/set方法,以及需要拥有的序列化以及反序列化方法,可以在后续调用这个bean对象的时候,更加方便地进行序列化和反序列化。
四.编写Mapper类
将获得的手机号码设定为key,手机号的上行流量和下行流量分别设定为value,value使用bean类FlowBean来表示。(因为有两个value,而在mapper里面又只能够有一个输出,因此只能使用一个类来代表mapper的输出(valueout)了)
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- Text k=new Text();
- FlowBean v=new FlowBean();
- //1.获取一行,tostring方法已经被改写,因此
- String line=value.toString();
- //2.切割
- String[] fields=line.split("\t");
- //3.封装对象
- k.set(fields[1]);//封装手机号
- //封装
- long upFlow= Long.parseLong(fields[fields.length-3]);
- long downFlow=Long.parseLong(fields[fields.length-2]);
- v.setUpFlow(upFlow);
- v.setUpFlow(downFlow);
- //v.set();
- //4.写出
- context.write(k,v);
- }
- }
五.编写Reducer类
- import org.apache.hadoop.mapreduce.Reducer;
- import javax.xml.soap.Text;
- import java.io.IOException;
- public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
- FlowBean v=new FlowBean();
- @Override
- protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
- long sum_upFlow=0;
- long sum_downflow=0;
- //1.累加求和
- for (FlowBean flowBean:values)
- {
- sum_upFlow+=flowBean.getUpFlow();
- sum_downflow+=flowBean.getDownFlow();
- }
- v.set(sum_upFlow,sum_downflow);
- //2.写出
- context.write(key,v);
- }
- }
六.编写Driver类
编写driver类是一个固定的步骤,可以直接根据注释当中的步骤进行编写即可,代码如下所示:
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import javax.xml.soap.Text;
- import java.io.IOException;
- public class FlowsumDriver {
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Configuration configuration=new Configuration();
- //1.获取job对象
- Job job=Job.getInstance(configuration);
- //2.设置jar路径
- job.setJarByClass(FlowsumDriver.class);
- //3.关联mapper和reducer
- job.setMapperClass(FlowCountMapper.class);
- job.setReducerClass(FlowCountReducer.class);
- //4.设置mapper输出的key和value类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputKeyClass(FlowBean.class);
- //5.设置最终输出的key和value类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
- //6.设置输出路径
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job,new Path(args[1]));
- //7.提交job
-
- boolean result=job.waitForCompletion(true);
- System.out.println(result);
- }
- }
这样我们就可以完成这个任务了!