HDFSjava API实验

package com.gw;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;

/**
 * @author wangweifeng
 * @dercription: 实现往HDFS中新建文件夹、删除文件夹、创建文件、删除文件 获取HDFS集群上所有节点名称信息
 *               、文件重命名、读取HDFS文件中的内容、 上传本地文件到HDFS中、下载HDFS中的文件本地文件系统、列出目录下所有文件
 *               (如果是目录则显示,层次显示目录内的文件)
 */

public class HdfsUtil {
    
    // 初始化配置参数
    static Configuration conf = new Configuration();
    static {
        String path = "/home/Hadoop/hadoop/etc/hadoop/";
        conf.addResource(new Path(path + "core-site.xml"));
        conf.addResource(new Path(path + "hdfs-site.xml"));
        conf.addResource(new Path(path + "mapred-site.xml"));
    }
    //获取FileSystem
    public static FileSystem getFs() throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs;
    }
    
    

    // 实现往HDFS中新建文件夹
    public static void mkDir(String path) throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(path);
        boolean isok = fs.mkdirs(srcPath);
        if (isok) {
            System.out.println("create dir Ok!");
        } else {
            System.out.println("create dir failure!");
        }
        fs.close();
    }

    // 删除HDFS中的文件夹或是文件
    public static void delete(String path) throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(path);
        boolean isok = fs.delete(srcPath, true);
        if (isok) {
            System.out.println("delete ok!");
        } else {
            System.out.println("delete failure!");
        }
        fs.close();
    }

    // 在HDFS中创建文件并写入内容
    public static void createFile(String path, byte[] contents)
            throws IOException {
        FileSystem fs = getFs();
        Path dstPath = new Path(path); // 目标路径

        // 打开一个输出流
        FSDataOutputStream outputStream = fs.create(dstPath);
        outputStream.write(contents);
        outputStream.close();
        fs.close();
        System.out.println("file is created sucess!");

    }

    // 在HDFS中创建文件并写入内容
    public static void createFile(String path, String contents)
            throws IOException {
        createFile(path, contents.getBytes("UTF-8"));
    }

    // 对HDFS中的文件重命名
    public static void reNameFile(String oldName, String newName)
            throws IOException {
        FileSystem fs = getFs();
        Path oldNamePath = new Path(oldName);
        Path newNamePath = new Path(newName);
        boolean isok = fs.rename(oldNamePath, newNamePath);
        if (isok) {
            System.out.println("rename ok!");
        } else {
            System.out.println("rename failure");
        }
        fs.close();
    }

    // 读取HDFS中的文件内容并打印到标准输出
    public static void readFilePrint(String path) throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(path);
        // 打开一个输入流
        InputStream in = fs.open(srcPath);
        try {
            fs.open(srcPath);
            IOUtils.copyBytes(in, System.out, 4096, false); // 复制到标准输出流
        } finally {
            IOUtils.closeStream(in);
        }
    }

    // 读取文件内容
    public static byte[] readFile(String path) throws IOException {
        FileSystem fs = getFs();
        if (isExist(path)) {
            Path srcPath = new Path(path);
            FSDataInputStream is = fs.open(srcPath);
            FileStatus stat = fs.getFileStatus(srcPath);
            byte[] buffer = new byte[(int) stat.getLen()];
            is.readFully(0, buffer);
            is.close();
            fs.close();
            return buffer;
        } else {
            throw new IOException("the file is not found .");
        }
    }

    // 上传
    public static void upLoadFromLoacl(String HDFS_Path, String Loacl_Path)
            throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(Loacl_Path);
        Path desPath = new Path(HDFS_Path);
        fs.copyFromLocalFile(srcPath, desPath);

        // 打印文件路径
        System.out.println("Upload to " + conf.get("fs.default.name"));
        System.out.println("------------list files------------" + "\n");

        FileStatus[] fileStatus = fs.listStatus(desPath);
        for (FileStatus status : fileStatus) {
            System.out.println(status.getPath());
        }
        fs.close();
    }

    // 下载
    public static void downLoadToLoacl(String HDFS_Path, String Loacl_Path)
            throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(HDFS_Path);
        Path desPath = new Path(Loacl_Path);
        fs.copyToLocalFile(srcPath, desPath);
        fs.close();
        System.out.println("download to sucess! ");
    }

    // 判断文件或文件夹是否存在,如果存在则返回true,否在返回false
    public static boolean isExist(String path) throws IOException {
        FileSystem fs = getFs();
        Path srcPath = new Path(path);
        boolean isExist = false;
        if (fs.isDirectory(srcPath)) {
            isExist = true;
        } else if (fs.isFile(srcPath)) {
            isExist = true;
        }
        return isExist;
    }

    // 获取集群中所有数据节点的状态
    public static void getDateNodeInfo() throws IOException {
        FileSystem fs = getFs();
        DistributedFileSystem hdfs = (DistributedFileSystem) fs;

        DatanodeInfo[] dataNodeInfo = hdfs.getDataNodeStats();
        for (int i = 0; i < dataNodeInfo.length; i++) {
            System.out.println("DataNode_" + i + "_Name:"
                    + dataNodeInfo[i].getHostName() + "DataNode_" + i + "Ip:"
                    + dataNodeInfo[i].getInfoAddr());
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("........开始测试HDFS......");

        // 1、创建一个文件夹命名为HDTest2,并在该文件夹中创建一个文件夹test以便测试删除
        //mkDir("/HDTest2");
        //mkDir("/HDTest2/test");
        
        // 2、删除/HDTest2/test
        //delete("/HDTest2/test");

        // 3、在HDFS中创建一个文件,并写入"helloworld"
        //createFile("/HDTest2/helloworld.txt", "helloworld of HDFS!");

        // 4、将/HDTest2/helloworld.txt 命名为/HDTest2/helloworld.ini
        //reNameFile("/HDTest2/helloworld.txt", "/HDTest2/helloworld.ini");

        // 5、将文件内容打印出来
     //readFilePrint("/HDTest2/helloworld.ini");

        // 6、将文件内容读出来,并创建 /HDTest2/test.txt文件
        //createFile("/HDTest2/test.txt", readFile("/HDTest2/helloworld.ini"));

        // 7、将本机桌面my.cnf文件上传到/HDTest2
        //upLoadFromLoacl("/HDTest2/", "/home/Hadoop/Desktop/my.cnf");

        // 8、将/HDTest2/test.txt下载到桌面
        //downLoadToLoacl("/HDTest2/test.txt", "/home/Hadoop/Desktop");

        // 9、打印DataNode的信息
        //getDateNodeInfo();

        
    }
        
}

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。