
Wrapping the Distributed Analytics Engine Kylin in a Spring DataSource

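Kylin Overview

Apache Kylin™ is an open-source distributed analytics engine that provides a SQL query interface and multidimensional analysis (OLAP) on top of Hadoop/Spark to support extremely large datasets. It was originally developed by eBay Inc. and contributed to the open-source community. It can query huge Hive tables in sub-second time.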

Kylin Features

[Figure: Kylin features (image.png)]

JDBC

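Authentication

Authentication is built on the Apache Kylin authentication RESTful service. Supported parameters:

user: user name
password: password
ssl: true or false. Defaults to false; if true, all service calls use HTTPS.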

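Connection URL format:

jdbc:kylin://<hostname>:<port>/<kylin_project_name>

If "ssl" is true, "port" should be the HTTPS port of the Kylin server.
If "port" is not specified, the driver uses the default port: 80 for HTTP, 443 for HTTPS.
"kylin_project_name" must be specified, and the user needs to make sure it exists on the Kylin server.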
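
The URL rules above can be captured in a small helper. This is only a sketch: the class name KylinJdbcUrls and its two methods are hypothetical and not part of the Kylin driver, and passing "ssl" as a string property is an assumption based on the parameter list above.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Kylin driver) that assembles the
// connection URL and properties described above.
final class KylinJdbcUrls {
    private KylinJdbcUrls() {}

    /** Builds jdbc:kylin://host[:port]/project; a null port leaves the default to the driver. */
    static String buildUrl(String host, Integer port, String project) {
        if (project == null || project.isEmpty()) {
            throw new IllegalArgumentException("kylin_project_name is required");
        }
        String authority = (port == null) ? host : host + ":" + port;
        return "jdbc:kylin://" + authority + "/" + project;
    }

    /** Collects the user, password and ssl parameters into a Properties object. */
    static Properties buildInfo(String user, String password, boolean ssl) {
        Properties info = new Properties();
        info.put("user", user);
        info.put("password", password);
        info.put("ssl", String.valueOf(ssl)); // assumption: the flag is passed as a string
        return info;
    }
}
```

For example, buildUrl("localhost", 7070, "kylin_project_name") yields the same URL used in the examples that follow.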

1. Query with Statement

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
while (resultSet.next()) {
    assertEquals("foo", resultSet.getString(1));
    assertEquals("bar", resultSet.getString(2));
    assertEquals("tool", resultSet.getString(3));
}

2. Query with PreparedStatement

Supported PreparedStatement parameter setters:
setString
setInt
setShort
setLong
setFloat
setDouble
setBoolean
setByte
setDate
setTime
setTimestamp

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
PreparedStatement state = conn.prepareStatement("select * from test_table where id=?");
state.setInt(1, 10);
ResultSet resultSet = state.executeQuery();
while (resultSet.next()) {
    assertEquals("foo", resultSet.getString(1));
    assertEquals("bar", resultSet.getString(2));
    assertEquals("tool", resultSet.getString(3));
}
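
When parameters arrive as plain Java objects, the setters listed above can be chosen by runtime type. The helper below is a sketch under that assumption; StatementBinder and bind are hypothetical names, and only the standard java.sql.PreparedStatement setters named in the list are used.

```java
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;

// Hypothetical helper: binds each parameter with the setter that matches its Java type.
final class StatementBinder {
    private StatementBinder() {}

    static void bind(PreparedStatement ps, Object... params) throws SQLException {
        for (int i = 0; i < params.length; i++) {
            int idx = i + 1; // JDBC parameter indexes are 1-based
            Object p = params[i];
            if (p instanceof String) ps.setString(idx, (String) p);
            else if (p instanceof Integer) ps.setInt(idx, (Integer) p);
            else if (p instanceof Short) ps.setShort(idx, (Short) p);
            else if (p instanceof Long) ps.setLong(idx, (Long) p);
            else if (p instanceof Float) ps.setFloat(idx, (Float) p);
            else if (p instanceof Double) ps.setDouble(idx, (Double) p);
            else if (p instanceof Boolean) ps.setBoolean(idx, (Boolean) p);
            else if (p instanceof Byte) ps.setByte(idx, (Byte) p);
            else if (p instanceof Timestamp) ps.setTimestamp(idx, (Timestamp) p);
            else if (p instanceof Date) ps.setDate(idx, (Date) p);
            else if (p instanceof Time) ps.setTime(idx, (Time) p);
            else throw new SQLException("Unsupported parameter type at index " + idx + ": "
                        + (p == null ? "null" : p.getClass().getName()));
        }
    }
}
```

With this helper, the call state.setInt(1, 10) from the example could be written as StatementBinder.bind(state, 10). Null values are rejected here because the list above does not include setNull.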

3. Get query result metadata

The Kylin JDBC driver supports the metadata listing methods:
catalogs, schemas, tables and columns can be listed using SQL pattern filters (such as %).

Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");
ResultSet tables = conn.getMetaData().getTables(null, null, "dummy", null);
while (tables.next()) {
    for (int i = 0; i < 10; i++) {
        assertEquals("dummy", tables.getString(i + 1));
    }
}

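
Outside of a test, the same metadata call is usually read by column label rather than by position. The following sketch collects table names matching a pattern; KylinMetadata and listTableNames are hypothetical names, and it assumes the driver exposes the standard JDBC "TABLE_NAME" column in the getTables result.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists table names that match a SQL pattern such as "%".
final class KylinMetadata {
    private KylinMetadata() {}

    static List<String> listTableNames(DatabaseMetaData metaData, String tablePattern) throws SQLException {
        List<String> names = new ArrayList<>();
        // null catalog, schema pattern and types mean "do not filter on these"
        try (ResultSet tables = metaData.getTables(null, null, tablePattern, null)) {
            while (tables.next()) {
                names.add(tables.getString("TABLE_NAME")); // standard JDBC column label
            }
        }
        return names;
    }
}
```

A call such as KylinMetadata.listTableNames(conn.getMetaData(), "%") would then return every table visible in the project.
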
Spring DataSource Wrapper

Using raw JDBC directly is inconvenient in day-to-day development. Wrapping it as a DataSource, the way Spring expects, makes it much easier to use.

Create SqlProperties to hold the JDBC connection parameters

@Data
public class KylinSqlProperties {

    private static final String DEFAULT_DRIVER_CLASS_NAME = "org.apache.kylin.jdbc.Driver";
    private static final int DEFAULT_POOL_SIZE = 10;
    private static final Long DEFAULT_MAX_WAIT_TIME = 10000L;

    /** User name */
    private String userName;

    /** Password */
    private String password;

    /** Whether encryption is used */
    private boolean decrypt;

    /** Primary connection URL */
    private String connectionUrl;

    /** Maximum time to wait for a connection */
    private long maxWaitTime = DEFAULT_MAX_WAIT_TIME;

    private int poolSize = DEFAULT_POOL_SIZE;

    private String driverClassName = DEFAULT_DRIVER_CLASS_NAME;
}

Implement the DataSource interface and create the connection pool

@Slf4j
public class KylinDataSource implements DataSource {

    private LinkedList<Connection> connectionPoolList = new LinkedList<>();
    private long maxWaitTime;

    public KylinDataSource(KylinSqlProperties sqlProperties) {
        try {
            this.maxWaitTime = sqlProperties.getMaxWaitTime();
            Driver driverManager = (Driver) Class.forName(sqlProperties.getDriverClassName()).newInstance();
            Properties info = new Properties();
            info.put("user", sqlProperties.getUserName());
            info.put("password", sqlProperties.getPassword());
            for (int i = 0; i
            // [The source text is truncated at this point. The rest of the pool-filling loop,
            //  the end of the constructor and the getConnection methods are missing.]

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new RuntimeException("Unsupported operation.");
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return DataSource.class.equals(iface);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new RuntimeException("Unsupported operation.");
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new RuntimeException("Unsupported operation.");
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return null;
    }

    // Dynamic proxy around a physical connection: close() returns the proxy to the pool
    // instead of closing it; every other call is delegated to the real connection.
    static class ConnectionProxy implements InvocationHandler {

        private Object obj;
        private LinkedList<Connection> pool;
        private String DEFAULT_CLOSE_METHOD = "close";

        private ConnectionProxy(Object obj, LinkedList<Connection> pool) {
            this.obj = obj;
            this.pool = pool;
        }

        public static Connection getProxy(Object o, LinkedList<Connection> pool) {
            Object proxed = Proxy.newProxyInstance(o.getClass().getClassLoader(),
                    new Class[]{Connection.class}, new ConnectionProxy(o, pool));
            return (Connection) proxed;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals(DEFAULT_CLOSE_METHOD)) {
                synchronized (pool) {
                    pool.add((Connection) proxy);
                    pool.notify();
                }
                return null;
            } else {
                return method.invoke(obj, args);
            }
        }
    }
}
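
The pooling idea behind ConnectionProxy (close() hands the connection back and wakes up a waiter) can be sketched in isolation. This is a minimal illustration and not the author's KylinDataSource: the class SimpleBlockingPool, its borrow and idleCount methods, and the timeout behaviour are all assumptions made for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;

// Hypothetical fixed-size pool: borrow() blocks up to maxWaitMillis, close() returns the proxy.
final class SimpleBlockingPool {
    private final LinkedList<Connection> idle = new LinkedList<>();
    private final long maxWaitMillis;

    SimpleBlockingPool(List<Connection> physicalConnections, long maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
        for (Connection physical : physicalConnections) {
            idle.add(wrap(physical));
        }
    }

    Connection borrow() throws SQLException {
        synchronized (idle) {
            long deadline = System.currentTimeMillis() + maxWaitMillis;
            while (idle.isEmpty()) { // loop guards against spurious wake-ups
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new SQLException("Timed out waiting for a pooled connection");
                }
                try {
                    idle.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new SQLException("Interrupted while waiting for a connection", e);
                }
            }
            return idle.removeFirst();
        }
    }

    int idleCount() {
        synchronized (idle) {
            return idle.size();
        }
    }

    private Connection wrap(Connection physical) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                synchronized (idle) {
                    idle.add((Connection) proxy); // return to the pool instead of closing
                    idle.notify();
                }
                return null;
            }
            try {
                return method.invoke(physical, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the driver's own exception
            }
        };
        return (Connection) Proxy.newProxyInstance(
                SimpleBlockingPool.class.getClassLoader(), new Class<?>[]{Connection.class}, handler);
    }
}
```

The sketch does not guard against closing the same proxy twice, which would put it into the idle list twice; a production pool such as HikariCP handles this and connection validation as well.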

Create the JdbcPoolConfiguration class and register the template bean

@Slf4j
@Configuration
@Component
public class KylinJdbcPoolConfiguration implements BeanFactoryPostProcessor, EnvironmentAware {

    private ConfigurableEnvironment environment;

    @Value("${kylin.decrypt}")
    private boolean decrypt = false;

    private final static String prefixName = "kylin";

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
        KylinSqlProperties properties = new KylinSqlProperties();
        properties.setUserName("xxxxx");
        properties.setPassword("xxxx");
        properties.setConnectionUrl("xxxx");
        properties.setDecrypt(decrypt);
        createDataSourceBean(beanFactory, properties);
    }

    public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,
                                     KylinSqlProperties sqlProperties) {
        DataSource baseDataSource = new KylinDataSource(sqlProperties);
        // Registered as "kylinJdbcTemplateFactory", with the alias "kylin"
        register(beanFactory, new JdbcTemplate(baseDataSource), prefixName + "JdbcTemplateFactory", prefixName);
    }

    private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,
                          String alias) {
        beanFactory.registerSingleton(name, bean);
        if (!beanFactory.containsSingleton(alias)) {
            beanFactory.registerAlias(name, alias);
        }
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = (ConfigurableEnvironment) environment;
    }
}

RowMapper implementation

public class CommonBeanPropertyRowMapper<T> implements RowMapper<T> {

    protected final Log logger = LogFactory.getLog(this.getClass());
    private Class<T> mappedClass;
    private boolean checkFullyPopulated = false;
    private boolean primitivesDefaultedForNullValue = false;
    private ConversionService conversionService = DefaultConversionService.getSharedInstance();
    private Map<String, PropertyDescriptor> mappedFields;
    private Set<String> mappedProperties;

    public CommonBeanPropertyRowMapper() {
    }

    public CommonBeanPropertyRowMapper(Class<T> mappedClass) throws Exception {
        this.initialize(mappedClass);
    }

    public CommonBeanPropertyRowMapper(Class<T> mappedClass, boolean checkFullyPopulated) throws Exception {
        this.initialize(mappedClass);
        this.checkFullyPopulated = checkFullyPopulated;
    }

    public void setMappedClass(Class<T> mappedClass) throws Exception {
        if (this.mappedClass == null) {
            this.initialize(mappedClass);
        } else if (this.mappedClass != mappedClass) {
            throw new InvalidDataAccessApiUsageException(
                    "The mapped class can not be reassigned to map to " + mappedClass
                            + " since it is already providing mapping for " + this.mappedClass);
        }
    }

    public final Class<T> getMappedClass() {
        return this.mappedClass;
    }

    public void setCheckFullyPopulated(boolean checkFullyPopulated) {
        this.checkFullyPopulated = checkFullyPopulated;
    }

    public boolean isCheckFullyPopulated() {
        return this.checkFullyPopulated;
    }

    public void setPrimitivesDefaultedForNullValue(boolean primitivesDefaultedForNullValue) {
        this.primitivesDefaultedForNullValue = primitivesDefaultedForNullValue;
    }

    public boolean isPrimitivesDefaultedForNullValue() {
        return this.primitivesDefaultedForNullValue;
    }

    public void setConversionService(ConversionService conversionService) {
        this.conversionService = conversionService;
    }

    public ConversionService getConversionService() {
        return this.conversionService;
    }

    protected void initialize(Class<T> mappedClass) throws Exception {
        this.mappedClass = mappedClass;
        this.mappedFields = new HashMap<>();
        this.mappedProperties = new HashSet<>();
        PropertyDescriptor[] pds = BeanUtils.getPropertyDescriptors(mappedClass);
        PropertyDescriptor[] var3 = pds;
        int var4 = pds.length;
        for (int var5 = 0; var5
        // [The source text is truncated at this point. The rest of initialize(), the row-mapping
        //  methods and getColumnValue() are missing.]

    public static <T> org.springframework.jdbc.core.BeanPropertyRowMapper<T> newInstance(Class<T> mappedClass) {
        return new org.springframework.jdbc.core.BeanPropertyRowMapper<>(mappedClass);
    }
}

RowMapper subclass

public class RowMapper<T> extends CommonBeanPropertyRowMapper<T> {

    private List<MapperPlugin> mapperPlugins;

    private RowMapper(Class<T> tClass, List<MapperPlugin> mapperPlugins) throws Exception {
        super(tClass);
        this.mapperPlugins = mapperPlugins;
    }

    @Override
    protected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd) throws SQLException {
        Object object = rs.getObject(index);
        // Select the first matching plugin before converting: Stream.findFirst() throws a
        // NullPointerException when the selected element is null, and plugins may return null.
        Optional<MapperPlugin> plugin = mapperPlugins.stream()
                .filter(mapperPlugin -> mapperPlugin.test(pd))
                .findFirst();
        if (plugin.isPresent()) {
            return plugin.get().getColumnValue(object, pd);
        }
        return super.getColumnValue(rs, index, pd);
    }

    public static <T> RowMapper<T> getDefault(Class<T> tClass) {
        return RowMapper.<T>builder().tClass(tClass)
                .mapperPlugins(JSONObjectPlugin)
                .mapperPlugins(ListPlugin)
                .mapperPlugins(SetPlugin)
                .mapperPlugins(MapPlugin)
                .mapperPlugins(EnumPlugin)
                .mapperPlugins(JsonPlugin)
                .build();
    }

    public static <T> RowMapper<T> withDefault(Class<T> tClass, MapperPlugin... mapperPlugins) {
        RhllorRowMapperBuilder<T> builder = RowMapper.<T>builder().tClass(tClass);
        // Caller-supplied plugins are added first, so they take precedence over the defaults
        for (final MapperPlugin mapperPlugin : mapperPlugins) {
            builder.mapperPlugins(mapperPlugin);
        }
        return builder
                .mapperPlugins(JSONObjectPlugin)
                .mapperPlugins(ListPlugin)
                .mapperPlugins(SetPlugin)
                .mapperPlugins(MapPlugin)
                .mapperPlugins(EnumPlugin)
                .mapperPlugins(JsonPlugin)
                .build();
    }

    public static <T> RowMapper.RhllorRowMapperBuilder<T> builder() {
        return new RowMapper.RhllorRowMapperBuilder<>();
    }

    public static class RhllorRowMapperBuilder<T> {

        private Class<T> tClass;
        private ArrayList<MapperPlugin> mapperPlugins;

        RhllorRowMapperBuilder() {
        }

        public RowMapper.RhllorRowMapperBuilder<T> tClass(Class<T> tClass) {
            this.tClass = tClass;
            return this;
        }

        public RowMapper.RhllorRowMapperBuilder<T> mapperPlugins(MapperPlugin mapperPlugin) {
            if (this.mapperPlugins == null) {
                this.mapperPlugins = new ArrayList<>();
            }
            this.mapperPlugins.add(mapperPlugin);
            return this;
        }

        public RowMapper<T> build() {
            List<MapperPlugin> mapperPlugins;
            switch (this.mapperPlugins == null ? 0 : this.mapperPlugins.size()) {
                case 0:
                    mapperPlugins = Collections.emptyList();
                    break;
                case 1:
                    mapperPlugins = Collections.singletonList(this.mapperPlugins.get(0));
                    break;
                default:
                    mapperPlugins = Collections.unmodifiableList(new ArrayList<>(this.mapperPlugins));
            }
            try {
                return new RowMapper<>(this.tClass, mapperPlugins);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            return null;
        }

        @Override
        public String toString() {
            return "PrestoRowMapper.KylinRowMapperBuilder(tClass=" + this.tClass + ", mapperPlugins="
                    + this.mapperPlugins + ")";
        }
    }
}

MapperPlugin


public class MapperPlugin {

    // Column values arrive either as String or as byte[]; normalize both to a UTF-8 String
    private static final Function<Object, String> bytes2UTF8String =
            bytes -> bytes instanceof String ? bytes.toString()
                    : new String((byte[]) bytes, Charset.forName("UTF-8"));

    private static final Function<PropertyDescriptor, Class> pd2Generic =
            pd -> getCollectionGeneric(pd.getReadMethod());

    private final Predicate<PropertyDescriptor> predicate;
    private final ColumnValue columnValue;

    private MapperPlugin(Predicate<PropertyDescriptor> predicate, ColumnValue columnValue) {
        this.predicate = predicate;
        this.columnValue = columnValue;
    }

    boolean test(PropertyDescriptor pd) {
        return predicate.test(pd);
    }

    Object getColumnValue(Object object, PropertyDescriptor pd) {
        return columnValue.get(object, pd);
    }

    public static MapperPluginsBuilder of(Predicate<PropertyDescriptor> predicate) {
        return new MapperPluginsBuilder(predicate);
    }

    public static MapperPluginsBuilder ofNot(Predicate<PropertyDescriptor> predicate) {
        return of(predicate.negate());
    }

    public static MapperPluginsBuilder of(Class clazz) {
        return of(pd -> clazz.isAssignableFrom(pd.getPropertyType()));
    }

    @FunctionalInterface
    public interface ColumnValue {
        Object get(Object object, PropertyDescriptor pd);
    }

    public static class MapperPluginsBuilder {

        Predicate<PropertyDescriptor> predicate;

        public MapperPluginsBuilder(Predicate<PropertyDescriptor> predicate) {
            this.predicate = predicate;
        }

        public MapperPlugin columnValue(ColumnValue columnValue) {
            return new MapperPlugin(predicate, columnValue);
        }
    }

    // Fallback: any property that is not a primitive, wrapper, String or Date is parsed as JSON
    static final MapperPlugin JsonPlugin =
            MapperPlugin.ofNot(pd -> pd.getPropertyType().isPrimitive()
                    || Primitives.isWrapperType(pd.getPropertyType())
                    || String.class.isAssignableFrom(pd.getPropertyType())
                    || Date.class.isAssignableFrom(pd.getPropertyType()))
                    .columnValue((object, pd) -> Optional.ofNullable(object)
                            .map(bytes2UTF8String)
                            .map(json -> JSON.parseObject(json, pd.getPropertyType()))
                            .orElse(null));

    static final MapperPlugin JSONObjectPlugin =
            MapperPlugin.of(JSONObject.class)
                    .columnValue((object, pd) -> Optional.ofNullable(object)
                            .map(bytes2UTF8String)
                            .map(JSONObject::parseObject)
                            .orElse(new JSONObject()));

    static final MapperPlugin ListPlugin =
            MapperPlugin.of(List.class)
                    .columnValue((object, pd) -> Optional.ofNullable(object)
                            .map(bytes2UTF8String)
                            .map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
                            .orElse(new ArrayList<>()));

    static final MapperPlugin SetPlugin =
            MapperPlugin.of(Set.class)
                    .columnValue((object, pd) -> Optional.ofNullable(object)
                            .map(bytes2UTF8String)
                            .map(json -> JSON.parseArray(json, pd2Generic.apply(pd)))
                            .map(list -> Sets.newHashSet(List.class.cast(list)))
                            .orElse(new HashSet<>()));

    static final MapperPlugin MapPlugin =
            MapperPlugin.of(Map.class)
                    .columnValue((object, pd) -> Optional.ofNullable(object)
                            .map(bytes2UTF8String)
                            .map(json -> JSONObject.parseObject(json, Map.class))
                            .orElse(new HashMap<>()));

    // Enums are resolved reflectively through a static valueByIndex(int) or fromString(String)
    // method that the enum type is expected to provide
    static final MapperPlugin EnumPlugin =
            MapperPlugin.of(Enum.class)
                    .columnValue((o, pd) -> {
                        try {
                            if (o == null) {
                                return null;
                            }
                            if (o instanceof Number) {
                                Number number = (Number) o;
                                Method method = pd.getPropertyType().getMethod("valueByIndex", Integer.TYPE);
                                return method.invoke(null, number.intValue());
                            } else {
                                String val = o.toString();
                                Method method = pd.getPropertyType().getMethod("fromString", String.class);
                                return method.invoke(null, val);
                            }
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("getColumnValue error, NoSuchMethod : valueByIndex or fromString", e);
                        } catch (InvocationTargetException e) {
                            throw new RuntimeException("getColumnValue error, InvocationTargetException ", e);
                        } catch (IllegalAccessException e) {
                            throw new RuntimeException("getColumnValue error, IllegalAccessException ", e);
                        }
                    });

    // Returns the element type of a Collection-returning getter, or Object when it is unknown
    private static Class getCollectionGeneric(Method method) {
        if (Collection.class.isAssignableFrom(method.getReturnType())) {
            Type fc = method.getGenericReturnType();
            if (fc == null) {
                return Object.class;
            }
            if (fc instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) fc;
                return (Class) pt.getActualTypeArguments()[0];
            }
            return Object.class;
        }
        return Object.class;
    }
}
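
The plugin mechanism boils down to "the first plugin whose predicate matches the target type converts the value; otherwise fall back to the default". The sketch below shows that pattern with the standard library only. ConverterChain and its nested Plugin class are hypothetical names, and the predicates test a plain Class instead of a PropertyDescriptor to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified version of the predicate + converter plugin chain.
final class ConverterChain {

    static final class Plugin {
        final Predicate<Class<?>> applies;
        final Function<Object, Object> convert;

        Plugin(Predicate<Class<?>> applies, Function<Object, Object> convert) {
            this.applies = applies;
            this.convert = convert;
        }
    }

    private final List<Plugin> plugins;

    ConverterChain(Plugin... plugins) {
        this.plugins = Arrays.asList(plugins);
    }

    /** Applies the first plugin that matches targetType; returns raw unchanged if none matches. */
    Object convert(Object raw, Class<?> targetType) {
        // Find the plugin first and convert afterwards, so a null conversion result is allowed.
        Optional<Plugin> match = plugins.stream()
                .filter(plugin -> plugin.applies.test(targetType))
                .findFirst();
        return match.isPresent() ? match.get().convert.apply(raw) : raw;
    }
}
```

Because the order of registration decides which plugin wins, specific plugins should be registered before broad fallbacks, which is how JsonPlugin is placed last in the default chain.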

Usage

@Component
@Log4j2
public class MetricDaoImpl {

    @Resource(name = "kylinJdbcTemplateFactory")
    private JdbcTemplate kylinJdbcTemplate;

    public List<TotalModelMetricEntity> getDistinctIds() {
        StringBuilder sqlBuilder = new StringBuilder()
                .append(" select * ")
                .append(" from LOG_DID_VIEW ")
                .append(" ORDER BY DT ,total DESC limit 1000");
        log.info(sqlBuilder);
        return kylinJdbcTemplate.query(sqlBuilder.toString(),
                RowMapper.getDefault(TotalModelMetricEntity.class));
    }
}

With that, we have wrapped Kylin JDBC. Other query engines that support JDBC, such as Presto, can be wrapped in a similar way.



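About the author:

Gao Guangchao: many years of front-line Internet R&D and architecture design experience, skilled at designing and delivering highly available, high-performance, scalable Internet architectures. Currently works on big data R&D and architecture.

This article was first published on Gao Guangchao's Jianshu blog. Please credit the source when reposting!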


