热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Flink流处理的动态实时(二)

在基于Flink流处理的动态实时(一)的基础上我写了centos7中安装zookpeer和kafka(单机)的原因是因为不确定公司到底是想用kafka还是rockermq,所以在还

在基于Flink流处理的动态实时(一)的基础上 我写了centos7中安装zookpeer和kafka(单机)的原因是因为不确定公司到底是想用kafka还是rockermq,所以在还没有正式的开发的时候去玩玩kafka。之后决定用rockermq了,所以我接下来写的技术flink和rockermq的一个整合。

首先要了解一下rockermq,初学的话可以看一下初试RocketMQ消息中间件丶一个站在Java后端设计之路的男青年个人博客网站文章,并安装RocketMQ 插件方便省事。

依次运行mqnamesrv.cmd脚本和mqbroker.cmd脚本,因为会出现中文乱码的缘故,我们在rocketmq-console项目的配置文件中添加和配置端口号

server.port=9875
server.tomcat.uri-encoding=UTF-8

并且将namesrvAddr修改成自己的ip地址

rocketmq.config.namesrvAddr=xxx.xxx.xx.xx:9876

然后运行它(随便怎么搞,反正启动就行了。在idea中也行,大jar启动也行)

之后在你游览器输入http://localhost:9875/#/ 可以选择中文还是英文。

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》

在然后在你的项目中加入mq的配置

spring.rocketmq.nameServer=xx.xxx.xx.xxx:9876

在创建一个消费者的一个类,测试是否能发送成功

public class TestDome {
/**
* 消费者如需订阅某Topic下特定类型的消息!
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main(String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer cOnsumer= new DefaultMQPushConsumer("bk_group");//消费者群组
consumer.setNamesrvAddr ("xx.xxx.xx.xx:9876");//mq的ip地址和端口
consumer.setInstanceName(UUID.randomUUID().toString());
// 请明确标明Tag:只关注自己需要的!
consumer.subscribe("test", "*");
consumer. registerMessageListener (new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs ,
ConsumeConcurrentlyContext context) {
for (MessageExt messageExt:msgs) {
String msg = new String(messageExt.getBody()); //从mq中取得消息
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msg);
}
MessageExt msg = msgs.get (0);
/*** 对topic tag验证:只关注特定Pay*/
if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) {
System.out.print("特定类型:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 上面的消费者群组,你也可以换的。
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 这是我的主题,你也可以修改的。

修改之后启动他,发送信息

《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 启动你的main方法之后提交信息,先提交启动都可以,无所谓
《基于Flink流处理的动态实时(二)》
《基于Flink流处理的动态实时(二)》 成功消费,最上面一条是启动直接生产的。

成功搞定,等待下一次flink和rockermq的一个整合,写的不一定特别对,毕竟我也是第一次接触rockermq和flink,如果觉得我写的不对的地方,欢迎留言,互相学习学习。


推荐阅读
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • Linux服务器密码过期策略、登录次数限制、私钥登录等配置方法
    本文介绍了在Linux服务器上进行密码过期策略、登录次数限制、私钥登录等配置的方法。通过修改配置文件中的参数,可以设置密码的有效期、最小间隔时间、最小长度,并在密码过期前进行提示。同时还介绍了如何进行公钥登录和修改默认账户用户名的操作。详细步骤和注意事项可参考本文内容。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了在CentOS上安装Python2.7.2的详细步骤,包括下载、解压、编译和安装等操作。同时提供了一些注意事项,以及测试安装是否成功的方法。 ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
  • iOS超签签名服务器搭建及其优劣势
    本文介绍了搭建iOS超签签名服务器的原因和优势,包括不掉签、用户可以直接安装不需要信任、体验好等。同时也提到了超签的劣势,即一个证书只能安装100个,成本较高。文章还详细介绍了超签的实现原理,包括用户请求服务器安装mobileconfig文件、服务器调用苹果接口添加udid等步骤。最后,还提到了生成mobileconfig文件和导出AppleWorldwideDeveloperRelationsCertificationAuthority证书的方法。 ... [详细]
  • SpringMVC接收请求参数的方式总结
    本文总结了在SpringMVC开发中处理控制器参数的各种方式,包括处理使用@RequestParam注解的参数、MultipartFile类型参数和Simple类型参数的RequestParamMethodArgumentResolver,处理@RequestBody注解的参数的RequestResponseBodyMethodProcessor,以及PathVariableMapMethodArgumentResol等子类。 ... [详细]
  • centos安装Mysql的方法及步骤详解
    本文介绍了centos安装Mysql的两种方式:rpm方式和绿色方式安装,详细介绍了安装所需的软件包以及安装过程中的注意事项,包括检查是否安装成功的方法。通过本文,读者可以了解到在centos系统上如何正确安装Mysql。 ... [详细]
  • 本文详细解析了JavaScript中相称性推断的知识点,包括严厉相称和宽松相称的区别,以及范例转换的规则。针对不同类型的范例值,如差别范例值、统一类的原始范例值和统一类的复合范例值,都给出了具体的比较方法。对于宽松相称的情况,也解释了原始范例值和对象之间的比较规则。通过本文的学习,读者可以更好地理解JavaScript中相称性推断的概念和应用。 ... [详细]
  • Nginx使用(server参数配置)
    本文介绍了Nginx的使用,重点讲解了server参数配置,包括端口号、主机名、根目录等内容。同时,还介绍了Nginx的反向代理功能。 ... [详细]
  • 本文介绍了高校天文共享平台的开发过程中的思考和规划。该平台旨在为高校学生提供天象预报、科普知识、观测活动、图片分享等功能。文章分析了项目的技术栈选择、网站前端布局、业务流程、数据库结构等方面,并总结了项目存在的问题,如前后端未分离、代码混乱等。作者表示希望通过记录和规划,能够理清思路,进一步完善该平台。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • JS实现一键分享功能
    本文介绍了如何使用JS实现一键分享功能,并提供了2019独角兽企业招聘Python工程师的标准。同时,给出了分享到QQ空间、新浪微博和人人网的链接。 ... [详细]
author-avatar
Yyao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有