一、HadoopHA的搭建:https://www.cnblogs.com/null-/p/10000309.html
二、pom文件依赖:
<dependencies> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-commonartifactId> <version>2.7.4version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-hdfsartifactId> <version>2.7.4version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-clientartifactId> <version>2.7.4version> dependency> <dependency> <groupId>org.apache.hadoopgroupId> <artifactId>hadoop-mapreduce-client-coreartifactId> <version>2.7.2version> dependency> <dependency> <groupId>org.slf4jgroupId> <artifactId>slf4j-apiartifactId> <version>1.7.25version> dependency> <dependency> <groupId>junitgroupId> <artifactId>junitartifactId> <version>4.12version> <scope>testscope> dependency> dependencies>
三、控制代码:
package com.hdfs.demo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Iterator; import java.util.Map; import java.util.Set; /** * @author 王传礼 */ public class HdfsDemo { /** * 根据配置获取HDFS文件操作系统 * * @return FileSystem */ public static FileSystem getHadoopFileSystem() { FileSystem fs = null; Configuration conf = null; //方法:本地没有hadoop系统,但可以远程访问。根据给定的URI和用户名,访问hdfs的配置参数 cOnf= new Configuration(); //Hadoop的用户名 String hdfsUserNmae = "root"; URI hdfsUri = null; try { hdfsUri = new URI("hdfs://192.168.182.135:8020"); // HDFS的访问路径 } catch (URISyntaxException e) { e.printStackTrace(); } try { //根据远程的NN节点,获取配置信息,创建HDFS对象 fs = FileSystem.get(hdfsUri, conf, hdfsUserNmae); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return fs; } /** * 这里的创建文件夹同shell中的mkdir -p 语序前面的文件夹不存在 * 跟java中的IO操作一样,也只能对path对象做操作;但是这里的Path对象是hdfs中的 * * @param fs,filepath * @return */ public static boolean myCreatePath(FileSystem fs,String filepath) { boolean b = false; Path path = new Path(filepath); try { // even the path exist,it can also create the path. b = fs.mkdirs(path); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 删除文件,实际上删除的是给定path路径的最后一个 * 跟java中一样,也需要path对象,不过是hadoop.fs包中的。 * 实际上delete(Path p)已经过时了,更多使用delete(Path p,boolean recursive) * 后面的布尔值实际上是对文件的删除,相当于rm -r * * @param fs * @return */ public static boolean myDropHdfsPath(FileSystem fs, String filepath) { boolean b = false; // drop the last path Path path = new Path(filepath); try { b = fs.delete(path, true); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 重命名文件夹 * * @param hdfs * @return */ public static boolean myRename(FileSystem hdfs, String oldname, String newname) { boolean b = false; Path oldPath = new Path(oldname); Path newPath = new Path(newname); try { b = hdfs.rename(oldPath, newPath); } catch (IOException e) { e.printStackTrace(); } finally { try { hdfs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 遍历文件夹 * public FileStatus[] listStatus(Path p) * 通常使用HDFS文件系统的listStatus(path)来获取改定路径的子路径。然后逐个判断 * 值得注意的是: * 1.并不是总有文件夹中有文件,有些文件夹是空的,如果仅仅做是否为文件的判断会有问题,必须加文件的长度是否为0的判断 * 2.使用getPath()方法获取的是FileStatus对象是带URL路径的。使用FileStatus.getPath().toUri().getPath()获取的路径才是不带url的路径 * * @param hdfs * @param listPath 传入的HDFS开始遍历的路径 * @return */ public static SetrecursiveHdfsPath(FileSystem hdfs, Path listPath) { /*FileStatus[] files = null; try { files = hdfs.listStatus(listPath); Path[] paths = FileUtil.stat2Paths(files); for(int i=0;i */ FileStatus[] files = null; Set set = null; try { files = hdfs.listStatus(listPath); // 实际上并不是每个文件夹都会有文件的。 if (files.length == 0) { // 如果不使用toUri(),获取的路径带URL。 set.add(listPath.toUri().getPath()); } else { // 判断是否为文件 for (FileStatus f : files) { if (files.length == 0 || f.isFile()) { set.add(f.getPath().toUri().getPath()); } else { // 是文件夹,且非空,就继续遍历 recursiveHdfsPath(hdfs, f.getPath()); } } } } catch (IOException e) { e.printStackTrace(); } return set; } /** * 文件简单的判断 * 是否存在 * 是否是文件夹 * 是否是文件 * * @param fs */ public static void myCheck(FileSystem fs, String filepath) { boolean isExists = false; boolean isDirectorys = false; boolean isFiles = false; Path path = new Path(filepath); try { isExists = fs.exists(path); isDirectorys = fs.isDirectory(path); isFiles = fs.isFile(path); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } if (!isExists) { System.out.println("The path is not exist."); } else { System.out.println("The path is exist."); if (isDirectorys) { System.out.println("This is a Directory"); } else if (isFiles) { System.out.println("This is Files"); } } } /** * 获取配置的所有信息 * 首先,我们要知道配置文件是哪一个 * 然后我们将获取的配置文件用迭代器接收 * 实际上配置中是KV对,我们可以通过java中的Entry来接收 */ public static void showAllConf() { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://node1:8020"); Iterator > it = conf.iterator(); while (it.hasNext()) { Map.Entry entry = it.next(); System.out.println(entry.getKey() + "=" + entry.getValue()); } } /** * 文件下载 * 注意下载的路径的最后一个地址是下载的文件名 * copyToLocalFile(Path local,Path hdfs) * 下载命令中的参数是没有任何布尔值的,如果添加了布尔是,意味着这是moveToLocalFile() *文件下载有权限要求 要有写的权限 * @param fs */ public static void getFileFromHDFS(FileSystem fs, String dfsFile, String locPath) { Path HDFSPath = new Path(dfsFile); Path localPath = new Path(locPath); try { fs.copyToLocalFile(HDFSPath, localPath); System.out.println("File download."); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 文件的上传 * 注意事项同文件的上传 * 注意如果上传的路径不存在会自动创建 * 如果存在同名的文件,会覆盖 * * @param fs */ public static void myPutFile2HDFS(FileSystem fs, String localFile, String dfsPath) { boolean pathExists = false; // 如果上传的路径不存在会创建 // 如果该路径文件已存在,就会覆盖 Path localPath = new Path(localFile); Path hdfsPath = new Path(dfsPath); try { fs.copyFromLocalFile(localPath, hdfsPath); System.out.println("File upload."); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * hdfs之间文件的复制 * 使用FSDataInputStream来打开文件open(Path p) * 使用FSDataOutputStream开创建写到的路径create(Path p) * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)来进行具体的读写 * 说明: * 1.java中使用缓冲区来加速读取文件,这里也使用了缓冲区,但是只要指定缓冲区大小即可,不必单独设置一个新的数组来接受 * 2.最后一个布尔值表示是否使用完后关闭读写流。通常是false,如果不手动关会报错的 * * @param hdfs */ public static void copyFileBetweenHDFS(FileSystem hdfs, String in, String out) { Path inPath = new Path(in); Path outPath = new Path(out); // byte[] ioBuffer = new byte[1024*1024*64]; // int len = 0; FSDataInputStream hdfsIn = null; FSDataOutputStream hdfsOut = null; try { hdfsIn = hdfs.open(inPath); hdfsOut = hdfs.create(outPath); IOUtils.copyBytes(hdfsIn, hdfsOut, 1024 * 1024 * 64, false); } catch (IOException e) { e.printStackTrace(); } finally { try { hdfsOut.close(); hdfsIn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
四、测试代码
package com.hdfs.demo; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; import java.util.Scanner; import java.util.Set; import static org.junit.Assert.*; public class HdfsTest { Scanner sc = new Scanner(System.in); FileSystem fs = HdfsDemo.getHadoopFileSystem(); @Test public void myCreatePath() { //目录创建测试 String path = "/usr/test/input"; System.out.println(HdfsDemo.myCreatePath(fs,path)); } @Test public void myDropHdfsPath() { // 目录删除 String path = "/usr/test/output"; System.out.println(HdfsDemo.myDropHdfsPath(fs,path)); } @Test public void myRename() { //文件重命名 String oldName = "/usr/test/input"; String newName = "/usr/test/renameInput"; System.out.println(HdfsDemo.myRename(fs,oldName,newName)); } @Test public void recursiveHdfsPath() { //遍历文件夹 Path path = new Path("/usr/test/"); Setset = HdfsDemo.recursiveHdfsPath(fs, path); for (String str : set) { System.out.println(str); } } @Test public void myCheck() { //文件简单的判断 是否存在 是否是文件夹 是否是文件 String path = "/usr/test/input/file.txt"; HdfsDemo.myCheck(fs,path); } @Test public void showAllConf() { //获取配置的所有信息 HdfsDemo.showAllConf(); } @Test public void getFileFromHDFS() { //文件下载 文件下载有权限要求 要有写的权限 0644error String dfsFile = "/usr/test/input/file.txt"; String locPath = "e://temp/data/"; HdfsDemo.getFileFromHDFS(fs,dfsFile,locPath); } @Test public void myPutFile2HDFS() { //文件的上传 String localFile = "e://temp/file.txt"; String dfsPath = "/usr/test/input"; HdfsDemo.myPutFile2HDFS(fs,localFile,dfsPath); } @Test public void copyFileBetweenHDFS() { //hdfs之间文件的复制 String in = "/usr/test/output"; String out = "/usr/test/input"; HdfsDemo.copyFileBetweenHDFS(fs,in,out); } }