热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringCloud终结篇之消息驱动stream大集合

创建子工程 stream-sample编写pom文件org.springframework.boot

创建子工程  stream-sample

编写pom文件



org.springframework.boot
spring-boot-starter-web


org.springframework.boot
spring-boot-starter-actuator


org.springframework.cloud
spring-cloud-starter-stream-rabbit

创建启动引导类  StreamApplication

 

@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
}

 

创建配置文件

spring.application.name=stream-sample
server.port=63003
# RabbitMQ连接字符串
spring.rabbitmq.host=192.168.0.201
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 绑定Channel到broadcast
spring.cloud.stream.bindings.myTopic-consumer.destination=broadcast
spring.cloud.stream.bindings.myTopic-producer.destination=broadcast
# 消息分组示例
spring.cloud.stream.bindings.group-consumer.destination=group-topic
spring.cloud.stream.bindings.group-producer.destination=group-topic
spring.cloud.stream.bindings.group-consumer.group=Group-A
## 消息分区配置
## 打开消费者的消费分区功能
spring.cloud.stream.bindings.group-consumer.consumer.partitioned=true
## 两个消息分区
spring.cloud.stream.bindings.group-producer.producer.partition-count=2
# SpEL (Key resolver) 可以定义复杂表达式生成Key
# 我们这里用最简化的配置,只有索引参数为1的节点(消费者),才能消费消息
spring.cloud.stream.bindings.group-producer.producer.partition-key-expression=1
# 当前消费者实例总数
spring.cloud.stream.instanceCount=2
# 最大值instanceCount-1,当前实例的索引号
spring.cloud.stream.instanceIndex=1
# 延迟消息配置
spring.cloud.stream.bindings.delayed-consumer.destination=delayed-topic
spring.cloud.stream.bindings.delayed-producer.destination=delayed-topic
spring.cloud.stream.rabbit.bindings.delayed-producer.producer.delayed-exchange=true
# 异常消息(单机版重试)
spring.cloud.stream.bindings.error-consumer.destination=error-out-topic
spring.cloud.stream.bindings.error-producer.destination=error-out-topic
# 重试次数(本机重试)
# 次数=1相当于不重试
spring.cloud.stream.bindings.error-consumer.consumer.max-attempts=2
# 异常消息(requeue重试)
spring.cloud.stream.bindings.requeue-consumer.destination=requeue-topic
spring.cloud.stream.bindings.requeue-producer.destination=requeue-topic
# 必须把max-attempts设置为1,否则requeue不能生效
spring.cloud.stream.bindings.requeue-consumer.consumer.max-attempts=1
spring.cloud.stream.bindings.requeue-consumer.group=requeue-group
# 仅对当前requeue-consumer,开启requeue
spring.cloud.stream.rabbit.bindings.requeue-consumer.consumer.requeueRejected=true
# 默认全局开启requeue
# spring.rabbitmq.listener.default-requeue-rejected=true
# 死信队列配置
spring.cloud.stream.bindings.dlq-consumer.destination=dlq-topic
spring.cloud.stream.bindings.dlq-producer.destination=dlq-topic
spring.cloud.stream.bindings.dlq-consumer.consumer.max-attempts=2
spring.cloud.stream.bindings.dlq-consumer.group=dlq-group
# 开启死信队列(默认 topic.dlq)
spring.cloud.stream.rabbit.bindings.dlq-consumer.consumer.auto-bind-dlq=true
# Fallback配置
spring.cloud.stream.bindings.fallback-consumer.destination=fallback-topic
spring.cloud.stream.bindings.fallback-producer.destination=fallback-topic
spring.cloud.stream.bindings.fallback-consumer.consumer.max-attempts=2
spring.cloud.stream.bindings.fallback-consumer.group=fallback-group
# input channel -> fallback-topic.fallback-group.errors
management.security.enabled=false
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always

 

 

创建 Topic

 

延时消息

