通过calcite,使用sql来读取csv数据
本文力求用最少的代码实现此功能,并借此理解 Calcite 是如何工作的。
SqlParser parser = SqlParser.create("select * from DEPTS where DEPTNO>=20 ", SqlParser.Config.DEFAULT);// 解析SQL字符串, 生成SqlNode树SqlNode sqlNode = parser.parseStmt();System.out.println(sqlNode);
结果 SqlNode
SELECT *
FROM `DEPTS`
WHERE `DEPTNO` >= 20
// Register the CSV-backed schema as "test_schema" in the root schema that the
// catalog reader (and therefore the validator) resolves table names against.
CsvSchema csvSchema = new CsvSchema();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
rootSchema.add("test_schema", csvSchema);

// A second root schema (as SchemaPlus) holding the same CsvSchema; the execution
// step builds its DataContext from this one.
SchemaPlus schemaPlus = Frameworks.createRootSchema(false);
schemaPlus.add("test_schema", csvSchema);

// Default connection configuration; its settings are forwarded to the validator below.
CalciteConnectionConfigImpl config = new CalciteConnectionConfigImpl(new Properties());
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

// The catalog reader looks up unqualified table names in "test_schema".
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
    rootSchema,
    Collections.singletonList("test_schema"),
    typeFactory,
    config);

SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
    .withLenientOperatorLookup(config.lenientOperatorLookup())
    .withSqlConformance(config.conformance())
    .withDefaultNullCollation(config.defaultNullCollation())
    .withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(
    SqlStdOperatorTable.instance(),
    catalogReader,
    typeFactory,
    validatorConfig);

// Run SQL validation and print the validator's result (not the pre-validation variable).
SqlNode validateSqlNode = validator.validate(sqlNode);
System.out.println(validateSqlNode);
输出 SqlNode
SELECT `DEPTS`.`DEPTNO`, `DEPTS`.`NAME`
FROM `test_schema`.`DEPTS` AS `DEPTS`
WHERE `DEPTS`.`DEPTNO` >= 20
// Planner that tracks the calling-convention trait.
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);

// Create the SqlToRelConverter.
RexBuilder rexBuilder = new RexBuilder(typeFactory);
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
    .withTrimUnusedFields(true)
    .withExpand(false);
SqlToRelConverter converter = new SqlToRelConverter(
    null,
    validator,
    catalogReader,
    cluster,
    StandardConvertletTable.INSTANCE,
    converterConfig);

