Contents

I. Background
II. Steps that involve Kerberos authentication
    1. Downloading files with JS
    2. Renaming HDFS files with JS
    3. Moving files
III. Code
    1. Writing the HDFSProcess class
    2. Writing the HDFSUtil class
    3. Writing the KettleKerberosUtils class
    4. Writing the KerberosUtil class
    5. Writing the PropertyUtils helper class
I. Background

In the earlier post Kettle实现 HDFS文件解析同步到SQLServer数据库(ETL 包括:时间格式化、IP校验、字段拼接)_开着拖拉机回家的博客-CSDN博客_hdfs同步数据库, I focused on the job's design and the Kettle components it uses. Once the job itself was finished, the last step was to add Kerberos authentication so that HDFS is not accessed without any protection.
II. Steps that involve Kerberos authentication

A Kerberos client supports two authentication methods: principal + password, and principal + keytab. The former suits interactive use by a person (for example, hadoop fs -ls); the latter suits services, such as YARN's ResourceManager and NodeManager.

For HDFS access here, we use principal + keytab.
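On the command line the two modes look like this (the principal and keytab path below are illustrative, not values from this job):

# principal + password: kinit prompts for the password interactively
kinit kangll@WINNER.COM
# principal + keytab: no prompt, suitable for unattended services
kinit -kt /etc/security/keytabs/kangll.keytab kangll@WINNER.COM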
1. Downloading files with JS

The JS step needs Kerberos authentication code added; once authentication completes, the HDFS files can be downloaded to the local machine. The file paths in the code below are placeholders.
// Regex that files under the HDFS directory must match
var regexFile = "^DataServerCloud\\.*.*[0-9]{12}\\.txt\\.*[0-9]{13}$";
// HDFS source directory
var srcFloder = "/hadoop/org/ReportInfo/";
// Local Linux directory to download into
var targetFloder = "/hadoop/software/download_tmp/";
// Principal to authenticate as
var user = "hdfs-kanna@kanna.COM";
// Keytab generated for the principal, e.g.: kadmin.local -q "xst -k /etc/security/keytabs/kang.keytab kang@WINNER.COM"
var keytab = "/etc/security/keytabs/hdfs.headless.keytab";
// krb5.conf holding the KDC / admin_server configuration
var krb5 = "/etc/krb5.conf";
// Kerberos utility class (source later in this post)
var pp = new Packages.com.dataplat.utils.KettleKerberosUtils(user, keytab, krb5);
// Call auth() to complete the authentication
pp.auth();
// Download the files
var re = new Packages.com.kangna.datacenter.hdfs.HdfsProcess(regexFile, srcFloder, targetFloder);
re.downloadFile();
true;
2. Renaming HDFS files with JS

This step is almost identical to the JS download above; the only addition is a call to the wrapped rename method (source below).
3. Moving files

kinit -kt authenticates the principal with the generated keytab file; after that, the completed files can be moved to a backup directory:

kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-kangna@kangna.COM
hadoop fs -mv /hadoop/org/Report/*.completed /hadoop/org/Report_bak/
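The same move can also be done through the FileSystem API instead of the shell. A minimal sketch (the class name is hypothetical; the paths are the ones above), assuming the Kerberos login has already run:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MoveCompletedFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Expand /hadoop/org/Report/*.completed, then rename each file into the backup
        // directory, mirroring: hadoop fs -mv /hadoop/org/Report/*.completed /hadoop/org/Report_bak/
        FileStatus[] matches = fs.globStatus(new Path("/hadoop/org/Report/*.completed"));
        if (matches != null) {
            for (FileStatus status : matches) {
                fs.rename(status.getPath(), new Path("/hadoop/org/Report_bak/" + status.getPath().getName()));
            }
        }
    }
}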
III. Code

1. Writing the HDFSProcess class

This class implements downloading HDFS files to the local machine, plus renaming the files afterwards.

package com.kangna.hdfs;

import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * @AUTHOR kangll
 * @DATE 2020/6/20 10:16
 * @DESC: HDFS utility class
 */
public class HDFSProcess {

    private static final Logger logger = LoggerFactory.getLogger(HDFSProcess.class);

    private String regex;
    private String srcHDFSFolder;
    private String destinationFolder;
    private FileSystem fileSystem;

    public HDFSProcess() {
    }

    public HDFSProcess(String regex, String srcHDFSFolder, String destinationFolder) {
        this.regex = regex.toLowerCase();
        this.srcHDFSFolder = srcHDFSFolder;
        this.destinationFolder = destinationFolder;
    }

    /**
     * Download the matching files to the local filesystem.
     */
    public void downloadFile() throws Exception {
        // Configuration object
        Configuration conf = new Configuration();
        // Get the list of matching files
        List<Path> paths = HDFSUtil.listStatus(conf, this.srcHDFSFolder, this.regex);
        logger.info("HDFS matched " + paths.size() + " files");
        for (Path path : paths) {
            String fileName = path.getName();
            fileSystem = FileSystem.get(conf);
            // e.g. /hadoop/kangna/ods
            Path srcPath = new Path(this.srcHDFSFolder + File.separator + fileName);
            // e.g. G:\\API\\
            String localPath = this.destinationFolder + File.separator + fileName;
            FSDataInputStream dataInputStream = fileSystem.open(srcPath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(dataInputStream));
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localPath)));
            String line = null;
            while ((line = reader.readLine()) != null) {
                writer.write(line + "\n");
            }
            reader.close();
            writer.close();
        }
        logger.info("download file successful.");
    }

    /**
     * Rename the HDFS files that have been downloaded locally.
     *
     * @param localPath local file path
     * @param HDFSPath  HDFS file path
     * @param suffix    suffix to append, e.g. ".completed"
     */
    public void rename(String localPath, String HDFSPath, String suffix) throws Exception {
        Configuration conf = new Configuration();
        // Collect the names of the files present locally
        Set<String> localFiles = Sets.newHashSet();
        File[] downloaded = new File(localPath).listFiles();
        if (downloaded != null) {
            for (File file : downloaded) {
                localFiles.add(file.getName());
            }
        }
        // Append the suffix to every HDFS file that was downloaded
        fileSystem = FileSystem.get(conf);
        for (Path path : HDFSUtil.listStatus(conf, HDFSPath, this.regex)) {
            if (localFiles.contains(path.getName())) {
                fileSystem.rename(path, new Path(HDFSPath + File.separator + path.getName() + suffix));
            }
        }
        logger.info("rename file successful.");
    }
}
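For reference, a hypothetical driver mirroring what the Kettle JS step does (same regex and placeholder paths as above; run inside a main(String[]) that declares throws Exception, after the Kerberos login):

HDFSProcess process = new HDFSProcess(
        "^DataServerCloud\\.*.*[0-9]{12}\\.txt\\.*[0-9]{13}$",   // file name regex
        "/hadoop/org/ReportInfo/",                               // HDFS source directory
        "/hadoop/software/download_tmp/");                       // local target directory
process.downloadFile();
// Mark the files that were downloaded, so the move step can pick them up
process.rename("/hadoop/software/download_tmp/", "/hadoop/org/ReportInfo/", ".completed");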
2. Writing the HDFSUtil class

package com.kangna.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * @AUTHOR kangll
 * @DATE 2020/6/20 10:31
 * @DESC:
 */
public class HDFSUtil {

    private static final Logger logger = LoggerFactory.getLogger(HDFSUtil.class);

    // Marker for a file that is being processed
    private static final String FILE_EXECUTING_MARK = ".executing";
    // Marker for a file that has been processed
    private static final String FILE_COMPLETED_MARK = ".completed";

    private static FileSystem fileSystem;

    /**
     * List the files under a path.
     *
     * @param conf    configuration
     * @param dirPath directory path
     * @return
     */
    public static List<Path> listStatus(Configuration conf, String dirPath) throws Exception {
        return listStatus(conf, dirPath, "");
    }

    /**
     * List the files under a path whose names match a regex.
     *
     * @param conf    configuration
     * @param dirPath directory path
     * @param regex   regex the file names must match
     * @return
     */
    public static List<Path> listStatus(Configuration conf, String dirPath, String regex) throws IOException {
        List<Path> files = new ArrayList<>();
        try {
            // Get the configured file system object
            fileSystem = FileSystem.get(conf);
            // List the status and block locations of the files under the path
            RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(dirPath), false);
            while (iterator.hasNext()) {
                Path path = iterator.next().getPath();
                // Keep the file if no regex was given or its name matches
                if (regex == null || regex.isEmpty() || Pattern.matches(regex, path.getName().toLowerCase())) {
                    files.add(path);
                }
            }
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        return files;
    }
}
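A quick note on the two overloads, with a hypothetical directory:

Configuration conf = new Configuration();
// Two-argument overload: empty regex, so every file under the directory is returned
List<Path> all = HDFSUtil.listStatus(conf, "/hadoop/org/ReportInfo");
// Three-argument overload: only file names matching the (lower-cased) regex are returned
List<Path> matched = HDFSUtil.listStatus(conf, "/hadoop/org/ReportInfo", "^dataservercloud.*$");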
3. Writing the KettleKerberosUtils class

Instantiate the class with its three arguments and call auth() to complete Kerberos authentication.
package com.kangna.kerberos;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

/**
 * @AUTHOR kangll
 * @DATE 2020/6/27 14:07
 * @DESC:
 */
public class KettleKerberosUtils {

    private String user;
    private String keytable;
    private String krb5;

    public KettleKerberosUtils(String user, String keytable, String krb5) {
        this.user = user;
        this.keytable = keytable;
        this.krb5 = krb5;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getKeytable() {
        return keytable;
    }

    public void setKeytable(String keytable) {
        this.keytable = keytable;
    }

    public String getKrb5() {
        return krb5;
    }

    public void setKrb5(String krb5) {
        this.krb5 = krb5;
    }

    /**
     * UserGroupInformation holds Hadoop's user and group information. It wraps a JAAS
     * Subject and provides methods to determine the user's name and groups. It supports
     * Windows, Unix and Kerberos login modules.
     */
    public void auth() {
        System.out.println(this.user);
        Configuration conf = new Configuration();
        System.setProperty("java.security.krb5.conf", this.krb5);
        UserGroupInformation.setConfiguration(conf);
        try {
            // loginUserFromKeytab(principal to load from the keytab, path to the keytab file)
            UserGroupInformation.loginUserFromKeytab(this.user, this.keytable);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
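The Java equivalent of the Kettle JS call above (all three values are placeholders):

KettleKerberosUtils kerberos = new KettleKerberosUtils(
        "hdfs-kangna@kangna.COM",                      // principal
        "/etc/security/keytabs/hdfs.headless.keytab",  // keytab for that principal
        "/etc/krb5.conf");                             // krb5.conf with the KDC settings
kerberos.auth();
// From here on, FileSystem.get(new Configuration()) acts as the authenticated user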
4. Writing the KerberosUtil class

The KerberosUtil class performs the authentication using values read from a configuration file.
package com.kangna.kerberos;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.util.Properties;

/**
 * @AUTHOR kangll
 * @DATE 2020/6/27 14:07
 * @DESC:
 */
public class KerberosUtil {

    public static void main(String[] args) {
        Properties prop = PropertyUtils.loadProperty("kerberos_user.properties");
        String user = prop.getProperty("kerberos.hdfs.user");
        String keytab = prop.getProperty("kerberos.hdfs.keytab");
        try {
            authKerberos(user, keytab);
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            FileStatus[] files = fs.listStatus(new Path("/winhadoop"));
            for (FileStatus file : files) {
                System.out.println(file.getPath());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Kerberos authentication
     *
     * @param user   principal
     * @param keytab keytab file path
     */
    public static void authKerberos(String user, String keytab) {
        Configuration conf = new Configuration();
        System.setProperty("java.security.krb5.conf",
                PropertyUtils.loadProperty("kerberos_user.properties").getProperty("kerberos.krb5.path"));
        // Register the configuration with UGI so the Kerberos login takes effect
        UserGroupInformation.setConfiguration(conf);
        try {
            UserGroupInformation.loginUserFromKeytab(user, keytab);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
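KerberosUtil expects a kerberos_user.properties file on the classpath with three keys. The key names come from the code above; the values here are placeholders:

kerberos.hdfs.user=hdfs-kangna@kangna.COM
kerberos.hdfs.keytab=/etc/security/keytabs/hdfs.headless.keytab
kerberos.krb5.path=/etc/krb5.conf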
5. Writing the PropertyUtils helper class

Loading the configuration file:
package com.kangna.kerberos;

import org.apache.log4j.Logger;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;

/**
 * @AUTHOR kangll
 * @DATE 2020/6/27 14:47
 * @DESC: Utility class for loading configuration files
 */
public class PropertyUtils {

    private static final Logger logger = Logger.getLogger(PropertyUtils.class);

    /**
     * Load a configuration file by name using the given class loader.
     *
     * @param fileName
     * @param loader   class loader
     * @return
     */
    public static Properties loadPropertyFileWithClassLoader(String fileName, ClassLoader loader) {
        Properties prop = new Properties();
        URL commonFileUrl = loader.getResource(fileName);
        try {
            prop.load(commonFileUrl.openStream());
        } catch (IOException e) {
            logger.error("Can't load configuration file : " + fileName);
            logger.error(e.getMessage(), e);
        }
        return prop;
    }

    /**
     * Load a configuration file from a file path.
     *
     * @param fileName
     * @return
     */
    public static Properties loadPropertyFileWithURL(String fileName) {
        Properties prop = new Properties();
        try {
            prop.load(new FileInputStream(fileName));
        } catch (IOException e) {
            logger.error("Can't load configuration file : " + fileName);
            logger.error(e.getMessage(), e);
        }
        return prop;
    }

    /**
     * Load a configuration file from the system class path.
     *
     * @param fileName
     * @return
     */
    public static Properties loadProperty(String fileName) {
        Properties prop = new Properties();
        try {
            prop.load(ClassLoader.getSystemResourceAsStream(fileName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return prop;
    }
}
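A short sketch of the three loaders (the absolute path in the last call is hypothetical):

// From the system class path (this is what KerberosUtil above uses)
Properties p1 = PropertyUtils.loadProperty("kerberos_user.properties");
// From an explicit class loader
Properties p2 = PropertyUtils.loadPropertyFileWithClassLoader("kerberos_user.properties",
        PropertyUtils.class.getClassLoader());
// From an absolute file path
Properties p3 = PropertyUtils.loadPropertyFileWithURL("/etc/kettle/kerberos_user.properties");
System.out.println(p1.getProperty("kerberos.hdfs.user"));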