Common HDFS API Operations and HDFS I/O Stream Operations
Source: cnblogs  Author: swineherd_MCQ  Date: 2019/10/31 12:51:55

Prerequisites

Create a Maven project and modify its pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mcq</groupId>
    <artifactId>HDFS-001</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
</project>

Add a log4j.properties file in the resources directory:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

 

API Operations

HDFS shell commands closely mirror their Linux counterparts (for example, hdfs dfs -ls / behaves like ls /), so they are easy to memorize by analogy. Listed below are some equivalent Java API operations:

package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

public class HDFSClient {
    public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        // Alternative: set the default file system on the configuration instead:
        // conf.set("fs.defaultFS", "hdfs://hadoop103:9000");
        // FileSystem fs = FileSystem.get(conf);
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        fs.mkdirs(new Path("/ppqq"));
        fs.close();
        System.out.println("over");
    }

    @Test // file upload
    public void testCopyFromLocalFile()
            throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
        fs.close();
        System.out.println("over");
    }

    @Test // file download
    public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
        // The first false means copy rather than move the source; the last true means
        // use the raw local file system, so no local .crc checksum file is produced.
        fs.close();
        System.out.println("over");
    }

    @Test // file deletion
    public void testDelete() throws IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        fs.delete(new Path("/0811"), true); // true = delete recursively
        fs.close();
        System.out.println("over");
    }

    @Test // file rename
    public void testRename() throws IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
        fs.close();
        System.out.println("over");
    }

    @Test // list files with their block locations
    public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

        // 2. Get file details (recursively)
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

        while (listFiles.hasNext()) {
            LocatedFileStatus status = listFiles.next();

            // Print details: file name
            System.out.println(status.getPath().getName());
            // length
            System.out.println(status.getLen());
            // permissions
            System.out.println(status.getPermission());
            // group
            System.out.println(status.getGroup());

            // Get the stored block locations
            BlockLocation[] blockLocations = status.getBlockLocations();

            for (BlockLocation blockLocation : blockLocations) {
                // Get the hosts storing each block
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println(host);
                }
            }

            System.out.println("-----------separator----------");
        }

        // 3. Release resources
        fs.close();
    }

    @Test // distinguish files from directories
    public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system configuration
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

        // 2. Check whether each entry is a file or a directory
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isFile()) {
                System.out.println("f:" + fileStatus.getPath().getName());
            } else {
                System.out.println("d:" + fileStatus.getPath().getName());
            }
        }

        // 3. Release resources
        fs.close();
    }
}
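
The connect-operate-close pattern above carries over to the rest of the FileSystem API. As a minimal extra sketch (not part of the original post; it assumes the same hadoop103:9000 address and mcq user, and /lala.txt is only an illustrative path), an existence check plus a file-status query could be added to the class like this:

    @Test // existence check and basic file status (hypothetical addition)
    public void testExistsAndStatus() throws IOException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
        Path p = new Path("/lala.txt"); // illustrative path only
        if (fs.exists(p)) {
            FileStatus status = fs.getFileStatus(p);
            // length in bytes, replication factor, and block size all live on FileStatus
            System.out.println(status.getLen() + " " + status.getReplication() + " " + status.getBlockSize());
        } else {
            System.out.println(p + " does not exist");
        }
        fs.close();
    }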

I/O Stream Operations

The API operations above are all conveniences wrapped by the framework. If we want to implement the same uploads and downloads ourselves, we can do so with raw I/O streams:

 

package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSIO {
    @Test // file upload
    public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");

        // 2. Create the input stream (local file)
        FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));

        // 3. Get the output stream (HDFS file)
        FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"));

        // 4. Copy the stream
        IOUtils.copyBytes(fis, fos, configuration);

        // 5. Release resources
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
    }

    @Test // file download
    public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
        // 2. Get the input stream (HDFS file)
        FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
        // 3. Get the output stream (local file)
        FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"));
        // 4. Copy the stream
        IOUtils.copyBytes(fis, fos, configuration);
        // 5. Release resources
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
    }

    // Seek-based reads: download a large file block by block.
    // (1) Download the first block (128 MB)
    @Test
    public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
        // 2. Get the input stream
        FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
        // 3. Create the output stream
        FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
        // 4. Copy exactly the first 128 MB, honouring short reads
        byte[] buf = new byte[1024];
        long remaining = 1024L * 1024 * 128; // one HDFS block
        while (remaining > 0) {
            int len = fis.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (len == -1) {
                break;
            }
            fos.write(buf, 0, len);
            remaining -= len;
        }
        // 5. Release resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }

    // (2) Download the second block (everything after the first 128 MB)
    @Test
    public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
        // 1. Get the file system
        Configuration configuration = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
        // 2. Open the input stream
        FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
        // 3. Seek to the start of the second block (128 MB offset)
        fis.seek(1024 * 1024 * 128);
        // 4. Create the output stream
        FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
        // 5. Copy the rest of the stream
        IOUtils.copyBytes(fis, fos, configuration);
        // 6. Release resources
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
        fs.close();
    }
}
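
Once both tests have run, the two part files can be concatenated locally to recover the original archive and confirm that the block-by-block download worked (on Windows, type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1 achieves the same thing). A minimal plain-Java sketch, assuming the e:/ paths used above:

package com.mcq;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class MergeParts {
    // Concatenate part1 and part2 back into one archive; the result should
    // match the original hadoop-2.7.2.tar.gz stored on HDFS.
    public static void main(String[] args) throws IOException {
        String[] parts = { "e:/hadoop-2.7.2.tar.gz.part1", "e:/hadoop-2.7.2.tar.gz.part2" };
        try (FileOutputStream out = new FileOutputStream("e:/hadoop-2.7.2.tar.gz")) {
            byte[] buf = new byte[8192];
            for (String part : parts) {
                try (FileInputStream in = new FileInputStream(part)) {
                    int len;
                    while ((len = in.read(buf)) != -1) {
                        out.write(buf, 0, len);
                    }
                }
            }
        }
    }
}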

 

Original post: http://www.cnblogs.com/mcq1999/p/11769328.html
