1. HDFS command-line operations
1.1 Listing files (prerequisite: Hadoop is already started)
./bin/hdfs dfs -ls a
1.2 Creating the user directory
- hadoop is the login user name, and /user/hadoop is the home directory of the hadoop user
./bin/hdfs dfs -mkdir -p /user/hadoop
// View the hadoop user's home directory
./bin/hdfs dfs -ls .
// The same home directory can also be viewed with the command below
./bin/hdfs dfs -ls /user/hadoop
1.3 Creating and deleting directories
./bin/hdfs dfs -mkdir a
./bin/hdfs dfs -rm -r a
1.4 Uploading files
./bin/hdfs dfs -put /home/hadoop/file1.txt a
./bin/hdfs dfs -put /home/hadoop/file2.txt a
./bin/hdfs dfs -ls a
The local directory /home/hadoop/a in the VM contains several files with the .txt suffix
./bin/hdfs dfs -put /home/hadoop/a/file1.txt a uploads the file to the HDFS directory a (if it has already been uploaded, the command reports that the file already exists)
- ./bin/hdfs dfs -ls a lists the contents of the a directory under the hadoop user's home, as shown in the figure
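To upload all the local .txt files in one go, the shell glob form below should also work (a minimal sketch; it assumes the same local directory /home/hadoop/a as above):
./bin/hdfs dfs -put /home/hadoop/a/*.txt a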
1.5 Viewing a file
- ./bin/hdfs dfs -cat a/file1.txt views the contents of file1.txt
2. Programming practice
2.1 Requirement
- In HDFS, the directory /user/hadoop/a contains files with the .txt suffix and files with the .abc suffix; the contents of the .txt files need to be merged into the file merge.txt under the HDFS directory /user/hadoop/a
2.2 Spring Boot integration with Hadoop
- JDK 17, Spring Boot version 3.2.0 (must be 3.0 or above)
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.4.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.4.1</version>
    </dependency>
</dependencies>
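To avoid repeating the Hadoop version three times, it can be pulled out into a Maven property; a minimal sketch (the property name hadoop.version is an assumption and not part of the original POM):
<properties>
    <java.version>17</java.version>
    <!-- hypothetical property: reference it as ${hadoop.version} in the three Hadoop dependencies above -->
    <hadoop.version>3.4.1</hadoop.version>
</properties>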
2.3 Implementation
- **Coding note**: conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000"); when configuring HDFS in Hadoop, the address must be written as the actual IP, otherwise the connection will be refused
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
The Hadoop-side configuration (a single-node installation in the VM) is as follows:
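A minimal core-site.xml sketch that matches the client code above (the original shows this configuration as a screenshot, so the exact file contents here are an assumption):
<!-- etc/hadoop/core-site.xml on the single-node VM (assumed) -->
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <!-- bind to the VM's IP instead of localhost, otherwise remote clients get "connection refused" -->
        <value>hdfs://192.168.31.101:9000</value>
    </property>
</configuration>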
Complete service-layer code
package com.coderpwh.service.impl;

import com.coderpwh.service.FileService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;

/**
 * @author coderpwh
 */
@Service
public class FileServiceImpl implements FileService, PathFilter {

    // Files whose names match this pattern (.abc suffix) are excluded from the merge
    private String reg = ".*\\.abc";

    // Directory containing the files to be merged
    Path inputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/");

    // Path of the merged output file
    Path outputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/merge.txt");

    /***
     * Merge files
     * @return
     */
    @Override
    public String mergeFile() {
        reg = ".*\\.abc";
        inputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/a/");
        outputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/a/merge.txt");
        try {
            doMerge(reg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }

    /**
     * PathFilter callback: accept every path that does NOT match the .abc pattern.
     */
    @Override
    public boolean accept(Path path) {
        return !path.toString().matches(reg);
    }

    public void doMerge(String regText) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        System.out.println("url:" + inputPath.toString());
        System.out.println(URI.create(inputPath.toString()));
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        // List the source directory, using this class as the PathFilter so that .abc files are skipped
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, this);
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);
        for (FileStatus sta : sourceStatus) {
            System.out.print("Path: " + sta.getPath() + "  Size: " + sta.getLen()
                    + "  Permissions: " + sta.getPermission() + "  Content: ");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
            // Copy each source file both to stdout (for logging) and to the merged output file
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();
        }
        ps.close();
        fsdos.close();
    }
}
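Since the requirement is to merge only the .txt files, an alternative to excluding .abc is a filter that accepts .txt files and skips the output file itself; a minimal sketch (the class name TxtOnlyFilter is hypothetical and not part of the original code):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/** Hypothetical filter: keep only .txt files and skip merge.txt itself. */
class TxtOnlyFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
        String name = path.getName();
        return name.endsWith(".txt") && !name.equals("merge.txt");
    }
}
It would be passed to listStatus in the same way: fsSource.listStatus(inputPath, new TxtOnlyFilter()).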
- Service-layer interface
package com.coderpwh.service;

/**
 * @author coderpwh
 */
public interface FileService {

    /***
     * Merge files
     * @return
     */
    String mergeFile();
}
- Controller layer
package com.coderpwh.controller;

import com.coderpwh.service.FileService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author coderpwh
 */
@RequestMapping("/hdfs")
@RestController
public class HdfsController {

    @Resource
    private FileService fileService;

    @RequestMapping(value = "/mergeFile", method = RequestMethod.GET)
    public String mergeFile() {
        return fileService.mergeFile();
    }
}
- application.yml configuration
server:
  port: 9001
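With the application running on port 9001, the merge can be triggered over HTTP, for example (assuming the service runs on the local machine):
curl http://localhost:9001/hdfs/mergeFile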
2.4 The run log output is shown in the figure below
3. Verifying the result
1. Use the command ./bin/hdfs dfs -cat a/merge.txt to view the merged result in merge.txt