热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ小记(二)

1、RabbitMQ相关介绍(1)RabbitMQ整体上是一个生产者和消费者模型,主要负责接收、存储、转发消息。RabbitMQ整体结构图如下:   (2)生产者:发送消息的一方,

1、RabbitMQ相关介绍

(1)RabbitMQ整体上是一个生产者和消费者模型,主要负责接收、存储、转发消息。RabbitMQ整体结构图如下:

 

 

 

 (2)生产者:发送消息的一方,生产者创建一条消息,发布到RabbitMQ上,消息一般分为两部分:消息体和标签,消息体是带有业务逻辑结构的数据,也可以进一步对消息体进行序列化,标签用来描述这条消息。

     消费者:接收消息的一方,消费者创建一条连接,接到RabbitMQ服务器上的队列上,当消费者消费一条队列上的消息时,只是消费消息体,标签自动丢弃,所以消费者不会知道生产者是谁。

    Broker:消息中间服务节点,一个RabbitMQ Broker可以看作是一个RabbitMQ的实例,也可看作一台rabbitMQ的服务器。

    队列:Queue,RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅一个队列,不支持队列层面的广播消费。

    交换器:Exchange,生产者创建消息,把消息交给交换器,有交换器把消息发送到一个或多个队列上。如果交换器发送队列失败,消息会返回给生产者或者丢弃。RabbitMQ中交换器有四种类型:fanout、direct、topic、headers。

    fanout:四种交换器中其一,会把消息发送到所有与交换器绑定的队列上。

    direct:四种交换器其二,会把消息发送到bindingKey和RoutingKey完全匹配的队列上。

    topic:四种交换器其三,与direct相似,会把消息发送到bindingKey和RoutingKey完全匹配的队列上,但匹配规则不同。

    headers:四种交换器其四,根据消息中的headers的属性来进行匹配,性能差,基本不会使用。

    bindingKey:绑定键,RabbitMQ中通过绑定键把交换器和对列关联起来,与RoutingKey配合使用。

    RoutdingKey:路由键,生产者将消息发送给交换器时会指定一个RoutingKey,当bindingKey和RoutingKey完全匹配时,消息会被放到对应的队列上。

1、SpringBoot整合RabbitMQ

(1)环境配置

项目采用maven构建系统,所以需要在pom.xml的文件中引入RabbitMQ的相关依赖:



org.springframework.boot
spring-boot-starter-amqp
2.2.6.RELEASE

在application.yml中配置RabbitMQ相关信息:
Spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5727
    username: guest
    password: guest

(2)初始化RabbitMQ连接

