<dependencies>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.14-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
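The ${hudi.version} and ${flink.version} placeholders above are expected to be defined in the POM's <properties> block; a minimal sketch (the exact versions are assumptions: any Hudi release that ships hudi-flink1.14-bundle, e.g. 0.12.x, paired with a Flink 1.14.x release, should work):

<properties>
    <!-- Assumed versions; adjust to your environment -->
    <hudi.version>0.12.1</hudi.version>
    <flink.version>1.14.6</flink.version>
</properties>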
Consuming Kafka and continuously writing to Hudi with the Flink DataStream API
package com.liangji.hudi0121.flink114;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class JavaTestInsert {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String targetTable = "t1";
String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";
String checkpointDir= "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk";
ParameterTool parameterTool = ParameterTool.fromArgs(args);
// Read job parameters (source topic, consumer group, and its password) from the command line
String sourceTopic = parameterTool.get("sourceTopic","odeon_test_liangji_114_test");
String cg = parameterTool.get("cg","CG_odeon_test_liangji_114_test");
String cgpwd = parameterTool.get("cgPwd","252287");
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
env.setStateBackend(backend);
env.enableCheckpointing(60000);
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointStorage(checkpointDir);
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
config.setCheckpointTimeout(60000);
Properties prop = new Properties();
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.31.162.72:9093,172.31.162.73:9093,172.31.162.74:9093");
prop.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
prop.setProperty(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,cg);
prop.setProperty(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\""+cg+"\"\n" +"password=\""+cgpwd+"\";");
prop.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
prop.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
prop.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
RowType rowType = createJsonRowType();
KafkaSource<RowData> source = KafkaSource.<RowData>builder()
.setBootstrapServers("bootstrap server")
.setTopics(sourceTopic)
.setGroupId(cg)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new JsonRowDataDeserializationSchema(rowType, InternalTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601))
.setProperties(prop)
.build();
DataStream<RowData> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka_source");
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("uuid VARCHAR(200)")
.column("name VARCHAR(100)")
.column("age INT")
.column("ts TIMESTAMP(0)")
.column("`partition` VARCHAR(20)")
.pk("uuid")
.partition("partition")
.options(options);
builder.sink(dataStream, false); // the second argument indicates whether the input data stream is bounded
env.execute("Api_Sink");
}
private static RowType createJsonRowType() {
return (RowType) DataTypes.ROW(
DataTypes.FIELD("uuid", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(0)),
DataTypes.FIELD("partition", DataTypes.STRING())
).getLogicalType();
}
}
A Kafka producer that sends sample messages (package com.liangji.kafka):
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @author liangji
* @Description TODO
* @Date 2021/5/7 19:17
*/
public class KafkaProducerFrom1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap server");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM,"SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG,"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"PG_odeon_test_liangji_114_test\" password=\"889448\";");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
while (true) {
String uuid = UUID.randomUUID().toString().replace("-", "").toLowerCase();
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("odeon_test_liangji_114_test","k", "{" +
"\"uuid\":" + "\"" + uuid + "\"," +
"\"name\":\"liangji-" + uuid + "\"," +
"\"age\":\"18\"" + "," +
"\"ts\":\"2022-11-17T14:00:00\"" + "," +
"\"partition\":\"2022-11-17\"" +
"}");
Future<RecordMetadata> future = producer.send(producerRecord);
future.get();
System.out.println("success");
Thread.sleep(1000);
}
}
}
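Rendered, each message body is a JSON document like the following (the uuid is random per record; note that age is sent as a string, which Flink's JSON deserializer coerces to INT):

{"uuid":"...","name":"liangji-...","age":"18","ts":"2022-11-17T14:00:00","partition":"2022-11-17"}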
Continuously reading from Hudi with the Flink DataStream API (package com.liangji.hudi0121.flink114):
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.util.HoodiePipeline;
import java.util.HashMap;
import java.util.Map;
public class JavaTestQuery {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String targetTable = "t1";
String basePath = "/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/t1";
String startCommit = "20221117164814122";
if (StringUtils.isEmpty(startCommit)) {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(basePath).build();
startCommit = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants().firstInstant()
.map(HoodieInstant::getTimestamp).orElse(null);
}
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
options.put(FlinkOptions.READ_AS_STREAMING.key(), "true");
options.put(FlinkOptions.READ_START_COMMIT.key(), startCommit);
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("uuid VARCHAR(200)")
.column("name VARCHAR(100)")
.column("age INT")
.column("ts TIMESTAMP(0)")
.column("`partition` VARCHAR(20)")
.pk("uuid")
.partition("partition")
.options(options);
DataStream<RowData> rowDataDataStream = builder.source(env);
rowDataDataStream.print();
env.execute("Api_Source");
}
}
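JavaTestQuery above tails the table as an unbounded changelog starting from startCommit. For a one-off bounded snapshot read, the streaming options can simply be dropped; a minimal sketch of the option map (FlinkOptions.READ_AS_STREAMING defaults to false):

// Bounded snapshot read: omit READ_AS_STREAMING / READ_START_COMMIT
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), basePath);
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
DataStream<RowData> snapshot = builder.options(options).source(env); // reads the latest snapshot, then finishes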
Hudi with Flink SQL & Hudi on metastore, in Scala

POM dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.14-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-math3</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
DDL: create the table with Flink SQL, with the Hudi table metadata managed in the Hive metastore (package com.liangji.hudi0121.flink114.sql.ddl):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import java.net.URL
object CreateTable {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hueuser")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val createCatalogSql =
s"""
|CREATE CATALOG hudi_catalog WITH (
| 'type' = 'hudi',
| 'mode' = 'hms',
| 'default-database' = 'default',
| 'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
|)
|""".stripMargin
val createDbSql = """create database if not exists hudi_catalog.ccr_ai_upgrade""".stripMargin
val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin
val createTbSql =
"""
|CREATE TABLE if not exists test_hudi_flink_mor (
| uuid VARCHAR(200) PRIMARY KEY NOT ENFORCED,
| name VARCHAR(100),
| age INT,
| ts TIMESTAMP(0),
| `partition` VARCHAR(20)
|)
|PARTITIONED BY (`partition`)
|WITH (
| 'connector' = 'hudi',
| 'path' = '/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/test_hudi_flink_mor',
| 'table.type' = 'MERGE_ON_READ',
| 'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
| 'hoodie.datasource.write.recordkey.field' = 'uuid',
| 'hoodie.datasource.write.hive_style_partitioning' = 'true'
|)
|""".stripMargin
tableEnv.executeSql(createCatalogSql)
tableEnv.executeSql(createDbSql)
tableEnv.executeSql(changeDbSql)
tableEnv.executeSql(createTbSql).print()
}
}
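The catalog's 'hive.conf.dir' must point at a directory containing a hive-site.xml that can reach the metastore; a minimal sketch (the thrift URI below is an assumption, replace it with your environment's metastore address):

<configuration>
    <!-- Assumed metastore address for illustration -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://metastore-host:9083</value>
    </property>
</configuration>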
Writing to the Hudi table with Flink SQL, here in batch mode (package com.liangji.hudi0121.flink114.sql.write):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object BatchWrite {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hueuser")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val createCatalogSql =
s"""
|CREATE CATALOG hudi_catalog WITH (
| 'type' = 'hudi',
| 'mode' = 'hms',
| 'default-database' = 'default',
| 'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
|)
|""".stripMargin
val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin
tableEnv.executeSql(createCatalogSql)
tableEnv.executeSql(changeDbSql)
tableEnv.executeSql("insert into test_hudi_flink_mor values ('b','liangji-1',19,TIMESTAMP '2022-11-18 18:00:00','2022-11-18')").print()
}
}

Querying the Hudi table with Flink SQL, here in batch mode (package com.liangji.hudi0121.flink114.sql.query):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object BatchQuery {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hueuser")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val createCatalogSql =
s"""
|CREATE CATALOG hudi_catalog WITH (
| 'type' = 'hudi',
| 'mode' = 'hms',
| 'default-database' = 'default',
| 'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
|)
|""".stripMargin
val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin
tableEnv.executeSql(createCatalogSql)
tableEnv.executeSql(changeDbSql)
tableEnv.executeSql("select * from test_hudi_flink_mor").print()
}
}
Consuming a Hudi table with Flink SQL and streaming the results into another Hudi table (package com.liangji.hudi0121.flink114.sql.streaming.write):
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object StreamingWrite {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hueuser")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val checkpointDir = "hdfs://ns-tmp/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/chk"
val backend = new EmbeddedRocksDBStateBackend(true)
env.setStateBackend(backend)
env.enableCheckpointing(60000)
val config = env.getCheckpointConfig
config.setCheckpointStorage(checkpointDir)
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
config.setCheckpointTimeout(60000)
val createCatalogSql =
s"""
|CREATE CATALOG hudi_catalog WITH (
| 'type' = 'hudi',
| 'mode' = 'hms',
| 'default-database' = 'default',
| 'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
|)
|""".stripMargin
val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin
tableEnv.executeSql(createCatalogSql)
tableEnv.executeSql(changeDbSql)
val table = tableEnv.sqlQuery("select * from test_hudi_flink_mor/*+ OPTIONS('read.streaming.enabled'='true')*/")
// table=>datastream
val stream = tableEnv.toDataStream(table)
// apply any transformations here
// datastream=>table
val inputTable = tableEnv.fromDataStream(stream)
// create temp view
tableEnv.createTemporaryView("InputTable", inputTable)
tableEnv.executeSql("insert into test_hudi_flink_mor_streaming select * from InputTable")
}
}
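The streaming job above inserts into test_hudi_flink_mor_streaming, which is never created in this post; it is assumed to already exist with the same schema as the source table, e.g. via a DDL like the following sketch (the path is an assumption):

CREATE TABLE if not exists test_hudi_flink_mor_streaming (
  uuid VARCHAR(200) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(100),
  age INT,
  ts TIMESTAMP(0),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '/project/ccr_ai_upgrade/ccr_ai_upgrade/aiui/flink/demo/test_hudi_flink_mor_streaming',
  'table.type' = 'MERGE_ON_READ'
)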
Reading a Hudi table with Flink SQL in streaming mode (package com.liangji.hudi0121.flink114.sql.streaming.query). Note: the SQL must carry the hint 'read.streaming.enabled'='true'.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object StreamingQuery {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME","hueuser")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val createCatalogSql =
s"""
|CREATE CATALOG hudi_catalog WITH (
| 'type' = 'hudi',
| 'mode' = 'hms',
| 'default-database' = 'default',
| 'hive.conf.dir' = 'E:\\mine\\hudi0121-demo-maven\\flink114-scala\\src\\main\\resources'
|)
|""".stripMargin
val changeDbSql = """use hudi_catalog.ccr_ai_upgrade""".stripMargin
tableEnv.executeSql(createCatalogSql)
tableEnv.executeSql(changeDbSql)
tableEnv.executeSql("select * from test_hudi_flink_mor/*+ OPTIONS('read.streaming.enabled'='true')*/").print()
}
}