热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

Java搭建RabbitMq消息中间件过程详解

这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

这篇文章主要介绍了Java搭建RabbitMq消息中间件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

前言

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

  • exchange: 交换机
  • routingkey: 路由key
  • queue:队列

控制台端口:15672

  exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

1.技能订单3分钟自动取消,改变状态

2.直播开始前15分钟提醒

3.直播状态自动结束

流程

  生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

  —> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

第一步:在pom文件中添加


   org.springframework.boot
   spring-boot-starter-amqp

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxx
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-cOnfirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列)
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Configuration
public class OrderQueueConfig {

  /**
   * 订单缓冲交换机名称
   */
  public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";

  /**
   * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】
   */
  public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";

  /**
   * 订单的交换机DLX 名字
   */
  final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";

  /**
   * 订单message时间过期后进入的队列,也就是订单实际的消费队列
   */
  public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";

  /**
   * 订单在缓冲队列过期时间(毫秒)30分钟
   */
  public final static int ORDER_QUEUE_EXPIRATION = 1800000;

  /**
   * 订单缓冲交换机
   * 
   * @return
   */
  @Bean
  public DirectExchange preOrderExange() {
    return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);
  }

  /**
   * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列
   * 
   * @return
   */
  @Bean
  public Queue delayQueuePerOrderTTLQueue() {
    return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)
        .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX
        .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
        .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间
        .build();
  }

  /**
   * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列
   *
   * @param delayQueuePerOrderTTLQueue
   * @param preOrderExange
   * @return
   */
  @Bean
  public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {
    return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);
  }

  /**
   * 创建订单的DLX exchange
   *
   * @return
   */
  @Bean
  public DirectExchange delayOrderExchange() {
    return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);
  }

  /**
   * 创建order_delay_process_queue队列,也就是订单实际消费队列
   *
   * @return
   */
  @Bean
  public Queue delayProcessOrderQueue() {
    return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();
  }

  /**
   * 将DLX绑定到实际消费队列
   *
   * @param delayProcessOrderQueue
   * @param delayExchange
   * @return
   */
  @Bean
  public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {
    return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);
  }

  /**
   * 监听订单实际消费者队列order_delay_process_queue
   * 
   * @param connectionFactory
   * @param processReceiver
   * @return
   */
  @Bean
  public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,
      OrderProcessReceiver processReceiver) {
    SimpleMessageListenerContainer cOntainer= new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
  }
}

消费者 OrderProcessReceiver :

package com.tuohang.platform.config;

import java.util.Objects;

import org.apache.tools.ant.types.resources.selectors.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

/**
 * 订单延迟处理消费者
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月18日
 */
@Component
public class OrderProcessReceiver implements ChannelAwareMessageListener {

  private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @Override
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
    } catch (Exception e) {
      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
      channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }
  
  /**
   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received <" + realMessage + ">");
    // 取消订单
    if(!Objects.equals(realMessage, msg)) {
//      SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));
      System.out.println("测试111111-----------"+new Date());
      System.out.println(message);
    }
  }
}

或者

/**
 * 测试 rabbit 消费者
 * 
 * 
 * @author Administrator
 * @version 1.0
 * @Date 2018年9月25日
 */
@Component
@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)
public class TestProcessReceiver {

  private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);

  String msg = "The failed message will auto retry after a certain delay";

  @RabbitHandler
  public void onMessage(Message message, Channel channel) throws Exception {
    try {
      processMessage(message);
      //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做
      channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,
          msg.getBytes());
    }
  }
  
  /**
   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)
   *
   * @param message
   * @throws Exception
   */
  public void processMessage(Message message) throws Exception {
    String realMessage = new String(message.getBody());
    logger.info("Received <" + realMessage + " >");
    // 取消订单
    if(!Objects.equals(realMessage, msg)) {
      System.out.println("测试111111-----------"+new Date());
    }else {
      System.out.println("rabbit else...");
    }
  }
}

生产者

/**
   * 测试rabbitmq
   * 
   * @return
   */
  @RequestMapping(value = "/testrab")
  public String testraa() {
    GenericResult gr = null;
    try {
      String name = "test_pre_ttl_delay_queue";
  long expiration = 10000;//10s 过期时间
      rabbitTemplate.convertAndSend(name,String.valueOf(123456));
 // 在单个消息上设置过期时间
 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));


    } catch (ServiceException e) {
      e.printStackTrace();
      gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());
    }
    
    return getWrite(gr);
  }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


