作者:Yyao | 来源:互联网 | 2023-08-25 02:15
在基于Flink流处理的动态实时(一)的基础上我写了centos7中安装zookpeer和kafka(单机)的原因是因为不确定公司到底是想用kafka还是rockermq,所以在还
在基于Flink流处理的动态实时(一)的基础上 我写了centos7中安装zookpeer和kafka(单机)的原因是因为不确定公司到底是想用kafka还是rockermq,所以在还没有正式的开发的时候去玩玩kafka。之后决定用rockermq了,所以我接下来写的技术flink和rockermq的一个整合。
首先要了解一下rockermq,初学的话可以看一下初试RocketMQ消息中间件丶一个站在Java后端设计之路的男青年个人博客网站文章,并安装RocketMQ 插件方便省事。
依次运行mqnamesrv.cmd脚本和mqbroker.cmd脚本,因为会出现中文乱码的缘故,我们在rocketmq-console项目的配置文件中添加和配置端口号
server.port=9875
server.tomcat.uri-encoding=UTF-8
并且将namesrvAddr修改成自己的ip地址
rocketmq.config.namesrvAddr=xxx.xxx.xx.xx:9876
然后运行它(随便怎么搞,反正启动就行了。在idea中也行,大jar启动也行)
之后在你游览器输入http://localhost:9875/#/ 可以选择中文还是英文。
在然后在你的项目中加入mq的配置
spring.rocketmq.nameServer=xx.xxx.xx.xxx:9876
在创建一个消费者的一个类,测试是否能发送成功
public class TestDome {
/**
* 消费者如需订阅某Topic下特定类型的消息!
* @param args
* @throws InterruptedException
* @throws MQClientException
*/
public static void main(String [] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer cOnsumer= new DefaultMQPushConsumer("bk_group");//消费者群组
consumer.setNamesrvAddr ("xx.xxx.xx.xx:9876");//mq的ip地址和端口
consumer.setInstanceName(UUID.randomUUID().toString());
// 请明确标明Tag:只关注自己需要的!
consumer.subscribe("test", "*");
consumer. registerMessageListener (new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs ,
ConsumeConcurrentlyContext context) {
for (MessageExt messageExt:msgs) {
String msg = new String(messageExt.getBody()); //从mq中取得消息
System.out.println(Thread.currentThread().getName() + " 接收消息: " + msg);
}
MessageExt msg = msgs.get (0);
/*** 对topic tag验证:只关注特定Pay*/
if (msg. getTopic (). equals("TopicModel") && msg. getTags (). equals("Pay")) {
System.out.print("特定类型:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
上面的消费者群组,你也可以换的。
这是我的主题,你也可以修改的。
修改之后启动他,发送信息
启动你的main方法之后提交信息,先提交启动都可以,无所谓
成功消费,最上面一条是启动直接生产的。
成功搞定,等待下一次flink和rockermq的一个整合,写的不一定特别对,毕竟我也是第一次接触rockermq和flink,如果觉得我写的不对的地方,欢迎留言,互相学习学习。