经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 大数据/云/AI » MapReduce » 查看文章
03_MapReduce框架原理_3.4 InputSplit 切片类(源码)
来源:cnblogs  作者:希望有一天你能发现我  时间:2021/12/24 8:46:57  对本文有异议
Hadoop
2. InputSplit 切片类 1.0 类的作用 InputSplit 他在逻辑上包含了提供给处理这个Inputsplit的Mapper的所有的key-value 1.1 抽象方法 1. public abstract long getLength() 2. public abstract String[] getLocations() 1. 功能说明 获取 InputSplit对象的大小(Bytes) 支持根据 InputSplit 的size 来排序 1. 功能说明 获取 该切片 存储节点的位置信息 1.2 FileSplit 实现类 1. 成员属性 1. private Path file 2. private long start 3. private long length 4. private String[] hosts 该切片 所属文件的路径 切片起始位置 切片长度 存储切片的hosts 1.3 CombineFileSplit 实现类 为每个MapTask 提供一个InputSplit对象,包含了 这个MapTask要处理的数据
点击查看InputSplit
  1. // 切片类,表示 一份被Mapper处理的数据
  2. public abstract class InputSplit {
  3. // 获取切片对象的 长度(单位Bytes)
  4. public abstract long getLength() throws IOException, InterruptedException;
  5. // 获取当前切片对象的 存储信息
  6. public abstract
  7. String[] getLocations() throws IOException, InterruptedException;
  8. // 获取所有切片对象的 存储信息
  9. public SplitLocationInfo[] getLocationInfo() throws IOException {
  10. return null;
  11. }
  12. }

FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,
那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。
在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,
从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

点击查看FileSplit
  1. // 切片类,表示 一份被Mapper处理的数据
  2. // 作为 InputFormat的getSplits方法的返回值
  3. // 作为 InputFormat的createRecordReader方法的输入
  4. // 每个切片 包含文件的一部分 或者整个文件(不可切分或者 文件大小小于切片*1.1时)
  5. public class FileSplit extends InputSplit implements Writable {
  6. private Path file; // 切片 所属的文件名称
  7. private long start; // 切片对应 在文件中的 启示位置
  8. private long length; // 切片长度(字节数)
  9. private String[] hosts; // 切片 所属 block的存储host信息
  10. private SplitLocationInfo[] hostInfos;
  11. // 构造器
  12. public FileSplit() {}
  13. // 构造器
  14. public FileSplit(Path file, long start, long length, String[] hosts) {
  15. this.file = file;
  16. this.start = start;
  17. this.length = length;
  18. this.hosts = hosts;
  19. }
  20. // 构造器
  21. public FileSplit(Path file, long start, long length, String[] hosts,
  22. String[] inMemoryHosts) {
  23. this(file, start, length, hosts);
  24. hostInfos = new SplitLocationInfo[hosts.length];
  25. for (int i = 0; i < hosts.length; i++) {
  26. // because N will be tiny, scanning is probably faster than a HashSet
  27. boolean inMemory = false;
  28. for (String inMemoryHost : inMemoryHosts) {
  29. if (inMemoryHost.equals(hosts[i])) {
  30. inMemory = true;
  31. break;
  32. }
  33. }
  34. hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
  35. }
  36. }
  37. /** The file containing this split's data. */
  38. public Path getPath() { return file; }
  39. /** The position of the first byte in the file to process. */
  40. public long getStart() { return start; }
  41. /** The number of bytes in the file to process. */
  42. @Override
  43. public long getLength() { return length; }
  44. @Override
  45. public String toString() { return file + ":" + start + "+" + length; }
  46. ////////////////////////////////////////////
  47. // Writable methods 序列化方法
  48. ////////////////////////////////////////////
  49. @Override
  50. public void write(DataOutput out) throws IOException {
  51. Text.writeString(out, file.toString());
  52. out.writeLong(start);
  53. out.writeLong(length);
  54. }
  55. @Override
  56. public void readFields(DataInput in) throws IOException {
  57. file = new Path(Text.readString(in));
  58. start = in.readLong();
  59. length = in.readLong();
  60. hosts = null;
  61. }
  62. @Override
  63. public String[] getLocations() throws IOException {
  64. if (this.hosts == null) {
  65. return new String[]{};
  66. } else {
  67. return this.hosts;
  68. }
  69. }
  70. @Override
  71. @Evolving
  72. public SplitLocationInfo[] getLocationInfo() throws IOException {
  73. return hostInfos;
  74. }
  75. }

CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。
可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,
分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。
需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