public class RabbitMQConfig {
Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
/**
* rabbitmq连接
* */
@Bean
public Connection rabbitMQ_Config(){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
Connection cOnn= null;
try {
//创建连接
cOnn= factory.newConnection();
} catch (IOException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (TimeoutException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
return conn;
}

@Bean
/**
* rabbitmq连接.uri方式
* */
public Connection rabbitMQ_Config_Uri(){
ConnectionFactory factory = new ConnectionFactory();
Connection cOnn= null;
try {
factory.setUri("amqp://"+username+":"+password+"@"+host+":"+port+"/virtualHost");
try {
cOnn= factory.newConnection();
} catch (IOException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (TimeoutException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
} catch (URISyntaxException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
} catch (KeyManagementException e) {
logger.error("创建MQ连接失败!",e);
e.printStackTrace();
}
return conn;
}
}

(3)生产者

/**
* 生产者
* */
@Bean
public boolean direct_MQ_Producer(String excahngeName, String exchangeType,String Queue,String Binding,byte[] array) throws Exception{

boolean flag = false;
Connection con = rabbitMQ_Config();
//创建通道
Channel channel = con.createChannel();
try {
//创建交换机
channel.exchangeDeclare(excahngeName,exchangeType,true);
//创建队列
channel.queueDeclare(Queue,true,false,false,null);
//将交换机与队列绑定
channel.queueBind(Queue,excahngeName,Binding);
//发送消息
channel.basicPublish(excahngeName,Binding, MessageProperties.PERSISTENT_TEXT_PLAIN,array);
flag = true;
} catch (IOException e) {
logger.error("发送消息失败!",e);
e.printStackTrace();
}finally {
//关闭通道连接
channel.close();
//关闭连接
con.close();
}
return flag;
}

(4)消费者   

/**
* 消费者
* */

/**
* direct类型交换机消费者
* */
@Bean
public String[] direct_MQ_Consumer(String Queue) throws IOException, TimeoutException {
Connection con = rabbitMQ_Config();
Channel channel = null;
final String[] message = {""};
try {
channel = con.createChannel();
//设置客户端最多接收未被接收的数目
channel.basicQos(64);
Channel finalChannel = channel;
//通过重写DefaultConsumer方法来实现消费者消息
Consumer cOnsumer= new DefaultConsumer(finalChannel){
@Override
public void handleDelivery(String s1, Envelope envelope1, AMQP.BasicProperties basicProperties1, byte[] bytes) throws IOException{
message[0] = new String(bytes);
System.out.println("recv messahe : " + new String(bytes));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
finalChannel.basicAck(envelope1.getDeliveryTag(),false);
}
};
channel.basicConsume(Queue,consumer);
TimeUnit.SECONDS.sleep(5);
} catch (IOException e) {
logger.error("创建通道失败!",e);
e.printStackTrace();
}catch (InterruptedException e) {
e.printStackTrace();
}finally {
//关闭资源
channel.close();
con.close();
}
return message;
}

推荐阅读
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • JVM参数设置与命令行工具详解
    JVM参数配置与命令行工具的深入解析旨在优化系统性能,通过合理设置JVM参数,确保在高吞吐量的前提下,有效减少垃圾回收(GC)的频率,进而降低系统停顿时间,提升服务的稳定性和响应速度。此外,本文还将详细介绍常用的JVM命令行工具,帮助开发者更好地监控和调优JVM运行状态。 ... [详细]
  • 使用cpphttplib构建HTTP服务器以处理带有查询参数的URL请求 ... [详细]
  • HBase客户端Table类中getRpcTimeout方法的应用与编程实例解析 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • 深入解析Tomcat:开发者的实用指南
    深入解析Tomcat:开发者的实用指南 ... [详细]
  • 如何在Java中高效构建WebService
    本文介绍了如何利用XFire框架在Java中高效构建WebService。XFire是一个轻量级、高性能的Java SOAP框架,能够简化WebService的开发流程。通过结合MyEclipse集成开发环境,开发者可以更便捷地进行项目配置和代码编写,从而提高开发效率。此外,文章还详细探讨了XFire的关键特性和最佳实践,为读者提供了实用的参考。 ... [详细]
  • 利用 JavaScript 实现定时任务的高效执行方法(代码可直接复用) ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • Jedis接口分类详解与应用指南
    本文详细解析了Jedis接口的分类及其应用指南,重点介绍了字符串数据类型(String)的接口功能。作为Redis中最基本的数据存储形式,字符串类型支持多种操作,如设置、获取和更新键值对等,适用于广泛的应用场景。 ... [详细]
  • 在软件开发领域,“池”技术被广泛应用,如数据库连接池、线程池等。本文重点探讨Java中的线程池ThreadPoolExecutor,通过详细解析其内部机制,帮助开发者理解如何高效利用线程池管理任务执行。线程池不仅能够显著减少系统资源的消耗,提高响应速度,还能通过合理的配置,如饱和策略,确保在高负载情况下系统的稳定性和可靠性。文章还将结合实际案例,展示线程池在不同应用场景下的具体实现与优化技巧。 ... [详细]
  • 探索JavaScript倒计时功能的三种高效实现方法及代码示例 ... [详细]
  • 使用Boost.Asio进行异步数据处理的应用程序主要依赖于两个核心概念:I/O服务和I/O对象。I/O服务抽象了操作系统接口,使得异步操作能够高效地执行。I/O对象则代表了具体的网络资源,如套接字和文件描述符,通过这些对象可以实现数据的读写操作。本文详细介绍了这两个概念在Boost.Asio中的应用及其在网络编程中的重要性。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • 开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用
    开发心得:深入探讨Servlet、Dubbo与MyBatis中的责任链模式应用 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有