MapReduce: Job Submission Flow and Input Split Source Code Analysis
Source: cnblogs  Author: kocdaniel  Date: 2019/9/30 8:51:24

Analysis of the MapReduce job submission and input-split source code in Hadoop 2.7.2

  1. Start from the waitForCompletion() method

  boolean result = job.waitForCompletion(true);
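For context, this call normally sits at the very end of a driver's main() method, after the job has been configured. Below is a minimal, hypothetical driver (identity Mapper/Reducer, placeholder input/output arguments; the class name is made up for illustration) just to show where the analyzed call lives:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  // Hypothetical driver: identity map/reduce, only meant to show the entry point
  public class PassThroughDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "pass-through");
      job.setJarByClass(PassThroughDriver.class);
      job.setMapperClass(Mapper.class);            // identity mapper
      job.setReducerClass(Reducer.class);          // identity reducer
      job.setOutputKeyClass(LongWritable.class);   // TextInputFormat's key type
      job.setOutputValueClass(Text.class);         // TextInputFormat's value type
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      // The entry point analyzed in this article
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
    }
  }

The implementation of waitForCompletion() in Job is as follows: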
  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // First check the state: the job can only be submitted while state is DEFINE.
    // Enter the submit() method.
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
  2. Enter the submit() method
  /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    // Make sure the JobState is still DEFINE; otherwise the job cannot be submitted
    ensureState(JobState.DEFINE);
    // Use the new (mapreduce) API
    setUseNewAPI();
    // Enter connect(): when a MapReduce job is submitted, the connection to the
    // cluster is established by Job's connect() method, which actually constructs
    // the Cluster instance "cluster"
    connect();
    // Once connect() has returned, create the submitter
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {
        // The core call here is submitJobInternal(), which, as the name suggests,
        // is the internal method implementing all of the job-submission logic.
        // Enter submitJobInternal
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    // After submission the state changes
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
  }
  3. Enter the connect() method
  • When a MapReduce job is submitted, the connection to the cluster is established by Job's connect() method, which in effect constructs the Cluster instance cluster.
  • cluster is the tool for connecting to the MapReduce cluster; it provides methods for obtaining information about the cluster.
  • Inside Cluster there is an instance client of the client communication protocol ClientProtocol, which is built by a ClientProtocolProvider's create() method.
  • Inside create(), Hadoop 2.x offers two kinds of ClientProtocol: YARNRunner for YARN mode and LocalJobRunner for local mode; it is these two that Cluster actually uses to communicate with the cluster (a simplified sketch of the two providers appears after the initialize() code below).
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) { // cluster provides the means to reach the remote MapReduce cluster
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     // The part to focus on is the Cluster() constructor,
                     // which builds the cluster instance
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
  4. Enter the Cluster() constructor
  // The one-argument constructor is called first; it delegates to the two-argument one
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // The key call is initialize()
    initialize(jobTrackAddr, conf);
  }

  // The two members of Cluster to pay attention to are the client communication
  // protocol provider (ClientProtocolProvider) and the client communication
  // protocol instance (ClientProtocol) named client
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          // If the configuration carries no YARN settings, a LocalJobRunner is built
          // and the MR job runs locally.
          // If YARN is configured, a YARNRunner is built and the MR job runs on the
          // YARN cluster.
          if (jobTrackAddr == null) {
            // The client protocol is created by calling the provider's create() method
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        }
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: ", e);
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
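How each provider decides whether to answer is driven by the mapreduce.framework.name property (MRConfig.FRAMEWORK_NAME). The following is a simplified, paraphrased sketch of the create(Configuration) methods of YarnClientProtocolProvider and LocalClientProtocolProvider in Hadoop 2.x; consult the actual classes for the exact code:

  // YarnClientProtocolProvider (simplified): answers only when the framework is "yarn"
  public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);   // the MR job will run on the YARN cluster
    }
    return null;                     // let the next provider be tried
  }

  // LocalClientProtocolProvider (simplified): answers only when the framework is "local" (the default)
  public ClientProtocol create(Configuration conf) throws IOException {
    String framework =
        conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    return new LocalJobRunner(conf); // the MR job will run inside the local JVM
  }

In other words, setting mapreduce.framework.name to yarn (for example in mapred-site.xml) is what makes initialize() pick the YARNRunner; without it, the LocalJobRunner is chosen.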
  5. Enter submitJobInternal(), the internal method that submits the job to the cluster
  JobStatus submitJobInternal(Job job, Cluster cluster)
      throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs
    // Check whether the output path already exists; if it does, an exception is thrown
    checkSpecs(job);

    // conf holds the cluster's XML configuration
    Configuration conf = job.getConfiguration();
    // Add the MR framework to the distributed cache
    addMRFrameworkToDistributedCache(conf);

    // Get the temporary staging path for the resources used during submission.
    // When not configured, it defaults to /tmp/hadoop-yarn/staging/<submitting user>/.staging
    // under the root of the working file system
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) { // Record the submitting host's IP and hostname, and store them in conf
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    // Get a new JobId
    JobID jobId = submitClient.getNewJobID();
    // Set the jobId
    job.setJobID(jobId);
    // The submit path; Path(Path parent, String child) joins the two arguments into one path
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // The job status
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers",
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
            "data spill is enabled");
      }

      // Copy the job jar to the cluster.
      // Internally this calls rUploader.uploadFiles(job, jobSubmitDir),
      // which uploads the jar (and other resources) to the cluster
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // Compute the input splits and write the split files
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      // This is where the real submission happens
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);
      }
    }
  }
  6. Enter writeSplits(job, submitJobDir), which computes the input splits and generates the split files
  • Internally it calls writeNewSplits(job, jobSubmitDir) (a simplified sketch of that method follows this list).
  • writeNewSplits(job, jobSubmitDir) creates an instance named input of type InputFormat.
  • The main responsibilities of InputFormat are:
    • validating the job's input specification;
    • splitting the input files into multiple InputSplits, each of which corresponds to one map task (MapTask);
    • providing the RecordReader that turns the data of a split into key/value pairs according to its rules.
  • input's getSplits() method is then called: List<InputSplit> splits = input.getSplits(job);
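As referenced above, a simplified, paraphrased sketch of JobSubmitter.writeNewSplits() (not reproduced verbatim here) looks roughly like this: it builds the job's InputFormat by reflection, asks it for the splits, sorts them largest first, and writes the split files into the submit directory; the return value becomes the number of MapTasks.

  @SuppressWarnings("unchecked")
  private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir)
      throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Instantiate the job's InputFormat (TextInputFormat unless overridden)
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    // The getSplits() call analyzed in the next step
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // Sort the splits so that the biggest go first, then write the split files
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;   // number of splits == number of MapTasks
  }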
  7. Enter the getSplits(job) method of the FileInputFormat class
  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    // getFormatMinSplitSize() always returns 1; getMinSplitSize(job) returns the
    // configured minimum split size (mapreduce.input.fileinputformat.split.minsize, default 1)
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job) returns the configured maximum split size (default Long.MAX_VALUE)
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    // Iterate over all input files of the job
    for (FileStatus file: files) {
      // File path
      Path path = file.getPath();
      // File length
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Check whether the file can be split
        if (isSplitable(job, path)) {
          // Block size: 32 MB by default in the local environment; on YARN/HDFS the
          // default is 128 MB in newer Hadoop 2.x releases (64 MB in older releases)
          long blockSize = file.getBlockSize();
          // Compute the logical split size, which equals the block size by default:
          // return Math.max(minSize, Math.min(maxSize, blockSize));
          // with minSize = 1 and maxSize = Long.MAX_VALUE, this yields blockSize
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // Before each cut, check whether the remaining bytes are more than
          // SPLIT_SLOP (default 1.1) times the split size; if not, stop splitting
          // and keep the remainder as a single split
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
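To make the SPLIT_SLOP factor of 1.1 concrete, here is a small, self-contained illustration (hypothetical file sizes, a 128 MB block size, and the default minSize/maxSize) that replays the splitting loop above:

  // Illustration only: replays the getSplits() loop for a few hypothetical file sizes
  public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

    public static void main(String[] args) {
      long mb = 1024L * 1024L;
      long splitSize = 128 * mb;   // computeSplitSize(...) == blockSize by default
      long[] fileSizes = {129 * mb, 150 * mb, 300 * mb};
      for (long length : fileSizes) {
        int splits = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          splits++;
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
          splits++;   // the remainder becomes one final split
        }
        System.out.println(length / mb + " MB -> " + splits + " split(s)");
      }
      // 129 MB -> 1 split  (129/128 ≈ 1.008 <= 1.1, so it is not cut)
      // 150 MB -> 2 splits (150/128 ≈ 1.17 > 1.1: 128 MB + 22 MB)
      // 300 MB -> 3 splits (128 MB + 128 MB + 44 MB)
    }
  }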

Original article: http://www.cnblogs.com/kocdaniel/p/11609404.html
