前置操作
创建maven工程,修改pom.xml文件:
- <!-- Maven build descriptor for the HDFS client examples in this article. -->
- <!-- It declares JUnit, log4j-core, three Hadoop 2.7.2 artifacts and the JDK tools.jar. -->
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.mcq</groupId>
- <artifactId>HDFS-001</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <dependencies>
- <!-- JUnit provides the @Test annotation used by the example classes. -->
- <!-- NOTE(review): RELEASE is a floating version, so builds are not reproducible; consider pinning an explicit version. -->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>RELEASE</version>
- </dependency>
- <!-- NOTE(review): this is Log4j 2, while log4j.properties in this article uses Log4j 1.x class names (org.apache.log4j.*). -->
- <!-- Verify which logging backend is actually in effect, and check version 2.8.2 against the published Log4j 2 security advisories. -->
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.8.2</version>
- </dependency>
- <!-- Hadoop artifacts; all three are declared at the same version (2.7.2). -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.7.2</version>
- </dependency>
- <!-- System-scoped dependency: resolved from the local JDK through JAVA_HOME instead of a Maven repository. -->
- <!-- NOTE(review): presumably requires a JDK that still ships lib/tools.jar (JDK 8 or earlier); confirm the build JDK. -->
- <dependency>
- <groupId>jdk.tools</groupId>
- <artifactId>jdk.tools</artifactId>
- <version>1.8</version>
- <scope>system</scope>
- <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
- </dependency>
- </dependencies>
-
- </project>
在resources目录下添加一个文件log4j.properties:
- # Logging configuration for the HDFS client examples.
- # Root logger: level INFO, attached to the "stdout" appender only.
- log4j.rootLogger=INFO, stdout
- # "stdout" appender: writes to the console using the pattern
- # timestamp, level, [logger name], message, newline.
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- # "logfile" appender: writes to target/spring.log with the same pattern.
- # NOTE(review): "logfile" is not listed in log4j.rootLogger above, so nothing in this
- # file routes events to it; add it to the root logger if file output is wanted.
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
API操作
HDFS的命令和Linux命令极其相似,可以类比记忆。这里列出一些常用的Java API操作:
- package com.mcq;
-
- import java.io.IOException;
- import java.net.URI;
- import java.net.URISyntaxException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.BlockLocation;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.LocatedFileStatus;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.RemoteIterator;
- import org.junit.Test;
-
- /**
-  * Examples of basic HDFS operations through the Hadoop {@link FileSystem} API:
-  * creating a directory, uploading, downloading, deleting, renaming and listing.
-  *
-  * <p>Every example connects to the NameNode at {@link #HDFS_URI} as user
-  * {@link #HDFS_USER}. The {@code FileSystem} handle is opened in a
-  * try-with-resources statement so it is closed even when an operation throws.
-  */
- public class HDFSClient {
-
-     /** NameNode address used by every example in this class. */
-     private static final String HDFS_URI = "hdfs://hadoop103:9000";
-
-     /** User name the examples act as when talking to HDFS. */
-     private static final String HDFS_USER = "mcq";
-
-     /**
-      * Opens a {@link FileSystem} for {@link #HDFS_URI} as {@link #HDFS_USER}
-      * with a fresh default {@link Configuration}.
-      *
-      * <p>An alternative is to set {@code fs.defaultFS} on the configuration and
-      * call {@code FileSystem.get(conf)}; that variant passes no explicit user name.
-      *
-      * @return an open file system handle; the caller must close it
-      * @throws IOException if the file system cannot be obtained
-      * @throws InterruptedException if the calling thread is interrupted while connecting
-      * @throws URISyntaxException if {@link #HDFS_URI} is not a valid URI
-      */
-     private static FileSystem connect() throws IOException, InterruptedException, URISyntaxException {
-         return FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
-     }
-
-     /**
-      * Creates the directory {@code /ppqq} on HDFS and prints {@code over}.
-      *
-      * @param args unused
-      */
-     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
-         try (FileSystem fs = connect()) {
-             fs.mkdirs(new Path("/ppqq"));
-         }
-         System.out.println("over");
-     }
-
-     /** File upload: copies a local file to the HDFS root directory. */
-     @Test
-     public void testCopyFromLocalFile()
-             throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
-         try (FileSystem fs = connect()) {
-             fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
-         }
-         System.out.println("over");
-     }
-
-     /** File download: copies an HDFS file to the local disk. */
-     @Test
-     public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
-         try (FileSystem fs = connect()) {
-             // First argument false: keep the source (copy, not move).
-             // Last argument true: use the raw local file system, so no .crc file is produced locally.
-             fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
-         }
-         System.out.println("over");
-     }
-
-     /** File deletion. */
-     @Test
-     public void testDelete() throws IOException, InterruptedException, URISyntaxException {
-         try (FileSystem fs = connect()) {
-             // Second argument: whether to delete recursively.
-             fs.delete(new Path("/0811"), true);
-         }
-         System.out.println("over");
-     }
-
-     /** File rename. */
-     @Test
-     public void testRename() throws IOException, InterruptedException, URISyntaxException {
-         try (FileSystem fs = connect()) {
-             fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
-         }
-         System.out.println("over");
-     }
-
-     /**
-      * Lists every file under {@code /} and prints, per file, its name, length,
-      * permission, group and the hosts that store each of its blocks.
-      */
-     @Test
-     public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
-         // 1. Obtain the file system (closed automatically at the end of the block).
-         try (FileSystem fs = connect()) {
-
-             // 2. Fetch the file details; the second argument requests a recursive listing.
-             RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
-
-             while (listFiles.hasNext()) {
-                 LocatedFileStatus status = listFiles.next();
-
-                 // File name
-                 System.out.println(status.getPath().getName());
-                 // Length
-                 System.out.println(status.getLen());
-                 // Permission
-                 System.out.println(status.getPermission());
-                 // Group
-                 System.out.println(status.getGroup());
-
-                 // Block locations: print the hosts holding each block.
-                 for (BlockLocation blockLocation : status.getBlockLocations()) {
-                     for (String host : blockLocation.getHosts()) {
-                         System.out.println(host);
-                     }
-                 }
-
-                 System.out.println("-----------分割线----------");
-             }
-         }
-     }
-
-     /**
-      * Lists the direct children of {@code /}, prefixing files with {@code f:}
-      * and everything else with {@code d:}.
-      */
-     @Test
-     public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
-         // 1. Obtain the file system (closed automatically at the end of the block).
-         try (FileSystem fs = connect()) {
-
-             // 2. Decide for each entry whether it is a file or a directory.
-             for (FileStatus fileStatus : fs.listStatus(new Path("/"))) {
-                 String prefix = fileStatus.isFile() ? "f:" : "d:";
-                 System.out.println(prefix + fileStatus.getPath().getName());
-             }
-         }
-     }
- }
I/O流操作
上面的API操作都是HDFS框架封装好的。如果想自己实现上述操作,可以采用I/O流的方式实现数据的上传和下载。
- package com.mcq;
-
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.net.URI;
- import java.net.URISyntaxException;
-
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.yarn.api.records.URL;
- import org.junit.Test;
-
- public class HDFSIO {
- //文件上传
- @Test
- public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
-
- // 1 获取文件系统
- Configuration configuration = new Configuration();
- FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
-
- // 2 创建输入流
- FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));
-
- // 3 获取输出流
- FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"));
-
- // 4 流对拷
- IOUtils.copyBytes(fis, fos, configuration);
-
- // 5 关闭资源
- IOUtils.closeStream(fos);
- IOUtils.closeStream(fis);
- fs.close();
- }
- // 文件下载
- @Test
- public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{
-
- // 1 获取文件系统
- Configuration configuration = new Configuration();
- FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
-
- // 2 获取输入流
- FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
-
- // 3 获取输出流
- FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"));
-
- // 4 流的对拷
- IOUtils.copyBytes(fis, fos, configuration);
-
- // 5 关闭资源
- IOUtils.closeStream(fos);
- IOUtils.closeStream(fis);
- fs.close();
- }
- //定位文件读取
- //(1)下载第一块
- @Test
- public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{
-
- // 1 获取文件系统
- Configuration configuration = new Configuration();
- FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
-
- // 2 获取输入流
- FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
-
- // 3 创建输出流
- FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
-
- // 4 流的拷贝
- byte[] buf = new byte[1024];
-
- for(int i =0 ; i < 1024 * 128; i++){
- fis.read(buf);
- fos.write(buf);
- }
-
- // 5关闭资源
- IOUtils.closeStream(fis);
- IOUtils.closeStream(fos);
- fs.close();
- }
- //(2)下载第二块
- @Test
- public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{
-
- // 1 获取文件系统
- Configuration configuration = new Configuration();
- FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
-
- // 2 打开输入流
- FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
-
- // 3 定位输入数据位置
- fis.seek(1024*1024*128);
-
- // 4 创建输出流
- FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
-
- // 5 流的对拷
- IOUtils.copyBytes(fis, fos, configuration);
-
- // 6 关闭资源
- IOUtils.closeStream(fis);
- IOUtils.closeStream(fos);
- }
- }