热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink集成MySQL数据源_Flink配置Kafka数据源

简介正文1.FlinkKafkaConsumer010flink中已经预置了kafka相关的数据源实现FlinkKafkaConsumer010,先看下具体的实现&#

简介

正文

1. FlinkKafkaConsumer010

flink中已经预置了kafka相关的数据源实现FlinkKafkaConsumer010,先看下具体的实现:

@PublicEvolving

public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 {

private static final long serialVersionUID = 2324564345203409112L;

public FlinkKafkaConsumer010(String topic, DeserializationSchema valueDeserializer, Properties props) {

this(Collections.singletonList(topic), valueDeserializer, props);

}

public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema deserializer, Properties props) {

this(Collections.singletonList(topic), deserializer, props);

}

public FlinkKafkaConsumer010(List topics, DeserializationSchema deserializer, Properties props) {

this((List)topics, (KeyedDeserializationSchema)(new KeyedDeserializationSchemaWrapper(deserializer)), props);

}

public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema deserializer, Properties props) {

super(topics, deserializer, props);

}

@PublicEvolving

public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) {

this((Pattern)subscriptionPattern, (KeyedDeserializationSchema)(new KeyedDeserializationSchemaWrapper(valueDeserializer)), props);

}

@PublicEvolving

public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) {

super(subscriptionPattern, deserializer, props);

}

......

}

kafka的Consumer有一堆实现,不过最终都是继承自FlinkKafkaConsumerBase,而这个抽象类则是继承RichParallelSourceFunction,是不是很眼熟,跟自定义mysql数据源继承的抽象类RichSourceFunction很类似。

public abstract class FlinkKafkaConsumerBase

extends RichParallelSourceFunction

implements CheckpointListener, ResultTypeQueryable, CheckpointedFunction

可以看到,这里有很多构造函数,我们直接使用即可。

1.1 代码使用

package myflink.job;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/**

* kafka作为数据源,消费kafka中的消息

* 教程详见

* @See http://www.54tianzhisheng.cn/tags/Flink/

*/

public class KafkaDatasouceForFlinkJob {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();

properties.put("bootstrap.servers","localhost:9092");

properties.put("zookeeper.connect","localhost:2181");

properties.put("group.id","metric-group");

properties.put("auto.offset.reset","latest");

properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

DataStreamSource dataStreamSource = env.addSource(

new FlinkKafkaConsumer010(

"testjin" ,// topic

new SimpleStringSchema(),

properties

)

).setParallelism(1);

// dataStreamSource.print();

// 同样效果

dataStreamSource.addSink(new PrintSinkFunction<>());

env.execute("Flink add kafka data source");

}

}

说明&#xff1a;

a、这里直接使用properties对象来设置kafka相关配置&#xff0c;比如brokers、zk、groupId、序列化、反序列化等。

b、使用FlinkKafkaConsumer010构造函数&#xff0c;指定topic、properties配置

c、SimpleStringSchema仅针对String类型数据的序列化及反序列化&#xff0c;如果kafka中消息的内容不是String&#xff0c;则会报错&#xff1b;看下SimpleStringSchema的定义&#xff1a;

public class SimpleStringSchema implements DeserializationSchema, SerializationSchema

d、这里直接把获取到的消息打印出来。



推荐阅读
  • 本文详细介绍了如何使用MySQL来显示SQL语句的执行时间,并通过MySQL Query Profiler获取CPU和内存使用量以及系统锁和表锁的时间。同时介绍了效能分析的三种方法:瓶颈分析、工作负载分析和基于比率的分析。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • 本文由编程笔记#小编为大家整理,主要介绍了logistic回归(线性和非线性)相关的知识,包括线性logistic回归的代码和数据集的分布情况。希望对你有一定的参考价值。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文讨论了在Windows 8上安装gvim中插件时出现的错误加载问题。作者将EasyMotion插件放在了正确的位置,但加载时却出现了错误。作者提供了下载链接和之前放置插件的位置,并列出了出现的错误信息。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 不同优化算法的比较分析及实验验证
    本文介绍了神经网络优化中常用的优化方法,包括学习率调整和梯度估计修正,并通过实验验证了不同优化算法的效果。实验结果表明,Adam算法在综合考虑学习率调整和梯度估计修正方面表现较好。该研究对于优化神经网络的训练过程具有指导意义。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • Python SQLAlchemy库的使用方法详解
    本文详细介绍了Python中使用SQLAlchemy库的方法。首先对SQLAlchemy进行了简介,包括其定义、适用的数据库类型等。然后讨论了SQLAlchemy提供的两种主要使用模式,即SQL表达式语言和ORM。针对不同的需求,给出了选择哪种模式的建议。最后,介绍了连接数据库的方法,包括创建SQLAlchemy引擎和执行SQL语句的接口。 ... [详细]
  • 本文讨论了一个数列求和问题,该数列按照一定规律生成。通过观察数列的规律,我们可以得出求解该问题的算法。具体算法为计算前n项i*f[i]的和,其中f[i]表示数列中有i个数字。根据参考的思路,我们可以将算法的时间复杂度控制在O(n),即计算到5e5即可满足1e9的要求。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • 本文讨论了编写可保护的代码的重要性,包括提高代码的可读性、可调试性和直观性。同时介绍了优化代码的方法,如代码格式化、解释函数和提炼函数等。还提到了一些常见的坏代码味道,如不规范的命名、重复代码、过长的函数和参数列表等。最后,介绍了如何处理数据泥团和进行函数重构,以提高代码质量和可维护性。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
author-avatar
059586768803wsq
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有