热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kettle中添加Kerberos认证

目录一、背景介绍二、涉及kerberos认证的步骤1、JS下载文件2、JS重命名HDFS文件3、文件移动三、代码1、编写类HDFSProcess2、编写类HDFSUti

 目录

一、背景介绍 

二、涉及kerberos 认证的步骤

 1、JS 下载文件 

 2、JS 重命名HDFS文件

3、文件移动

三、代码

1、编写类HDFSProcess

2、编写类HDFSUtil

3、编写类 KettleKerberosUtils

4、编写类KerberosUtil

5、编写工具类PropertyUtils




一、背景介绍 


在 这篇使用 Kettle解析HDFS文件 同步到数据库文章中 Kettle实现 HDFS文件解析同步到SQLServer数据库(ETL 包括:时间格式化、IP校验、字段拼接)_开着拖拉机回家的博客-CSDN博客_hdfs同步数据库,重点讲了 作业的设计思路 和 Kettle中 用到的组件,在最终将作业设计完成之后做了Kerberos认证,不让HDFS裸奔。



Kerberos客户端支持两种认证方式,一是使用 principal + Password,二是使用 principal + keytab,前者适合用户进行交互式应用,例如hadoop fs -ls这种,后者适合服务,例如yarn的rm、nm等

 对于 HDFS的访问认证,我们使用了 principal + keytab



二、涉及kerberos 认证的步骤


 1、JS 下载文件 


此处的JS 代码 需要添加Kerberos认证的代码,在完成  Kerberos 认证之后 就可以下载 HDFS文件到本地了。



代码里面的文件目录随便写的


// HDFS 目录文件下要匹配的文件
var regexFile="^DataServerCloud\\.*.*[0-9]{12}\\.txt\\.*[0-9]{13}$"; // HDFS文件目录
var srcFloder="/hadoop/org/ReportInfo/"; // 下载到 linux 本地文件
var targetFloder="/hadoop/software/download_tmp/";// 要认证的主体
var user = "hdfs-kanna@kanna.COM";// 主体生成密钥文件 命令实现 kadmin.local -q "xst -k /etc/security/keytabs/kang.keytab kang@WINNER.COM"
var keytab = "/etc/security/keytabs/hdfs.headless.keytab";// KDC admin_server 的配置信息
var krb5 = "/etc/krb5.conf"// Kerberos 工具类 后面 会有代码
pp = new com.dataplat.utils.KettleKerberosUtils(user,keytab,krb5);// 调用具体的方法 完成认证
pp.auth();// 文件下载
var re = new Packages.com.kangna.datacenter.hdfs.HdfsProcess(regexFile,srcFloder,targetFloder);re.downloadFile();true;

 2、JS 重命名HDFS文件


跟前面 JS 下载 HDFS文件  基本一样 ,这块多了一个 封装的 rename 方法,代码后面有



3、文件移动


kinit -kt 就是使用生成的密钥文件对 实例进行认证



