个人感觉还不是很完善,很多都是针对python去做的案例,但是值得期待
1、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>FlinkSql</artifactId>
        <groupId>org.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>FlinkML</artifactId>
    <properties>
        <flink12.version>1.12.1</flink12.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink-cdc.version>1.2.0</flink-cdc.version>
        <hive.version>1.1.0</hive.version>
        <alink.version>1.4.0</alink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.12_2.11</artifactId>
            <version>${alink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.6.5-7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc-nohive_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-1.4_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>19.0</version>
        </dependency>
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>${flink12.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-format-changelog-json</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                    <addScalacArgs>-target:jvm-1.8</addScalacArgs>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>MySqlBinlogSourceExample</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugins>
    </build>
</project>
2、代码
package RegressionPredict;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.regression.GbdtRegPredictBatchOp;
import com.alibaba.alink.operator.batch.regression.GbdtRegTrainBatchOp;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.regression.GbdtRegPredictStreamOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
* @program: FlinkSql
* @description:
* @author: yang
* @create: 2021-06-23 16:24
*/
public class GbdtRegPredictBatchOpTest {
public static void main(String[] args) throws Exception {
List
Row.of(1.0, "A", 0, 0, 0),
Row.of(2.0, "B", 1, 1, 0),
Row.of(3.0, "C", 2, 2, 1),
Row.of(4.0, "D", 3, 3, 1)
);
//方式一:如果不指定数据类型,则程序根据值去猜数据类型,
// BatchOperator > batchSource = new MemSourceBatchOp(df, new String[] {"f0", "f1", "f2", "f3", "label"});
// StreamOperator > streamSource = new MemSourceStreamOp(df, new String[] {"f0", "f1", "f2", "f3", "label"});
//方式二
BatchOperator > batchSource = new MemSourceBatchOp(df, new TableSchema("f0,f1,f2,f3,label".split(","),
new TypeInformation[] {Types.DOUBLE, Types.STRING,Types.INT,Types.INT,Types.INT}));
StreamOperator > streamSource = new MemSourceStreamOp(df,new TableSchema("f0,f1,f2,f3,label".split(","),
new TypeInformation[] {Types.DOUBLE, Types.STRING,Types.INT,Types.INT,Types.INT}));
BatchOperator > trainOp = new GbdtRegTrainBatchOp()
.setLearningRate(1.0)
.setNumTrees(3)
.setMinSamplesPerLeaf(1)
.setLabelCol("label")
.setFeatureCols("f0", "f1", "f2", "f3")
.linkFrom(batchSource);
BatchOperator > predictBatchOp = new GbdtRegPredictBatchOp()
.setPredictionCol("pred");
StreamOperator > predictStreamOp = new GbdtRegPredictStreamOp(trainOp)
.setPredictionCol("pred");
System.out.println(">>>>>>>>>批量>>>>>>>>>>");
predictBatchOp.linkFrom(trainOp, batchSource).print();
System.out.println(">>>>>>>>>流式>>>>>>>>>>");
predictStreamOp.linkFrom(streamSource).print();
StreamOperator.execute();
}
}