热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FlinkSink_将结果输出到Kafka_Redis_ES_Mysql中

Sink将计算好的结果输出到外部系统,调用addSink()传入指定的SinkFunction():将结果输出到Kafka中、将结果输出到Redis中、将结果输出到ES中、将结果输出到Mysql中

Sink

将计算好结果输出到外部系统, 调用 addSink()传入指定的SinkFunction()


  1. 将结果输出到 Kafka 中
  2. 将结果输出到 Redis 中
  3. 将结果输出到 ES 中
  4. 将结果输出到 Mysql 中: 事先创建好表结构

pom.xml 事先导入对应的 connector:

- org.apache.flink : flink-java : 1.10.1
- org.apache.flink : flink-streaming-java_2.12 : 1.10.1
- org.apache.flink : flink-connector-kafka-0.11_2.12 : 1.10.1
- org.apache.flink : flink-connector-rabbitmq_2.12 : 1.10.1
- org.apache.bahir : flink-connector-redis_2.11 : 1.0
- org.apache.flink : flink-connector-elasticsearch6_2.12 : 1.10.1
- mysql : mysql-connector-java : 5.1.44

实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;/*** @author regotto*/
public class SinkTest {private static void saveToRedis(DataStream<SensorReading> dataStream) {FlinkJedisPoolConfig.Builder builder &#61; new FlinkJedisPoolConfig.Builder();builder.setHost("localhost");// 顶级接口 SinkFunction, 核心方法 invokedataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {/*** 将温度数据保存为 id-temperature hash 形式到 redis* &#64;return*/&#64;Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "sensor");}&#64;Overridepublic String getKeyFromData(SensorReading sensorReading) {return sensorReading.getId();}&#64;Overridepublic String getValueFromData(SensorReading sensorReading) {return sensorReading.getTemperature().toString();}}));}private static void saveToKafka(DataStream<SensorReading> dataStream) {// 将数据输出到 Kafka 中dataStream.map((MapFunction<SensorReading, String>) value -> value.toString()).addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));}private static void saveToEs(DataStream<SensorReading> dataStream) {// 将数据输出到 ElasticSearchArrayList<HttpHost> httpHosts &#61; new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200));//真正的 SinkFunction 是 ElasticsearchSink(使用构建者构建), ElasticsearchSinkFunction 只是负责处理以哪种方式存入dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {HashMap<String, String> source &#61; new HashMap<>();source.put("id", sensorReading.getId());source.put("temp", sensorReading.getTemperature().toString());source.put("time", sensorReading.getTimestamp().toString());IndexRequest indexRequest &#61; Requests.indexRequest().index("sensor").type("readingData").source(source);requestIndexer.add(indexRequest);}).build());}private static void saveToMysql(DataStream<SensorReading> dataStream) {/*由于性能问题, 官方未提供 mysqlSink, 将数据存入 mysql, 自定义 sinkjdbc 要连接处理, 使用 RichSinkFunction, 利用 open, close 
方法*/dataStream.addSink(new RichSinkFunction<SensorReading>() {Connection connection &#61; null;PreparedStatement insertStatement &#61; null;&#64;Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.jdbc.Driver");connection &#61; DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");insertStatement &#61; connection.prepareStatement("insert into sensorreading (id, timestamp, temperature)values(?,?,?)");}&#64;Overridepublic void invoke(SensorReading value, Context context) throws Exception {insertStatement.setString(1, value.getId());insertStatement.setLong(2, value.getTimestamp());insertStatement.setDouble(3, value.getTemperature());insertStatement.execute();}&#64;Overridepublic void close() throws Exception {insertStatement.close();connection.close();}});}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> input &#61; env.readTextFile("sensor.txt");DataStream<SensorReading> dataStream &#61; input.map((MapFunction<String, SensorReading>) value -> {String[] split &#61; value.split(",");return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));});saveToMysql(dataStream);env.execute();}
}

总结

进行数据存储时, 指定对应 SinkFunction 即可.


推荐阅读
author-avatar
66顺主管386711
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有