kinit -kt /etc/security/keytabs/hdfs.headless.keytab hdfs-kangna@kangna.COM
hadoop fs -mv /hadoop/org/Report/*.completed /hadoop/org/Report_bak/

三、代码


1、编写类HDFSProcess


此类主要实现是 HDFS 文件下载到本地,还有文件的重命名。


package com.kangna.hdfs;import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.*;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;/*** @AUTHOR kangll* @DATE 2020/6/20 10:16* @DESC: HDFS 工具类*/
public class HDFSProcess {private static final Logger logger = LoggerFactory.getLogger(HDFSProcess.class);private String regex;private String srcHDFSFolder;private String destinationFolder;private FileSystem fileSystem;public HDFSProcess() {}public HDFSProcess(String regex, String srcHDFSFolder, String destinationFolder) {this.regex = regex.toLowerCase();this.srcHDFSFolder = srcHDFSFolder;this.destinatiOnFolder= destinationFolder;}/*** 下载文件到本地*/public void downloadFile() throws Exception {// 提供配置参数对象Configuration cOnf= new Configuration();// 获取文件的状态List

paths = HDFSUtil.listStatus(conf, this.srcHDFSFolder, this.regex);logger.info("HDFS matched " + paths.size() + " files");for (Path path : paths) {String fileName = path.getName();fileSystem = FileSystem.get(conf);// 比如: /hadoop/kangna/odsPath srcPath = new Path(this.srcHDFSFolder + File.separator + fileName);// 比如:G:\\API\\String localPath = this.destinationFolder + File.separator + fileName;FSDataInputStream dataInputStream = fileSystem.open(srcPath);BufferedReader reader = new BufferedReader(new InputStreamReader(dataInputStream));BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localPath)));String line = null;while ((line = reader.readLine()) != null) {writer.write(line + "\n");}reader.close();writer.close();}logger.info("download file successful.");}/*** 文件重命名** @param localPath 本地文件路径* @param HDFSPath HDFS文件路径* @param suffix 后缀*/public void rename(String localPath, String HDFSPath, String suffix) throws Exception{Configuration cOnf= new Configuration();// 获取本地文件名的集合Set fileNameList = getFileLocalName(localPath);logger.info("match local file size : " + fileNameList.size());System.out.println("matche local file size : " + fileNameList.size());for (String fileName : fileNameList) {String src = HDFSPath + File.separator + fileName;// 比如: 将 kangna.txt 重命名 为 kangna.txt.completedString target = HDFSPath + File.separator + fileName + suffix;// 重命名 实现 调用 renameToboolean flag = HDFSUtil.renameTo(conf, src, target);if (flag) {System.out.println("renamed file " + src + " to " + target + "successful.");} else {System.out.println("renamed file " + src + " to " + target + "failed.");}System.out.println(fileName);}}public Set getFileLocalName(String localPath) {System.out.println("listing path is : " + localPath);// 将正则编译为此类的实例Pattern pattern = Pattern.compile(this.regex);Set list = Sets.newHashSet();File baseFile = new File(localPath);if ((baseFile.isFile()) || (!baseFile.exists())) {return list;}// 返回文件的抽象路径名数组File[] files = baseFile.listFiles();System.out.println("this path has files : " + files.length);for (File file : files) {if (file.isDirectory()) {list.addAll(getFileLocalName(file.getAbsolutePath()));} else {list.add(file.getName());}}return list;}public static void main(String[] args) throws Exception {System.out.println(new HDFSProcess());}
}

2、编写类HDFSUtil


package com.kangna.hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;/*** @AUTHOR kangll* @DATE 2020/6/20 10:31* @DESC:*/
public class HDFSUtil {private static final Logger logger = LoggerFactory.getLogger(HDFSUtil.class);// 文件执行中的标记private static final String FILE_EXECUTING_MARK = ".executing";// 文件执行后的标记private static final String FILE_COMPLETED_MARK = ".completed";private static FileSystem fileSystem;/*** 获取文件执行的状态** @param conf* @param dirPath* @return*/public static List

listStatus(Configuration conf, String dirPath) throws Exception {return listStatus(conf, dirPath, "");}/*** 获取文件执行的状态** @param conf 配置参数* @param dirPath 文件路径* @param regex 要匹配的正则* @return*/public static List

listStatus(Configuration conf, String dirPath, String regex) throws IOException {List

files = new ArrayList<>();try {// 返回配置过的文件系统对象fileSystem = FileSystem.get(conf);// 列出指定路径中文件的状态和块位置RemoteIterator fileStatus = fileSystem.listFiles(new Path(dirPath), true);Pattern pattern = Pattern.compile(regex);while (fileStatus.hasNext()) {LocatedFileStatus file = fileStatus.next();Path path = file.getPath();String fileName = path.getName().toLowerCase();if (regex.equals("")) {files.add(path);logger.info("match file : " + fileName);} else if (pattern.matcher(fileName).matches()) {files.add(path);logger.info("match file : " + fileName);}}} finally {if (fileSystem != null) {fileSystem.close();}}return files;}/*** 创建文件夹** @param conf* @param dir* @return*/public static boolean mkdir(Configuration conf, String dir) throws IOException {boolean flag = false;try {fileSystem.get(conf);if (fileSystem.exists(new Path(dir))) {flag = true;} else {fileSystem.mkdirs(new Path(dir));}} finally {if (fileSystem != null) {fileSystem.close();}}return flag;}/*** @param conf* @param src 要重命名的 src 路径* @param target 重新命名的 target 路径* @return* @throws Exception*/public static boolean renameTo(Configuration conf, String src, String target) throws Exception {boolean flag = false;try {fileSystem = FileSystem.get(conf);if (fileSystem.exists(new Path(target))) {fileSystem.delete(new Path(target), true);logger.info("target file : " + target + " exist, deleted");}// 将路径名 src 重命名为 路径targetflag = fileSystem.rename(new Path(src), new Path(target));if (flag) {logger.info("renamed file " + src + " to " + target + " success!");} else {logger.info("renamed file " + src + " to " + target + " failed!");}} finally {if (fileSystem != null) {fileSystem.close();}}return flag;}
}

3、编写类 KettleKerberosUtils


实例化对象传入 三个参数,就可以完成Kerberos认证


package com.kangna.kerberos;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;import java.io.IOException;/*** @AUTHOR kangll* @DATE 2020/6/27 14:07* @DESC:*/
public class KettleKerberosUtils {private String user;private String keytable;private String krb5;public KettleKerberosUtils(String user, String keytable, String krb5) {this.user = user;this.keytable = keytable;this.krb5 = krb5;}public String getUser() {return user;}public void setUser(String user) {this.user = user;}public String getKeytable() {return keytable;}public void setKeytable(String keytable) {this.keytable = keytable;}public String getKrb5() {return krb5;}public void setKrb5(String krb5) {this.krb5 = krb5;}/**** UserGroupInformation .Hadoop的用户和组信息。这个类封装了一个JAAS主题,并提供了确定用户用户名和组的方法。* 它同时支持Windows、Unix和Kerberos登录模块。*/public void auth() {System.out.println(this.user);Configuration cOnf= new Configuration();System.setProperty("java.security.krb5.conf", this.krb5);UserGroupInformation.setConfiguration(conf);try {/*** loginUserFromKeytab(this.user, this.keytable)* (用户要从Keytab加载的主名称,keytab文件的路径)*/UserGroupInformation.loginUserFromKeytab(this.user, this.keytable);} catch (IOException e) {e.printStackTrace();}}}

4、编写类KerberosUtil


KerberosUtil 类是使用配置文件的形式进行认证


package com.kangna.kerberos;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;import java.io.IOException;
import java.util.Properties;/*** @AUTHOR kangll* @DATE 2020/6/27 14:07* @DESC:*/
public class KerberosUtil {public static void main(String[] args) {Properties prop = PropertyUtils.loadProperty("kerberos_user.properties");String user = prop.getProperty("kerberos.hdfs.user");String keytab = prop.getProperty("kerberos.hdfs.keytab");try {authKerberos(user, keytab);Configuration cOnf= new Configuration();FileSystem fs = FileSystem.get(conf);FileStatus[] files = fs.listStatus(new Path("/winhadoop"));for (FileStatus file : files) {System.out.println(file.getPath());}} catch (Exception e) {e.printStackTrace();}}/*** Kerberos 认证* @param user* @param keytab*/public static void authKerberos(String user, String keytab) {Configuration cOnf= new Configuration();System.setProperty("java.security.krb5.conf", PropertyUtils.loadProperty("kerberos_user.properties").getProperty("kerberos.krb5.path"));try {UserGroupInformation.loginUserFromKeytab(user, keytab);} catch (IOException e) {e.printStackTrace();}}
}

5、编写工具类PropertyUtils


配置文件的获取


package com.kangna.kerberos;import org.apache.log4j.Logger;import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;/*** @AUTHOR kangll* @DATE 2020/6/27 14:47* @DESC:加载配置文件工具类*/
public class PropertyUtils {private static final Logger logger = Logger.getLogger(PropertyUtils.class);/*** 类加载根据文件名加载 配置文件* @param fileName* @param loader 类加载器* @return*/public static Properties loadPropertyFileWithClassLoader(String fileName, ClassLoader loader) {Properties prop = new Properties();URL commOnFileUrl= loader.getResource(fileName);try {prop.load(commonFileUrl.openStream());} catch (IOException e) {logger.error("Can&#39;t load configuration file : " + fileName);logger.error(e.getMessage(), e);}return prop;}/*** 根据 URL 加载配置文件* @param fileName* @return*/public static Properties loadPropertyFileWithURL(String fileName) {Properties prop = new Properties();try {prop.load(new FileInputStream(fileName));} catch (IOException e) {logger.error("Can&#39;t load configuration file : " + fileName);logger.error(e.getMessage(), e);}return prop;}/**** @param fileName* @return*/public static Properties loadProperty(String fileName) {Properties prop = new Properties();try {prop.load(ClassLoader.getSystemResourceAsStream(fileName));} catch (IOException e) {e.printStackTrace();}return prop;}
}

 


--------------------------- 感谢点赞!---------------------------------


推荐阅读
  • 如何在Java中使用DButils类
    这期内容当中小编将会给大家带来有关如何在Java中使用DButils类,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。D ... [详细]
  • 开机自启动的几种方式
    0x01快速自启动目录快速启动目录自启动方式源于Windows中的一个目录,这个目录一般叫启动或者Startup。位于该目录下的PE文件会在开机后进行自启动 ... [详细]
  • 本文详细介绍了 InfluxDB、collectd 和 Grafana 的安装与配置流程。首先,按照启动顺序依次安装并配置 InfluxDB、collectd 和 Grafana。InfluxDB 作为时序数据库,用于存储时间序列数据;collectd 负责数据的采集与传输;Grafana 则用于数据的可视化展示。文中提供了 collectd 的官方文档链接,便于用户参考和进一步了解其配置选项。通过本指南,读者可以轻松搭建一个高效的数据监控系统。 ... [详细]
  • 本文介绍了如何利用Shell脚本高效地部署MHA(MySQL High Availability)高可用集群。通过详细的脚本编写和配置示例,展示了自动化部署过程中的关键步骤和注意事项。该方法不仅简化了集群的部署流程,还提高了系统的稳定性和可用性。 ... [详细]
  • 如何优化MySQL数据库性能以提升查询效率和系统稳定性 ... [详细]
  • Web开发框架概览:Java与JavaScript技术及框架综述
    Web开发涉及服务器端和客户端的协同工作。在服务器端,Java是一种优秀的编程语言,适用于构建各种功能模块,如通过Servlet实现特定服务。客户端则主要依赖HTML进行内容展示,同时借助JavaScript增强交互性和动态效果。此外,现代Web开发还广泛使用各种框架和库,如Spring Boot、React和Vue.js,以提高开发效率和应用性能。 ... [详细]
  • WPF项目学习.一
    WPF项目搭建版权声明:本文为博主初学经验,未经博主允许不得转载。一、前言记录在学习与制作WPF过程中遇到的解决方案。使用MVVM的优点是数据和视图分离,双向绑定,低耦合,可重用行 ... [详细]
  • MongoDB核心概念详解
    本文介绍了NoSQL数据库的概念及其应用场景,重点解析了MongoDB的基本特性、数据结构以及常用操作。MongoDB是一个高性能、高可用且易于扩展的文档数据库系统。 ... [详细]
  • Nacos 0.3 数据持久化详解与实践
    本文详细介绍了如何将 Nacos 0.3 的数据持久化到 MySQL 数据库,并提供了具体的步骤和注意事项。 ... [详细]
  • 包含phppdoerrorcode的词条 ... [详细]
  • 一个建表一个执行crud操作建表代码importandroid.content.Context;importandroid.database.sqlite.SQLiteDat ... [详细]
  • Spring Data JdbcTemplate 入门指南
    本文将介绍如何使用 Spring JdbcTemplate 进行数据库操作,包括查询和插入数据。我们将通过一个学生表的示例来演示具体步骤。 ... [详细]
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 本文介绍如何在将数据库从服务器复制到本地时,处理因外键约束导致的数据插入失败问题。 ... [详细]
  • 在 Ubuntu 中遇到 Samba 服务器故障时,尝试卸载并重新安装 Samba 发现配置文件未重新生成。本文介绍了解决该问题的方法。 ... [详细]
author-avatar
奇力0_843
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有