Cloud Computing Lab: Java MapReduce Programming
Source: jb51  Date: 2021/12/20 15:28:23

Lab title:

MapReduce programming

Lab description:

This lab uses the Java API provided by Hadoop to write MapReduce programs.

Lab objectives:

  • Master MapReduce programming.
  • Understand how MapReduce works.

[Lab assignment] Simple traffic statistics

Given a log file like the following:

13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230513 00-FD-07-A4-72-B8:CMCC 120.196.40.8 i02.c.aliimg.com 248 0 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230533 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200
13926230553 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13826230563 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13926230573 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 Integrated portal 1938 2910 200
18912688533 00-FD-07-A4-72-B8:CMCC 220.196.100.82 i02.c.aliimg.com 3333 21321 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200
13826230523 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200
13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200

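The log file records each mobile user's network traffic over a period of time. The fields are:

phone_number mac_address ip_address domain upstream_bytes downstream_bytes plan_type

The domain field can itself contain spaces (for example "Video website"), so the two traffic fields are most reliably located by counting from the end of the line.

From this log, compute each mobile user's total traffic (upstream + downstream) for the period. The result format is:

phone_number byte_count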
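
Before moving to Hadoop, the required computation can be sketched in plain Java. This is only an illustration under assumptions: the class name `FlowTotals` and the method `totals` are hypothetical and not part of the lab code, and the sample uses three of the log lines shown above. It locates the traffic fields from the end of each line and sums them per phone number.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Hadoop) of the per-phone traffic total.
// FlowTotals and totals() are hypothetical names used for illustration only.
class FlowTotals {
    // Returns phone number -> upstream + downstream bytes, sorted by phone number.
    static Map<String, Long> totals(List<String> lines) {
        Map<String, Long> result = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            if (f.length < 7) {
                continue; // skip blank or malformed lines
            }
            // The domain may contain spaces, so index the traffic fields from the end.
            long up = Long.parseLong(f[f.length - 3]);
            long down = Long.parseLong(f[f.length - 2]);
            result.merge(f[0], up + down, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
            "13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 2481 24681 200",
            "13726230543 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Video website 1527 2106 200",
            "13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 Search Engines 9531 9531 200");
        totals(sample).forEach((phone, bytes) -> System.out.println(phone + "\t" + bytes));
    }
}
```

For these three lines the sketch prints 13726230503 with 46224 bytes and 13726230543 with 3633 bytes. The MapReduce version splits the same work in two: the mapper emits the traffic values keyed by phone number, and the reducer performs the summation.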

Lab results:

(Screenshot of the program output; image not included.)

Lab code:

WcMap.java

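import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMap extends Mapper<LongWritable, Text, Text, LongWritable> {
    // Reused across calls to avoid allocating new Writables for every record.
    private final Text phone = new Text();
    private final LongWritable bytes = new LongWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().trim().split("\\s+");
        // Skip blank or malformed lines (a valid record has at least 7 fields).
        if (words.length < 7) {
            return;
        }
        phone.set(words[0]);
        // The domain field may contain spaces, so the traffic fields are located
        // from the end: upstream is third from last, downstream is second from last.
        for (int i = words.length - 3; i <= words.length - 2; i++) {
            // Long.parseLong matches the LongWritable value type and avoids int overflow.
            bytes.set(Long.parseLong(words[i]));
            context.write(phone, bytes);
        }
    }
}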

WcReduce.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum every upstream and downstream value emitted for this phone number.
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}

WcRunner.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WcRunner {
    private static final String HDFS_URI = "hdfs://master:9000";

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WcRunner.class);
        job.setMapperClass(WcMap.class);
        job.setReducerClass(WcReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Read the HDFS input and output paths from standard input.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        try {
            FileSystem fs0 = FileSystem.get(URI.create(HDFS_URI), new Configuration());
            // Upload the local log file (both paths are specific to the lab environment).
            fs0.copyFromLocalFile(
                    new Path("/headless/Desktop/workspace/mapreduce/WordCount/data/1.txt"),
                    new Path("/mapreduce/WordCount/input/1.txt"));
            // The job fails if the output directory already exists, so delete it first.
            if (fs0.delete(new Path(outputPath), true)) {
                System.out.println("Directory " + outputPath + " has been deleted successfully!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        FileInputFormat.setInputPaths(job, new Path(HDFS_URI + inputPath));
        FileOutputFormat.setOutputPath(job, new Path(HDFS_URI + outputPath));
        // Stop here if the job failed; there is no result file to print.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        // Print the output file written by the single reducer.
        // BufferedReader replaces the deprecated DataInputStream.readLine().
        FileSystem fs = FileSystem.get(URI.create(HDFS_URI), new Configuration());
        Path srcPath = new Path(outputPath + "/part-r-00000");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(srcPath), StandardCharsets.UTF_8))) {
            System.out.println("Results:");
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

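[Lab assignment] Inverted index with line numbers

In the inverted index lab, we obtained the files in which each word appears and the number of times it occurs in each file. Modify that implementation so that the inverted index output also gives the specific line numbers at which each word appears in each file. The output format is:
word file_name:line_number,file_name:line_number,file_name:line_number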
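
The target format can be illustrated with a small plain-Java sketch that does not use Hadoop. The class name `LineIndex`, the method `build`, and the file contents in `main` are hypothetical and chosen only for illustration. The sketch emits one `file_name:line_number` entry per occurrence, so a word that appears twice on the same line is listed twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java sketch (no Hadoop) of an inverted index that records line numbers.
// LineIndex and build() are hypothetical names used for illustration only.
class LineIndex {
    // files maps a file name to its lines; the result maps each word to
    // "file:line,file:line,..." with one entry per occurrence.
    static Map<String, String> build(Map<String, List<String>> files) {
        Map<String, List<String>> postings = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            int lineNo = 0;
            for (String line : file.getValue()) {
                lineNo++; // line numbers start at 1
                StringTokenizer itr = new StringTokenizer(line);
                while (itr.hasMoreTokens()) {
                    postings.computeIfAbsent(itr.nextToken(), w -> new ArrayList<>())
                            .add(file.getKey() + ":" + lineNo);
                }
            }
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : postings.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical file contents.
        Map<String, List<String>> files = new TreeMap<>();
        files.put("1.txt", List.of("MapReduce is simple"));
        files.put("3.txt", List.of("MapReduce MapReduce", "Hello"));
        build(files).forEach((word, where) -> System.out.println(word + " " + where));
    }
}
```

With these inputs the entry for MapReduce is `1.txt:1,3.txt:1,3.txt:1`. In the MapReduce implementation the same steps are distributed: the mapper tokenizes each line and tags every word with its file name and line number, and the reduce side concatenates the entries for each word.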

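Lab results:

"MapReduce" appears twice on the first line of 3.txt, so line number 1 is listed twice for that file.

(Screenshot of the program output; image not included.)

Lab code: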

MyMapper.java

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MyMapper extends Mapper<Object, Text, Text, Text> {
    private final Text keyInfo = new Text();
    private final Text valueInfo = new Text();
    // Line counter. A new Mapper instance is created for each map task, so this
    // equals the line number only when each file is read as a single input split.
    private int num = 0;

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        num++;
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Key: "word fileName"; value: the current line number.
            keyInfo.set(itr.nextToken() + " " + fileName);
            valueInfo.set(Integer.toString(num));
            context.write(keyInfo, valueInfo);
        }
    }
}

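MyCombiner.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Caution: Hadoop may run a combiner zero, one, or several times. This job relies
// on it running exactly once per key, because it rewrites the key from
// "word fileName" to "word".
public class MyCombiner extends Reducer<Text, Text, Text, Text> {
    private final Text info = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate all line numbers seen for this "word fileName" key.
        StringBuilder lines = new StringBuilder();
        for (Text value : values) {
            lines.append(value.toString()).append(" ");
        }

        // Split the key back into the word and the file name.
        String[] str = key.toString().split(" ");

        // Emit (word, "fileName:line line ...").
        key.set(str[0]);
        info.set(str[1] + ":" + lines);
        context.write(key, info);
    }
}
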
MyReducer.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, Text, Text, Text> {
    private final Text result = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Join the per-file entries ("fileName:line line ...") for this word.
        StringBuilder joined = new StringBuilder();
        for (Text value : values) {
            joined.append(value.toString()).append(" ; ");
        }
        result.set(joined.toString());
        context.write(key, result);
    }
}

MyRunner.java

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyRunner {
    private static final String HDFS_URI = "hdfs://master:9000";

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyRunner.class);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setCombinerClass(MyCombiner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read the HDFS input and output paths from standard input.
        Scanner sc = new Scanner(System.in);
        System.out.print("inputPath:");
        String inputPath = sc.next();
        System.out.print("outputPath:");
        String outputPath = sc.next();

        try {
            FileSystem fs0 = FileSystem.get(URI.create(HDFS_URI), new Configuration());
            // The job fails if the output directory already exists, so delete it first.
            if (fs0.delete(new Path(outputPath), true)) {
                System.out.println("Directory " + outputPath + " has been deleted successfully!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        FileInputFormat.setInputPaths(job, new Path(HDFS_URI + inputPath));
        FileOutputFormat.setOutputPath(job, new Path(HDFS_URI + outputPath));
        // Stop here if the job failed; there is no result file to print.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }

        // Print the output file written by the single reducer.
        // BufferedReader replaces the deprecated DataInputStream.readLine().
        FileSystem fs = FileSystem.get(URI.create(HDFS_URI), new Configuration());
        Path srcPath = new Path(outputPath + "/part-r-00000");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(srcPath), StandardCharsets.UTF_8))) {
            System.out.println("Results:");
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