public interface DelayedTopic {
String INPUT = "delayed-consumer";
String OUTPUT = "delayed-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

死信队列

public interface DlqTopic {
String INPUT = "dlq-consumer";
String OUTPUT = "dlq-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

 

异常消息

public interface ErrorTopic {
String INPUT = "error-consumer";
String OUTPUT = "error-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

fallback降级

public interface FallbackTopic {
String INPUT = "fallback-consumer";
String OUTPUT = "fallback-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

分区分组

public interface GroupTopic {
String INPUT = "group-consumer";
String OUTPUT = "group-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

重入队列

public interface RequeueTopic {
String INPUT = "requeue-consumer";
String OUTPUT = "requeue-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

我的 消息

public interface MyTopic {
String INPUT = "myTopic-consumer";
String OUTPUT = "myTopic-producer";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}

创建stream 流  消息消费者

@Slf4j
@EnableBinding(value = {
Sink.class,
MyTopic.class,
GroupTopic.class,
DelayedTopic.class,
ErrorTopic.class,
RequeueTopic.class,
DlqTopic.class,
FallbackTopic.class
}
)
public class StreamConsumer {
private AtomicInteger count = new AtomicInteger(1);
@StreamListener(Sink.INPUT)
public void consume(Object payload) {
log.info("message consumed successfully, payload={}", payload);
}
// 自定义消息广播
@StreamListener(MyTopic.INPUT)
public void consumeMyMessage(Object payload) {
log.info("My message consumed successfully, payload={}", payload);
}
// 消息分组 & 消费分区示例
@StreamListener(GroupTopic.INPUT)
public void consumeGroupMessage(Object payload) {
log.info("Group message consumed successfully, payload={}", payload);
}
// 延迟消息示例
@StreamListener(DelayedTopic.INPUT)
public void consumeDelayedMessage(MessageBean bean) {
log.info("Delayed message consumed successfully, payload={}", bean.getPayload());
}
// 异常重试(单机版)
@StreamListener(ErrorTopic.INPUT)
public void consumeErrorMessage(MessageBean bean) {
log.info("Are you OK?");
if (count.incrementAndGet() % 3 == 0) {
log.info("Fine, thank you. And you?");
count.set(0);
} else {
log.info("What's your problem?");
throw new RuntimeException("I'm not OK");
}
}
// 异常重试(联机版-重新入列)
@StreamListener(RequeueTopic.INPUT)
public void requeueErrorMessage(MessageBean bean) {
log.info("Are you OK?");
try {
Thread.sleep(3000L);
} catch (Exception e) {
}
// throw new RuntimeException("I'm not OK");
}
// 死信队列
@StreamListener(DlqTopic.INPUT)
public void consumeDlqMessage(MessageBean bean) {
log.info("Dlq - Are you OK?");
if (count.incrementAndGet() % 3 == 0) {
log.info("Dlq - Fine, thank you. And you?");
} else {
log.info("Dlq - What's your problem?");
throw new RuntimeException("I'm not OK");
}
}
// Fallback + 升级版本
@StreamListener(FallbackTopic.INPUT)
public void goodbyeBadGuy(MessageBean bean,
@Header("version") String version) {
log.info("Fallback - Are you OK?");
if ("1.0".equalsIgnoreCase(version)) {
log.info("Fallback - Fine, thank you. And you?");
} else if ("2.0".equalsIgnoreCase(version)) {
log.info("unsupported version");
throw new RuntimeException("I'm not OK");
} else {
log.info("Fallback - version={}", version);
}
}
// 降级流程
@ServiceActivator(inputChannel = "fallback-topic.fallback-group.errors")
public void fallback(Message message) {
log.info("fallback entered");
}
}

创建一个 messageBean

 

@Data
public class MessageBean {
private String payload;
}

最后一步 创建 Controller  

@RestController
@Slf4j
public class Controller {
@Autowired
private MyTopic producer;
@Autowired
private GroupTopic groupTopicProducer;
@Autowired
private DelayedTopic delayedTopicProducer;
@Autowired
private ErrorTopic errorTopicProducer;
@Autowired
private RequeueTopic requeueTopicProducer;
@Autowired
private DlqTopic dlqTopicProducer;
@Autowired
private FallbackTopic fallbackTopicProducer;
// 简单广播消息
@PostMapping("send")
public void sendMessage(@RequestParam(value = "body") String body) {
producer.output().send(MessageBuilder.withPayload(body).build());
}
// 消息分组和消息分区
@PostMapping("sendToGroup")
public void sendMessageToGroup(@RequestParam(value = "body") String body) {
groupTopicProducer.output().send(MessageBuilder.withPayload(body).build());
}
// 延迟消息
@PostMapping("sendDM")
public void sendDelayedMessage(
@RequestParam(value = "body") String body,
@RequestParam(value = "seconds") Integer seconds) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
log.info("ready to send delayed message");
delayedTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("x-delay", seconds * 1000)
.build());
}
// 异常重试(单机版)
@PostMapping("sendError")
public void sendErrorMessage(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
errorTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// 异常重试(联机版 - 重新入列)
@PostMapping("requeue")
public void sendErrorMessageToMQ(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
requeueTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// 死信队列测试
@PostMapping("dlq")
public void sendMessageToDlq(@RequestParam(value = "body") String body) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
dlqTopicProducer.output().send(MessageBuilder.withPayload(msg).build());
}
// fallback + 升版
@PostMapping("fallback")
public void sendMessageToFallback(
@RequestParam(value = "body") String body,
@RequestParam(value = "version", defaultValue = "1.0") String version) {
MessageBean msg = new MessageBean();
msg.setPayload(body);
fallbackTopicProducer.output().send(
MessageBuilder.withPayload(msg)
.setHeader("version", version)
.build());
}
}

 

 

 

附:

1.    下载插件
https://www.rabbitmq.com/community-plugins.html

找到rabbitmq_delayed_message_exchange
下载对应版本的插件,3.6和3.7版本插件不一样

2. 下载以后解压,copy到rabbitmq安装目录下的plugins文件夹

3.    安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4.    安装完一定要重启RabbitMQ,不是单单重启UI管理界面!
如果只是单单调用rabbitmqctl  stop_app然后再rabbitmqctl  start_app是没有作用的!
正确的步骤是先rabbitmqctl stop,然后再直接执行rabbitmq-server

如果以上步骤还能使延迟队列生效,在重启完之后,换一个新的topic名字就好了
 

本文地址:https://blog.csdn.net/weixin_38305866/article/details/109941990



推荐阅读
  • Spring Cloud Config 使用 Vault 作为配置存储
    本文探讨了如何在Spring Cloud Config中集成HashiCorp Vault作为配置存储解决方案,基于Spring Cloud Hoxton.RELEASE及Spring Boot 2.2.1.RELEASE版本。文章还提供了详细的配置示例和实践建议。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 简化报表生成:EasyReport工具的全面解析
    本文详细介绍了EasyReport,一个易于使用的开源Web报表工具。该工具支持Hadoop、HBase及多种关系型数据库,能够将SQL查询结果转换为HTML表格,并提供Excel导出、图表显示和表头冻结等功能。 ... [详细]
  • 本文将详细介绍通过CAS(Central Authentication Service)实现单点登录的原理和步骤。CAS由耶鲁大学开发,旨在为多应用系统提供统一的身份认证服务。文中不仅涵盖了CAS的基本架构,还提供了具体的配置实例,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 目录一、salt-job管理#job存放数据目录#缓存时间设置#Others二、returns模块配置job数据入库#配置returns返回值信息#mysql安全设置#创建模块相关 ... [详细]
  • 深入解析Serverless架构模式
    本文将详细介绍Serverless架构模式的核心概念、工作原理及其优势。通过对比传统架构,探讨Serverless如何简化应用开发与运维流程,并介绍当前主流的Serverless平台。 ... [详细]
  • 云函数与数据库API实现增删查改的对比
    本文将深入探讨使用云函数和数据库API实现数据操作(增删查改)的不同方法,通过详细的代码示例帮助读者更好地理解和掌握这些技术。文章不仅提供代码实现,还解释了每种方法的特点和适用场景。 ... [详细]
  • Python 工具推荐 | PyHubWeekly 第二十一期:提升命令行体验的五大工具
    本期 PyHubWeekly 为大家精选了 GitHub 上五个优秀的 Python 工具,涵盖金融数据可视化、终端美化、国际化支持、图像增强和远程 Shell 环境配置。欢迎关注并参与项目。 ... [详细]
  • docker镜像重启_docker怎么启动镜像dock ... [详细]
  • Logback使用小结
    1一定要使用slf4j的jar包,不要使用apachecommons的jar。否则滚动生成文件不生效,不滚动的时候却生效~~importorg.slf ... [详细]
  • 深入解析Spring Cloud微服务架构与分布式系统实战
    本文详细介绍了Spring Cloud在微服务架构和分布式系统中的应用,结合实际案例和最新技术,帮助读者全面掌握微服务的实现与优化。 ... [详细]
  • 最新计算机专业原创毕业设计参考选题都有源码+数据库是近期作品ling取参考你的选题刚好在下面有,有时间看到机会给您发1ssm资源循环利用2springboot校园考勤系统3ssm防 ... [详细]
  • 前言无论是对于刚入行工作还是已经工作几年的java开发者来说,面试求职始终是你需要直面的一件事情。首先梳理自己的知识体系,针对性准备,会有事半功倍的效果。我们往往会把重点放在技术上 ... [详细]
  • 本文作为SpringCloud Alibaba系列教程的第一部分,主要介绍如何搭建SpringCloud Alibaba的开发环境,帮助初学者快速入门。SpringCloud Alibaba是由阿里巴巴团队开源的一套微服务工具集,旨在简化分布式系统的构建过程。 ... [详细]
  • 字节跳动夏季招聘面试经验分享
    本文详细记录了字节跳动夏季招聘的面试经历,涵盖了一、二、三轮面试的技术问题及项目讨论,旨在为准备类似面试的求职者提供参考。 ... [详细]
author-avatar
抓尼莫為生
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有