HDFS Programming in Practice



1. HDFS Command-Line Operations

1.1 Listing files (assumes Hadoop is already started)

./bin/hdfs dfs -ls a

A relative path such as a is resolved against the current user's HDFS home directory, /user/hadoop.

1.2 Creating the user directory

  • hadoop is the login user name; the /user/hadoop directory is the hadoop user's home directory
./bin/hdfs dfs -mkdir -p /user/hadoop

# List the hadoop user's home directory
./bin/hdfs dfs -ls .

# The same directory can also be listed with its absolute path
./bin/hdfs dfs -ls /user/hadoop

1.3 Creating and deleting directories

./bin/hdfs dfs -mkdir a

./bin/hdfs dfs -rm -r a

1.4 Uploading files

./bin/hdfs dfs -put /home/hadoop/file1.txt a
./bin/hdfs dfs -put /home/hadoop/file2.txt a


./bin/hdfs dfs -ls a

  1. The /home/hadoop/a directory on the virtual machine contains several files with the .txt suffix.

  2. ./bin/hdfs dfs -put /home/hadoop/a/file1.txt a uploads the file into the a directory in HDFS (if it was uploaded before, the command reports that the file already exists; see the overwrite note after this list).

  3. ./bin/hdfs dfs -ls a lists the contents of the hadoop user's a directory, as shown in the figure.
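If the destination file already exists, -put refuses to replace it; adding the -f flag forces an overwrite:

./bin/hdfs dfs -put -f /home/hadoop/a/file1.txt a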

1.5 Viewing a file

./bin/hdfs dfs -cat a/file1.txt

2. Programming Practice

2.1 Requirement

  • In HDFS, /user/hadoop/a contains files with both the .txt and the .abc suffix; the contents of the .txt files need to be merged into a merge.txt file in the same HDFS directory, /user/hadoop/a.

2.2 Integrating Hadoop with Spring Boot

  • JDK 17, Spring Boot 3.2.0 (must be 3.0 or later)
<dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>

     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-actuator</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-aop</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-test</artifactId>
         <scope>test</scope>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-hdfs</artifactId>
         <version>3.4.1</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>3.4.1</version>
     </dependency>
     <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-client</artifactId>
         <version>3.4.1</version>
     </dependency>

 </dependencies>

2.3 Implementation

  1. **Coding note**: conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000"); when configuring HDFS in Hadoop, the address must be written as the machine's actual IP, otherwise the client's connection will be refused (Connection refused).
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  2. The Hadoop configuration itself (a single-node installation on a virtual machine) is shown below:
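For reference, a minimal core-site.xml consistent with that fs.defaultFS value might look like the sketch below (the IP address is this article's VM address; using the real IP rather than localhost is what lets clients on other machines connect):

<configuration>
    <!-- NameNode RPC address; the client's fs.defaultFS must match it -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.31.101:9000</value>
    </property>
</configuration>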

  3. Complete service-layer code

package com.coderpwh.service.impl;

import com.coderpwh.service.FileService;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.springframework.stereotype.Service;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;

/**
 * @author coderpwh
 */
@Service
public class FileServiceImpl implements FileService, PathFilter {


    // Regex matching the files to exclude from the merge (the .abc files)
    private String reg = ".*\\.abc";

    // Directory containing the files to be merged
    Path inputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/");

    // Path of the merged output file
    Path outputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/merge.txt");


    /**
     * Merges the eligible files under inputPath into outputPath.
     * @return "success" once the merge has run
     */
    @Override
    public String mergeFile() {

        reg = ".*\\\\.abc";
        inputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/a/");
        outputPath = new Path("hdfs://192.168.31.101:9000/user/hadoop/a/merge.txt");
        try {
            doMerge(reg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "success";
    }


    @Override
    public boolean accept(Path path) {
        // Never merge the output file into itself (relevant on re-runs)
        if (path.equals(outputPath)) {
            return false;
        }
        // Accept every path that does not match the exclusion regex, i.e. skip the .abc files
        return !path.toString().matches(reg);
    }

    public void doMerge(String regText) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.31.101:9000");

        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        System.out.println("url:"+inputPath.toString());
        System.out.println(URI.create(inputPath.toString()));
        FileSystem fsSource =
                FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        // This class implements PathFilter, so the instance itself is passed as the filter
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, this);
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        for (FileStatus sta : sourceStatus) {
            System.out.print("路径:" + sta.getPath() + " 文件大小:" + sta.getLen() + " 权限:" + sta.getPermission() + " 内容");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = fsdis.read(data)) > 0) {
                // Echo to the console and append to the merged output file
                System.out.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();
        }
        System.out.flush();
        fsdos.close();
    }


}
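Design note: FileServiceImpl implements PathFilter, so the service instance itself is handed to listStatus as the filter. accept() rejects the .abc files and the merge.txt output file, which leaves exactly the .txt files the requirement asks to merge.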
  4. Service-layer interface
package com.coderpwh.service;

/**
 * @author coderpwh
 */
public interface FileService {

    /**
     * Merges the .txt files under /user/hadoop/a into merge.txt.
     * @return "success" once the merge has run
     */
    String mergeFile();


}
  5. Controller layer
package com.coderpwh.controller;

import com.coderpwh.service.FileService;
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author coderpwh
 */
@RequestMapping("/hdfs")
@RestController
public class HdfsController {


    @Resource
    private FileService fileService;


    @RequestMapping(value = "/mergeFile", method = RequestMethod.GET)
    public String mergeFile() {
        return fileService.mergeFile();
    }

}
  6. application.yml configuration
server:
  port: 9001
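
Once the application is up, the merge is triggered through the controller; for example, assuming it runs on the local machine with the port configured above:

curl http://localhost:9001/hdfs/mergeFile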

2.4 Run log output, as shown in the figure below

3. Verifying the Result

1. Run ./bin/hdfs dfs -cat a/merge.txt to view the contents of merge.txt.

