Kylin 概述
Apache Kylin™是一个开源的分布式分析引擎,提供Hadoop/Spark之上的SQL查询接口及多维分析(OLAP)能力以支持超大规模数据,最初由eBay Inc. 开发并贡献至开源社区。它能在亚秒内查询巨大的Hive表。
Kylin 特性
JDBC
认证
基于Apache Kylin认证RESTFUL服务。支持的参数:
user : 用户名
password : 密码
ssl : true或false。默认为false；如果为true，所有的服务调用都会使用https。
连接url格式:
jdbc:kylin://<hostname>:<port>/<kylin_project_name>
如果“ssl”为true,“port”应该是Kylin server的HTTPS端口。
如果“port”未被指定,driver会使用默认的端口:HTTP 80,HTTPS 443。
必须指定“kylin_project_name”并且用户需要确保它在Kylin server上存在。
1. 使用Statement查询
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");while (resultSet.next()) {assertEquals("foo", resultSet.getString(1));assertEquals("bar", resultSet.getString(2));assertEquals("tool", resultSet.getString(3));
}
2. 使用PreparedStatement查询
支持的PreparedStatement参数:
setString
setInt
setShort
setLong
setFloat
setDouble
setBoolean
setByte
setDate
setTime
setTimestamp
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
PreparedStatement state = conn.prepareStatement("select * from test_table where id=?");
state.setInt(1, 10);
ResultSet resultSet = state.executeQuery();while (resultSet.next()) {assertEquals("foo", resultSet.getString(1));assertEquals("bar", resultSet.getString(2));assertEquals("tool", resultSet.getString(3));
}
3. 获取查询结果元数据
Kylin jdbc driver支持元数据列表方法:
通过sql模式过滤器(比如 %)列出catalog、schema、table和column。
Driver driver = (Driver) Class.forName("org.apache.kylin.jdbc.Driver").newInstance();
Properties info = new Properties();
info.put("user", "ADMIN");
info.put("password", "KYLIN");
Connection conn = driver.connect("jdbc:kylin://localhost:7070/kylin_project_name", info);
Statement state = conn.createStatement();
ResultSet resultSet = state.executeQuery("select * from test_table");ResultSet tables = conn.getMetaData().getTables(null, null, "dummy", null);
while (tables.next()) {for (int i &#61; 0; i <10; i&#43;&#43;) {assertEquals("dummy", tables.getString(i &#43; 1));}
}
Spring DataSource封装
JDBC方式在开发使用中十分不便，而如果能封装为Spring 提供的DataSource方式，使用过程中就会便捷很多。
创建SqlProperties，封装jdbc连接的参数
&#64;Data
public class KylinSqlProperties {private static final String DEFAULT_DRIVER_CLASS_NAME &#61; "org.apache.kylin.jdbc.Driver";private static final int DEFAULT_POOL_SIZE &#61; 10;private static final Long DEFAULT_MAX_WAIT_TIME &#61; 10000L;/*** 用户名*/private String userName;/*** 密码*/private String password;/*** 是否加密*/private boolean decrypt;/*** 主库连接地址*/private String connectionUrl;/*** 最长等待连接时间*/private long maxWaitTime &#61; DEFAULT_MAX_WAIT_TIME;private int poolSize &#61; DEFAULT_POOL_SIZE;private String driverClassName &#61; DEFAULT_DRIVER_CLASS_NAME;
}
实现 DataSource 接口，创建连接池
/**
 * DataSource implementation that keeps its connections in a LinkedList (connectionPoolList).
 *
 * Constructor (visible part): copies maxWaitTime from the given properties, instantiates the
 * driver class named by getDriverClassName() via reflection, and builds a Properties object
 * holding "user" and "password".
 *
 * NOTE(review): the text between "for (int i = 0; i" and "T unwrap(" appears to have been lost
 * during extraction (probably stripped as if it were an HTML tag). The pool-filling loop, the
 * rest of the constructor and - presumably - the getConnection methods are not visible here,
 * and the closing brace of the outer class is missing too. The sequences &#61; and &#64; are
 * HTML entities for '=' and '@'. Restore all of this from the original source before compiling.
 *
 * Remaining DataSource methods, as written: unwrap, getLogWriter and setLoginTimeout throw
 * RuntimeException("Unsupport operation."); setLogWriter does nothing; getLoginTimeout
 * returns 0; getParentLogger returns null; isWrapperFor is true only for DataSource.class.
 *
 * ConnectionProxy is an InvocationHandler. getProxy wraps an object in a dynamic proxy that
 * implements Connection. invoke intercepts the method named "close": instead of closing, it
 * adds the proxy back to the pool inside synchronized (pool), calls pool.notify() and returns
 * null. Every other method is forwarded to the wrapped object via reflection.
 */
&#64;Slf4j
public class KylinDataSource implements DataSource {private LinkedList connectionPoolList &#61; new LinkedList<>();private long maxWaitTime;public KylinDataSource(KylinSqlProperties sqlProperties) {try {this.maxWaitTime &#61; sqlProperties.getMaxWaitTime();Driver driverManager &#61; (Driver) Class.forName(sqlProperties.getDriverClassName()).newInstance();Properties info &#61; new Properties();info.put("user", sqlProperties.getUserName());info.put("password", sqlProperties.getPassword());for (int i &#61; 0; i T unwrap(Class iface) throws SQLException {throw new RuntimeException("Unsupport operation.");}&#64;Overridepublic boolean isWrapperFor(Class> iface) throws SQLException {return DataSource.class.equals(iface);}&#64;Overridepublic PrintWriter getLogWriter() throws SQLException {throw new RuntimeException("Unsupport operation.");}&#64;Overridepublic void setLogWriter(PrintWriter out) throws SQLException {}&#64;Overridepublic void setLoginTimeout(int seconds) throws SQLException {throw new RuntimeException("Unsupport operation.");}&#64;Overridepublic int getLoginTimeout() throws SQLException {return 0;}&#64;Overridepublic Logger getParentLogger() throws SQLFeatureNotSupportedException {return null;}static class ConnectionProxy implements InvocationHandler {private Object obj;private LinkedList pool;private String DEFAULT_CLOSE_METHOD &#61; "close";private ConnectionProxy(Object obj, LinkedList pool) {this.obj &#61; obj;this.pool &#61; pool;}public static Connection getProxy(Object o, LinkedList pool) {Object proxed &#61; Proxy.newProxyInstance(o.getClass().getClassLoader(), new Class[]{Connection.class},new ConnectionProxy(o, pool));return (Connection) proxed;}&#64;Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (method.getName().equals(DEFAULT_CLOSE_METHOD)) {synchronized (pool) {pool.add((Connection) proxy);pool.notify();}return null;} else {return method.invoke(obj, args);}}}
创建JdbcPoolConfiguration类，注册template bean
&#64;Slf4j
&#64;Configuration
&#64;Component
public class KylinJdbcPoolConfiguration implements BeanFactoryPostProcessor, EnvironmentAware {private ConfigurableEnvironment environment;&#64;Value("${kylin.decrypt}")private boolean decrypt &#61; false;private final static String prefixName &#61; "kylin";&#64;Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {KylinSqlProperties properties &#61; new KylinSqlProperties();properties.setUserName("xxxxx");properties.setPassword("xxxx");properties.setConnectionUrl("xxxx");properties.setDecrypt(decrypt);createDataSourceBean(beanFactory, properties);}public void createDataSourceBean(ConfigurableListableBeanFactory beanFactory,KylinSqlProperties sqlProperties) {DataSource baseDataSource &#61; new KylinDataSource(sqlProperties);register(beanFactory, new JdbcTemplate(baseDataSource), prefixName &#43; "JdbcTemplateFactory", prefixName);}private void register(ConfigurableListableBeanFactory beanFactory, Object bean, String name,String alias) {beanFactory.registerSingleton(name, bean);if (!beanFactory.containsSingleton(alias)) {beanFactory.registerAlias(name, alias);}}&#64;Overridepublic void setEnvironment(Environment environment) {this.environment &#61; (ConfigurableEnvironment) environment;}}
RowMapper实现
/**
 * RowMapper implementation that maps rows onto instances of a configured bean class.
 *
 * Visible behaviour:
 *  - the constructors taking a Class call initialize(mappedClass); the two-argument one also
 *    stores checkFullyPopulated;
 *  - setMappedClass initializes the mapper when no class is set yet and throws
 *    InvalidDataAccessApiUsageException when asked to switch to a different class;
 *  - initialize stores the mapped class, creates an empty HashMap / HashSet for mappedFields /
 *    mappedProperties and obtains the bean's PropertyDescriptors through BeanUtils;
 *  - the trailing newInstance(Class) returns a new
 *    org.springframework.jdbc.core.BeanPropertyRowMapper for the given class.
 *
 * NOTE(review): the text between "for (int var5 = 0; var5" and
 * "org.springframework.jdbc.core.BeanPropertyRowMapper newInstance(" appears to have been lost
 * during extraction (probably stripped as if it were an HTML tag). The rest of initialize and
 * every member declared after it (presumably including mapRow and getColumnValue) are not
 * visible here. Generic type arguments also look stripped, and &#61; / &#43; are HTML entities
 * for '=' and '+'. Locals named var3/var4/var5 suggest decompiled code - confirm against the
 * original before relying on this listing.
 */
public class CommonBeanPropertyRowMapper implements RowMapper {protected final Log logger &#61; LogFactory.getLog(this.getClass());private Class mappedClass;private boolean checkFullyPopulated &#61; false;private boolean primitivesDefaultedForNullValue &#61; false;private ConversionService conversionService &#61; DefaultConversionService.getSharedInstance();private Map mappedFields;private Set mappedProperties;public CommonBeanPropertyRowMapper() {}public CommonBeanPropertyRowMapper(Class mappedClass) throws Exception {this.initialize(mappedClass);}public CommonBeanPropertyRowMapper(Class mappedClass, boolean checkFullyPopulated)throws Exception {this.initialize(mappedClass);this.checkFullyPopulated &#61; checkFullyPopulated;}public void setMappedClass(Class mappedClass) throws Exception {if (this.mappedClass &#61;&#61; null) {this.initialize(mappedClass);} else if (this.mappedClass !&#61; mappedClass) {throw new InvalidDataAccessApiUsageException("The mapped class can not be reassigned to map to " &#43; mappedClass&#43; " since it is already providing mapping for " &#43; this.mappedClass);}}public final Class getMappedClass() {return this.mappedClass;}public void setCheckFullyPopulated(boolean checkFullyPopulated) {this.checkFullyPopulated &#61; checkFullyPopulated;}public boolean isCheckFullyPopulated() {return this.checkFullyPopulated;}public void setPrimitivesDefaultedForNullValue(boolean primitivesDefaultedForNullValue) {this.primitivesDefaultedForNullValue &#61; primitivesDefaultedForNullValue;}public boolean isPrimitivesDefaultedForNullValue() {return this.primitivesDefaultedForNullValue;}public void setConversionService(ConversionService conversionService) {this.conversionService &#61; conversionService;}public ConversionService getConversionService() {return this.conversionService;}protected void initialize(Class mappedClass) throws Exception {this.mappedClass &#61; mappedClass;this.mappedFields &#61; new HashMap();this.mappedProperties &#61; new 
HashSet();PropertyDescriptor[] pds &#61; BeanUtils.getPropertyDescriptors(mappedClass);PropertyDescriptor[] var3 &#61; pds;int var4 &#61; pds.length;for (int var5 &#61; 0; var5 org.springframework.jdbc.core.BeanPropertyRowMapper newInstance(Class mappedClass) {return new org.springframework.jdbc.core.BeanPropertyRowMapper(mappedClass);}
}
RowMapper子类
public class RowMapper extends CommonBeanPropertyRowMapper {private List mapperPlugins;private RowMapper(Class tClass, List mapperPlugins) throws Exception {super(tClass);this.mapperPlugins &#61; mapperPlugins;}&#64;Overrideprotected Object getColumnValue(ResultSet rs, int index, PropertyDescriptor pd)throws SQLException {Object object &#61; rs.getObject(index);return mapperPlugins.stream().filter(mapperPlugin -> mapperPlugin.test(pd)).map(mapperPlugin -> mapperPlugin.getColumnValue(object, pd)).findFirst().orElse(super.getColumnValue(rs, index, pd));}public static RowMapper getDefault(Class tClass) {return RowMapper.builder().tClass(tClass).mapperPlugins(JSONObjectPlugin).mapperPlugins(ListPlugin).mapperPlugins(SetPlugin).mapperPlugins(MapPlugin).mapperPlugins(EnumPlugin).mapperPlugins(JsonPlugin).build();}public static RowMapper withDefault(Class tClass, MapperPlugin... mapperPlugins) {RhllorRowMapperBuilder builder &#61; RowMapper.builder().tClass(tClass);for (final MapperPlugin mapperPlugin : mapperPlugins) {builder.mapperPlugins(mapperPlugin);}return builder.mapperPlugins(JSONObjectPlugin).mapperPlugins(ListPlugin).mapperPlugins(SetPlugin).mapperPlugins(MapPlugin).mapperPlugins(EnumPlugin).mapperPlugins(JsonPlugin).build();}public static RowMapper.RhllorRowMapperBuilder builder() {return new RowMapper.RhllorRowMapperBuilder<>();}public static class RhllorRowMapperBuilder {private Class tClass;private ArrayList mapperPlugins;RhllorRowMapperBuilder() {}public RowMapper.RhllorRowMapperBuilder tClass(Class tClass) {this.tClass &#61; tClass;return this;}public RowMapper.RhllorRowMapperBuilder mapperPlugins(MapperPlugin mapperPlugin) {if (this.mapperPlugins &#61;&#61; null) {this.mapperPlugins &#61; new ArrayList();}this.mapperPlugins.add(mapperPlugin);return this;}public RowMapper build() {List mapperPlugins;switch (this.mapperPlugins &#61;&#61; null ? 
0 : this.mapperPlugins.size()) {case 0:mapperPlugins &#61; Collections.emptyList();break;case 1:mapperPlugins &#61; Collections.singletonList(this.mapperPlugins.get(0));break;default:mapperPlugins &#61; Collections.unmodifiableList(new ArrayList<>(this.mapperPlugins));}try {return new RowMapper<>(this.tClass, mapperPlugins);} catch (Exception ex) {ex.printStackTrace();}return null;}&#64;Overridepublic String toString() {return "PrestoRowMapper.KylinRowMapperBuilder(tClass&#61;" &#43; this.tClass &#43; ", mapperPlugins&#61;"&#43; this.mapperPlugins &#43; ")";}}}
MapperPlugin
public class MapperPlugin {private static final Function bytes2UTF8String &#61;bytes -> bytes instanceof String ? bytes.toString() :new String((byte[]) bytes, Charset.forName("UTF-8"));private static final Function pd2Generic &#61;pd -> getCollectionGeneric(pd.getReadMethod());private final Predicate predicate;private final ColumnValue columnValue;private MapperPlugin(Predicate predicate,ColumnValue columnValue) {this.predicate &#61; predicate;this.columnValue &#61; columnValue;}boolean test(PropertyDescriptor pd) {return predicate.test(pd);}Object getColumnValue(Object object, PropertyDescriptor pd) {return columnValue.get(object, pd);}public static MapperPluginsBuilder of(Predicate predicate) {return new MapperPluginsBuilder(predicate);}public static MapperPluginsBuilder ofNot(Predicate predicate) {return of(predicate.negate());}public static MapperPluginsBuilder of(Class clazz) {return of(pd -> clazz.isAssignableFrom(pd.getPropertyType()));}&#64;FunctionalInterfacepublic interface ColumnValue {Object get(Object object, PropertyDescriptor pd);}public static class MapperPluginsBuilder {Predicate predicate;public MapperPluginsBuilder(Predicate predicate) {this.predicate &#61; predicate;}public MapperPlugin columnValue(ColumnValue columnValue) {return new MapperPlugin(predicate, columnValue);}}static final MapperPlugin JsonPlugin &#61;MapperPlugin.ofNot(pd -> pd.getPropertyType().isPrimitive() ||Primitives.isWrapperType(pd.getPropertyType()) ||String.class.isAssignableFrom(pd.getPropertyType()) ||Date.class.isAssignableFrom(pd.getPropertyType())).columnValue((object, pd) ->Optional.ofNullable(object).map(bytes2UTF8String).map(json -> JSON.parseObject(json, pd.getPropertyType())).orElse(null));static final MapperPlugin JSONObjectPlugin &#61;MapperPlugin.of(JSONObject.class).columnValue((object, pd) ->Optional.ofNullable(object).map(bytes2UTF8String).map(JSONObject::parseObject).orElse(new JSONObject()));static final MapperPlugin ListPlugin 
&#61;MapperPlugin.of(List.class).columnValue((object, pd) ->Optional.ofNullable(object).map(bytes2UTF8String).map(json -> JSON.parseArray(json, pd2Generic.apply(pd))).orElse(new ArrayList<>()));static final MapperPlugin SetPlugin &#61;MapperPlugin.of(Set.class).columnValue((object, pd) ->Optional.ofNullable(object).map(bytes2UTF8String).map(json -> JSON.parseArray(json, pd2Generic.apply(pd))).map(list -> Sets.newHashSet(List.class.cast(list))).orElse(new HashSet<>()));static final MapperPlugin MapPlugin &#61;MapperPlugin.of(Map.class).columnValue((object, pd) ->Optional.ofNullable(object).map(bytes2UTF8String).map(json -> JSONObject.parseObject(json, Map.class)).orElse(new HashMap<>()));static final MapperPlugin EnumPlugin &#61;MapperPlugin.of(Enum.class).columnValue((o, pd) -> {try {if (o &#61;&#61; null) {return null;}if (o instanceof Number) {Number number &#61; (Number) o;Method method &#61; pd.getPropertyType().getMethod("valueByIndex", Integer.TYPE);return method.invoke(null, number.intValue());} else {String val &#61; o.toString();Method method &#61; pd.getPropertyType().getMethod("fromString", String.class);return method.invoke(null, val);}} catch (NoSuchMethodException e) {throw new RuntimeException("getColumnValue error, NoSuchMethod : valueByIndex or fromString", e);} catch (InvocationTargetException e) {throw new RuntimeException("getColumnValue error, InvocationTargetException ", e);} catch (IllegalAccessException e) {throw new RuntimeException("getColumnValue error, IllegalAccessException ", e);}});private static Class> getCollectionGeneric(Method method) {if (Collection.class.isAssignableFrom(method.getReturnType())) {Type fc &#61; method.getGenericReturnType();if (fc &#61;&#61; null) {return Object.class;}if (fc instanceof ParameterizedType) {ParameterizedType pt &#61; (ParameterizedType) fc;return (Class) pt.getActualTypeArguments()[0];}return Object.class;}return Object.class;}}
具体使用
&#64;Component
&#64;Log4j2
public class MetricDaoImpl {&#64;Resource(name &#61; "kylinJdbcTemplateFactory")private JdbcTemplate kylinJdbcTemplate;public List getDistinctIds() {StringBuilder sqlBuilder &#61; new StringBuilder().append(" select * ").append(" from LOG_DID_VIEW ").append(" ORDER BY DT ,total DESC limit 1000");log.info(sqlBuilder);return kylinJdbcTemplate.query(sqlBuilder.toString(), RowMapper.getDefault(TotalModelMetricEntity.class));}
综上，我们就完成了对Kylin JDBC的封装；Presto等其他支持JDBC的查询引擎也可以用类似的方式封装。
欢迎关注 高广超的简书博客 与 收藏文章！
欢迎关注 头条号：互联网技术栈！
个人介绍：
高广超：多年一线互联网研发与架构设计经验，擅长设计与落地高可用、高性能、可扩展的互联网架构。目前从事大数据相关研发与架构工作。
本文首发在 高广超的简书博客，转载请注明！