作者:mobiledu2502871243 | 来源:互联网 | 2023-08-21 11:18
强烈推荐一个大神的人工智能的教程:http:www.captainbed.netzhanghan前言至于rabbitMQ的安装我就不写了,近期由于项目需求,使用Ra
强烈推荐一个大神的人工智能的教程:http://www.captainbed.net/zhanghan
前言
至于rabbitMQ的安装我就不写了,近期由于项目需求,使用RabbitMQ异步处理数据,为了数据安全,加入死信队列概念保证数据的简单安全性。这里的安全主要是,超时,处理异常这些内容,一旦程序拿到队列中的数据,处理过程中出现了异常,非常有可能导致数据的丢失。
配置
队列和交换机之间的关系图简单如下:
1.建立交换机
2.建立队列
3.交换机与队列之间绑定
代码实例测试
添加rabbitmq的配置文件
spring配置文件加载rabbitmq的配置文件
测试rabbitmq的生产者代码
@Autowired
private RabbitTemplate template;/*** 测试rabbitmq* @throws JsonProcessingException 异常信息*/@Overridepublic void test() throws JsonProcessingException {ChooseCourseMQModel chooseCourseMQModel = new ChooseCourseMQModel("13060141059", "123", CourseTypeEnum.ADD_CHOOSE_COURSE);Message msg = MessageBuilder.withBody(SerializationUtils.serialize(chooseCourseMQModel)).build();template.convertAndSend(msg);}
消费者:
public class MqConsumer implements ChannelAwareMessageListener {@Override@Transactional(rollbackFor = Exception.class)public void onMessage(Message message, Channel channel) throws Exception {System.out.println("message:" + message);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("---msg recv:" + message.toString());try {ChooseCourseMQModel chooseCourseMQModel = (ChooseCourseMQModel) SerializationUtils.deserialize(message.getBody());//业务代码,处理正确,发送确认信息,basicAckif{channel.basicAck(deliveryTag, false);} else { //处理失败则发送拒绝信息,放入死信队列channel.basicReject(deliveryTag, false);return;}} catch (Exception e) { //处理发生异常则放入死信队列,发送basicRejectSystem.out.println("error:" + e);channel.basicReject(deliveryTag, false);}}
}
简单的应用到此已经结束了,建立之后可以在消费者中的try语句块中写一个异常,然后发现生产者队列中的数据通过死信交换机到达死信队列,然后根据业务场景,可以重试或者写一个定时任务重新处理,保证异步处理数据的正确性和稳定性!