推荐阅读
  • 本文详细分析了Hive在启动过程中遇到的权限拒绝错误,并提供了多种解决方案,包括调整文件权限、用户组设置以及环境变量配置等。 ... [详细]
  • 本文探讨了如何优化和正确配置Kafka Streams应用程序以确保准确的状态存储查询。通过调整配置参数和代码逻辑,可以有效解决数据不一致的问题。 ... [详细]
  • 网络运维工程师负责确保企业IT基础设施的稳定运行,保障业务连续性和数据安全。他们需要具备多种技能,包括搭建和维护网络环境、监控系统性能、处理突发事件等。本文将探讨网络运维工程师的职业前景及其平均薪酬水平。 ... [详细]
  • PHP 5.5.0rc1 发布:深入解析 Zend OPcache
    2013年5月9日,PHP官方发布了PHP 5.5.0rc1和PHP 5.4.15正式版,这两个版本均支持64位环境。本文将详细介绍Zend OPcache的功能及其在Windows环境下的配置与测试。 ... [详细]
  • 如何在PHPCMS V9中实现多站点功能并配置独立域名与动态URL
    本文介绍如何在PHPCMS V9中创建和管理多个站点,包括配置独立域名、设置动态URL,并确保各子站能够正常运行。我们将详细讲解从新建站点到最终配置路由的每一步骤。 ... [详细]
  • 本文详细介绍如何使用Samba软件配置CIFS文件共享服务,涵盖安装、配置、权限管理及多用户挂载等关键步骤。通过具体示例和命令行操作,帮助读者快速搭建并优化Samba服务器。 ... [详细]
  • 本文介绍了如何使用PHP代码实现微信平台的媒体素材上传功能,详细解释了API接口的使用方法和注意事项,确保文件路径正确以避免常见的错误。 ... [详细]
  • 使用Vultr云服务器和Namesilo域名搭建个人网站
    本文详细介绍了如何通过Vultr云服务器和Namesilo域名搭建一个功能齐全的个人网站,包括购买、配置服务器以及绑定域名的具体步骤。文章还提供了详细的命令行操作指南,帮助读者顺利完成建站过程。 ... [详细]
  • 本文详细介绍了 MySQL 的查询处理流程,包括从客户端连接到服务器、查询缓存检查、语句解析、查询优化及执行等步骤。同时,深入探讨了 MySQL 中的乐观锁机制及其在并发控制中的应用。 ... [详细]
  • 在现代网络环境中,两台计算机之间的文件传输需求日益增长。传统的FTP和SSH方式虽然有效,但其配置复杂、步骤繁琐,难以满足快速且安全的传输需求。本文将介绍一种基于Go语言开发的新一代文件传输工具——Croc,它不仅简化了操作流程,还提供了强大的加密和跨平台支持。 ... [详细]
  • MySQL缓存机制深度解析
    本文详细探讨了MySQL的缓存机制,包括主从复制、读写分离以及缓存同步策略等内容。通过理解这些概念和技术,读者可以更好地优化数据库性能。 ... [详细]
  • 近期遇到电脑网络不稳定和游戏时频繁重启的问题,寻求专业建议。网络环境为ADSL调制解调器通过路由器共享给两台电脑使用,怀疑存在ARP攻击或硬件配置问题。希望获得详细的故障排查和解决方案。 ... [详细]
  • 使用Python在SAE上开发新浪微博应用的初步探索
    最近重新审视了新浪云平台(SAE)提供的服务,发现其已支持Python开发。本文将详细介绍如何利用Django框架构建一个简单的新浪微博应用,并分享开发过程中的关键步骤。 ... [详细]
  • 从零开始构建完整手机站:Vue CLI 3 实战指南(第一部分)
    本系列教程将引导您使用 Vue CLI 3 构建一个功能齐全的移动应用。我们将深入探讨项目中涉及的每一个知识点,并确保这些内容与实际工作中的需求紧密结合。 ... [详细]
  • MySQL 数据库迁移指南:从本地到远程及磁盘间迁移
    本文详细介绍了如何在不同场景下进行 MySQL 数据库的迁移,包括从一个硬盘迁移到另一个硬盘、从一台计算机迁移到另一台计算机,以及解决迁移过程中可能遇到的问题。 ... [详细]
author-avatar
星痕之刃
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有