// Convert the validated SqlNode tree into a RelNode tree (wrapped in a RelRoot).
RelRoot relNode = converter.convertQuery(validateSqlNode, false, true);
System.out.println(relNode);
输出
Root {kind: SELECT, rel: rel#3:LogicalProject.NONE(input=LogicalFilter#2,inputs=0..1), rowType: RecordType(INTEGER DEPTNO, VARCHAR NAME), fields: [<0, DEPTNO>, <1, NAME>], collation: []}
// 规则RuleSet rules &#61; RuleSets.ofList(CoreRules.FILTER_TO_CALC, CoreRules.PROJECT_TO_CALC, CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE, CoreRules.FILTER_INTO_JOIN, // 过滤谓词下推到Join之前EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE, EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE, EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE, EnumerableRules.ENUMERABLE_SORT_RULE, EnumerableRules.ENUMERABLE_CALC_RULE, EnumerableRules.ENUMERABLE_AGGREGATE_RULE);Program program &#61; Programs.of(RuleSets.ofList(rules));RelNode optimizerRelTree &#61; program.run(planner, relNode.rel, relNode.rel.getTraitSet().plus(EnumerableConvention.INSTANCE), Collections.emptyList(), Collections.emptyList());System.out.println(optimizerRelTree);
// 生成物理执行计划EnumerableRel enumerable &#61; (EnumerableRel) optimizerRelTree;Map<String, Object> internalParameters &#61; new LinkedHashMap<>();EnumerableRel.Prefer prefer &#61; EnumerableRel.Prefer.ARRAY;Bindable bindable &#61; EnumerableInterpretable.toBindable(internalParameters, null, enumerable, prefer);final Properties properties &#61; new Properties();properties.put("caseSensitive", "true");Connection connection &#61; DriverManager.getConnection("jdbc:calcite:", properties);CalciteConnection calciteConnection &#61; connection.unwrap(CalciteConnection.class);DataContext dataContext &#61; DataContexts.of(calciteConnection, schemaPlus);Enumerable bind &#61; bindable.bind(dataContext);Enumerator enumerator &#61; bind.enumerator();while (enumerator.moveNext()) {Object current &#61; enumerator.current();Object[] values &#61; (Object[]) current;StringBuilder sb &#61; new StringBuilder();for (Object v : values) {sb.append(v).append(",");}sb.setLength(sb.length() - 1);System.out.println(sb);}
public class CsvSchema extends AbstractSchema {&#64;Overrideprotected Map<String, Table> getTableMap() {return createTableMap();}private Map<String, Table> createTableMap() {HashMap<String, Table> hashMap &#61; new HashMap<>();File file &#61; new File(this.getClass().getClassLoader().getResource("sales/DEPTS.csv").getPath());Source sourceDepts &#61; Sources.of(file);hashMap.put("DEPTS", new CsvScannableTable(sourceDepts, null));return hashMap;}
}
public class CsvScannableTable extends CsvTableimplements ScannableTable {CsvScannableTable(Source source, RelProtoDataType protoRowType) {super(source, protoRowType);}&#64;Override public String toString() {return "CsvScannableTable";}&#64;Override public Enumerable< Object[]> scan(DataContext root) {JavaTypeFactory typeFactory &#61; root.getTypeFactory();final List<RelDataType> fieldTypes &#61; getFieldTypes(typeFactory);final List<Integer> fields &#61; ImmutableIntList.identity(fieldTypes.size());return new AbstractEnumerable<&#64;Nullable Object[]>() {&#64;Overridepublic Enumerator<&#64;Nullable Object[]> enumerator() {try {return new DemoCsvEnumerator(source, fieldTypes, fields);} catch (IOException e) {throw new RuntimeException(e);}}};}
}
public class DemoCsvEnumerator<E> implements Enumerator<E> {private final CSVReader reader;private final DemoCsvEnumerator.RowConverter<E> rowConverter;private final &#64;Nullable List<&#64;Nullable String> filterValues;private &#64;Nullable E current;public DemoCsvEnumerator(Source source,List<RelDataType> fieldTypes, List<Integer> fields) throws IOException {//noinspection uncheckedthis(source, null,(DemoCsvEnumerator.RowConverter<E>) converter(fieldTypes, fields));}private static DemoCsvEnumerator.RowConverter<?> converter(List<RelDataType> fieldTypes,List<Integer> fields) {return new DemoCsvEnumerator.ArrayRowConverter(fieldTypes, fields);}public DemoCsvEnumerator(Source source,&#64;Nullable String &#64;Nullable [] filterValues, RowConverter<E> rowConverter) throws IOException {this.rowConverter &#61; rowConverter;this.filterValues &#61; filterValues &#61;&#61; null ? null: ImmutableNullableList.copyOf(filterValues);this.reader &#61; new CSVReader(source.reader());this.reader.readNext(); // skip header row}abstract static class RowConverter<E> {abstract E convertRow(&#64;Nullable String[] rows);&#64;SuppressWarnings("JavaUtilDate")protected &#64;Nullable Object convert(&#64;Nullable RelDataType fieldType, &#64;Nullable String string) {if (fieldType &#61;&#61; null || string &#61;&#61; null) {return string;}switch (fieldType.getSqlTypeName()) {case INTEGER:if (string.length() &#61;&#61; 0) {return null;}return Integer.parseInt(string);case VARCHAR:default:return string;}}}public static RelDataType deduceRowType(JavaTypeFactory typeFactory,Source source, &#64;Nullable List<RelDataType> fieldTypes, Boolean stream) {final List<RelDataType> types &#61; new ArrayList<>();final List<String> names &#61; new ArrayList<>();if (fieldTypes !&#61; null) {fieldTypes.add(toNullableRelDataType(typeFactory, SqlTypeName.INTEGER));fieldTypes.add(toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR));}types.add(toNullableRelDataType(typeFactory, 
SqlTypeName.INTEGER));types.add(toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR));names.add("DEPTNO");names.add("NAME");return typeFactory.createStructType(Pair.zip(names, types));}private static RelDataType toNullableRelDataType(JavaTypeFactory typeFactory,SqlTypeName sqlTypeName) {return typeFactory.createTypeWithNullability(typeFactory.createSqlType(sqlTypeName), true);}&#64;Overridepublic E current() {return castNonNull(current);}&#64;Overridepublic boolean moveNext() {try {final String[] strings &#61; reader.readNext();if (strings &#61;&#61; null) {current &#61; null;reader.close();return false;}current &#61; rowConverter.convertRow(strings);return true;} catch (IOException e) {throw new RuntimeException(e);}}static class ArrayRowConverter extends RowConverter<&#64;Nullable Object[]> {/*** Field types. List must not be null, but any element may be null.*/private final List<RelDataType> fieldTypes;private final ImmutableIntList fields;ArrayRowConverter(List<RelDataType> fieldTypes, List<Integer> fields) {this.fieldTypes &#61; ImmutableNullableList.copyOf(fieldTypes);this.fields &#61; ImmutableIntList.copyOf(fields);}&#64;Overridepublic &#64;Nullable Object[] convertRow(&#64;Nullable String[] strings) {return convertNormalRow(strings);}public &#64;Nullable Object[] convertNormalRow(&#64;Nullable String[] strings) {final &#64;Nullable Object[] objects &#61; new Object[fields.size()];for (int i &#61; 0; i < fields.size(); i&#43;&#43;) {int field &#61; fields.get(i);objects[i] &#61; convert(fieldTypes.get(field), strings[field]);}return objects;}}&#64;Overridepublic void reset() {throw new UnsupportedOperationException();}&#64;Overridepublic void close() {try {reader.close();} catch (IOException e) {throw new RuntimeException("Error closing CSV reader", e);}}
}
<!-- Maven dependencies: Calcite core, the Calcite file (CSV) adapter, and opencsv. -->
<dependencies>
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <version>1.30.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-file</artifactId>
        <version>1.30.0</version>
    </dependency>
    <dependency>
        <groupId>net.sf.opencsv</groupId>
        <artifactId>opencsv</artifactId>
        <version>2.3</version>
    </dependency>
</dependencies>
数据格式
DEPTNO:int,NAME:string
10,"Sales"
20,"Marketing"
30,"Accounts"
结果