点击查看CombineFileSplit
  1. // 切片类,表示 一份被Mapper处理的数据
  2. // 一个切片对象,可以包含多个文件
  3. public class CombineFileSplit extends InputSplit implements Writable {
  4. private Path[] paths;
  5. private long[] startoffset;
  6. private long[] lengths;
  7. private String[] locations;
  8. private long totLength;
  9. /**
  10. * default constructor
  11. */
  12. public CombineFileSplit() {}
  13. public CombineFileSplit(Path[] files, long[] start,
  14. long[] lengths, String[] locations) {
  15. initSplit(files, start, lengths, locations);
  16. }
  17. public CombineFileSplit(Path[] files, long[] lengths) {
  18. long[] startoffset = new long[files.length];
  19. for (int i = 0; i < startoffset.length; i++) {
  20. startoffset[i] = 0;
  21. }
  22. String[] locations = new String[files.length];
  23. for (int i = 0; i < locations.length; i++) {
  24. locations[i] = "";
  25. }
  26. initSplit(files, startoffset, lengths, locations);
  27. }
  28. private void initSplit(Path[] files, long[] start,
  29. long[] lengths, String[] locations) {
  30. this.startoffset = start;
  31. this.lengths = lengths;
  32. this.paths = files;
  33. this.totLength = 0;
  34. this.locations = locations;
  35. for(long length : lengths) {
  36. totLength += length;
  37. }
  38. }
  39. /**
  40. * Copy constructor
  41. */
  42. public CombineFileSplit(CombineFileSplit old) throws IOException {
  43. this(old.getPaths(), old.getStartOffsets(),
  44. old.getLengths(), old.getLocations());
  45. }
  46. public long getLength() {
  47. return totLength;
  48. }
  49. /** Returns an array containing the start offsets of the files in the split*/
  50. public long[] getStartOffsets() {
  51. return startoffset;
  52. }
  53. /** Returns an array containing the lengths of the files in the split*/
  54. public long[] getLengths() {
  55. return lengths;
  56. }
  57. /** Returns the start offset of the i<sup>th</sup> Path */
  58. public long getOffset(int i) {
  59. return startoffset[i];
  60. }
  61. /** Returns the length of the i<sup>th</sup> Path */
  62. public long getLength(int i) {
  63. return lengths[i];
  64. }
  65. /** Returns the number of Paths in the split */
  66. public int getNumPaths() {
  67. return paths.length;
  68. }
  69. /** Returns the i<sup>th</sup> Path */
  70. public Path getPath(int i) {
  71. return paths[i];
  72. }
  73. /** Returns all the Paths in the split */
  74. public Path[] getPaths() {
  75. return paths;
  76. }
  77. /** Returns all the Paths where this input-split resides */
  78. public String[] getLocations() throws IOException {
  79. return locations;
  80. }
  81. public void readFields(DataInput in) throws IOException {
  82. totLength = in.readLong();
  83. int arrLength = in.readInt();
  84. lengths = new long[arrLength];
  85. for(int i=0; i<arrLength;i++) {
  86. lengths[i] = in.readLong();
  87. }
  88. int filesLength = in.readInt();
  89. paths = new Path[filesLength];
  90. for(int i=0; i<filesLength;i++) {
  91. paths[i] = new Path(Text.readString(in));
  92. }
  93. arrLength = in.readInt();
  94. startoffset = new long[arrLength];
  95. for(int i=0; i<arrLength;i++) {
  96. startoffset[i] = in.readLong();
  97. }
  98. }
  99. public void write(DataOutput out) throws IOException {
  100. out.writeLong(totLength);
  101. out.writeInt(lengths.length);
  102. for(long length : lengths) {
  103. out.writeLong(length);
  104. }
  105. out.writeInt(paths.length);
  106. for(Path p : paths) {
  107. Text.writeString(out, p.toString());
  108. }
  109. out.writeInt(startoffset.length);
  110. for(long length : startoffset) {
  111. out.writeLong(length);
  112. }
  113. }
  114. @Override
  115. public String toString() {
  116. StringBuffer sb = new StringBuffer();
  117. for (int i = 0; i < paths.length; i++) {
  118. if (i == 0 ) {
  119. sb.append("Paths:");
  120. }
  121. sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
  122. "+" + lengths[i]);
  123. if (i < paths.length -1) {
  124. sb.append(",");
  125. }
  126. }
  127. if (locations != null) {
  128. String locs = "";
  129. StringBuffer locsb = new StringBuffer();
  130. for (int i = 0; i < locations.length; i++) {
  131. locsb.append(locations[i] + ":");
  132. }
  133. locs = locsb.toString();
  134. sb.append(" Locations:" + locs + "; ");
  135. }
  136. return sb.toString();
  137. }
  138. }

原文链接:http://www.cnblogs.com/bajiaotai/p/15708969.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号