热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Rabbitmq的原生javaAPI讲解

说明实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Ra

说明

实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Rabbitmq的特性和对springboot封装的组件的使用原理有一定了解。


关键的类


ConnectionFactory

从字面上看是连接工厂的意思,实际上该对象就是一个连接工厂对象,主要用于创建一个连接到Rabbitmq服务器的tcp连接。

基本的方法:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //设置连接的服务器的ip
factory.setPort(5672); //设置端口,如果使用AMQP协议进行通信的话,这里就写该协议的通信端口。
factory.setUsername("guest"); //用户名
factory.setPassword("guest"); //密码
//设置这个uri可以代替上面的设置值,协议://用户名:密码@ip:port
//factory.setUri("amqp://guest:guest@192.168.18.140:5672");
factory.setVirtualHost("/"); //设置连接的虚拟主机
//设置连接超时时间
factory.setConnectionTimeout(10000);Connection conn = factory.newConnection(); //创建一个TCP连接。

Connection

一个与rabbitmq服务器的TCP连接对象,因为Rabbitmq为了节省连接频繁创建和销毁的资源消耗,使用了channel来进行对Rabbitmq服务器的操作。所以Connection对象不会进行对Rabbitmq服务器的操作,该操作要交给Channel对象,而Connection对象的主要目的是为了创建一个Channel对象。

Connection conn = factory.newConnection();
Channel channel = conn.createChannel(); //创建一个channel对象,用于操作服务器。
Channel channel = conn.createChannel(1); //创建一个channel对象,并手动指定一个通道号码。上面的方法是自动为我们分配号码。范围1-2047
Optional<Channel> channel &#61; conn.openChannel(); //创建一个channel对象&#xff0c;把他封装在Optional对象中。
Optional<Channel> channel &#61; conn.openChannel(1);//创建一个channel对象&#xff0c;把他封装在Optional对象中。并手动指定通道号码。范围1-2047conn.close(); //关闭连接

Channel

对Rabbitmq服务器的操作基本都在这里了。

//交换器相关操作

//对交换器的操作&#xff0c;挑出来的都是重载方法中最全参数的重载方法&#xff0c;其他都是这个方法的删减用于使用一些默认值。
//声明一个交换器&#xff0c;这个是重载方法中参数最全的方法了&#xff0c;其他重载方法是对该方法的一些删减以使用某个默认值。DeclareOk 返回值主要用于查看一些创建信息。如果声明失败返回null。
//如果交换器已经存在&#xff0c;那么声明的参数一定要与已存在的交换器的参数一致&#xff0c;否则会报错。
Exchange.DeclareOk exchangeDeclare(String exchange, //要声明的交换器名称BuiltinExchangeType type, //要声明的交换器类型&#xff0c;BuiltinExchangeType 是一个枚举类&#xff0c;对交换器四种类型进行一个枚举。boolean durable, //是否持久化队列boolean autoDelete, //是否自动删除&#xff0c;具体含义可以看博主的另一篇文章有讲解boolean internal, //是否内部交换器Map<String, Object> arguments) throws IOException; //其他属性参数。//异步创建交换器&#xff0c;上面的是同步等待创建成功&#xff0c;这个异步&#xff0c;参数与上面一致。
void exchangeDeclareNoWait(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;//对交换机进行绑定交换器
Exchange.BindOk exchangeBind(String destination, //设置目标交换器名称。String source, //设置源交换器名称&#xff0c;也就是消息先到该交换器&#xff0c;再路由到目标交换器。String routingKey, //绑定键&#xff0c;到时与消息的路由键进行匹配看是否路由到绑定的交换机或队列中去。Map<String, Object> arguments) throws IOException; //其他参数//异步对交换机绑定交换机&#xff0c;参数与上面一致。
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//删除一个交换器
Exchange.DeleteOk exchangeDelete(String exchange, //要删除的交换器名称boolean ifUnused //如果为true&#xff0c;则交换器没有被生产者使用时才可以删除&#xff0c;否则删除失败&#xff0c;另一个重载方法使用ifUnused &#61;false为默认值) throws IOException;//异步删除一个交换器&#xff0c;参数与上面方法一致。
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;//对交换器与交换器进行接触绑定
Exchange.UnbindOk exchangeUnbind(String destination, //目标交换器名称String source, //源交换器名称&#xff0c;在这边来解绑。String routingKey, //绑定键&#xff0c;destination、source、routingKey三者才能确定一个绑定关系。Map<String, Object> arguments //其他参数) throws IOException;//异步对交换器与交换器进行接触绑定&#xff0c;参数与上面一致
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//查看交换器是否存在&#xff0c;存在就正常返回&#xff0c;不存在就抛出异常
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

//队列相关操作

//队列相关操作
//声明创建一个队列。
Queue.DeclareOk queueDeclare(String queue, //要创建的队列名boolean durable, //是否持久化队列boolean exclusive, //是否排他队列&#xff0c;如果true&#xff0c;那么该队列只能在声明该队列的Connection下的Channel中使用&#xff0c;其他Connection创建的Channel不能使用&#xff0c;连接断开时队列自动删除boolean autoDelete,//是否自动删除Map<String, Object> arguments //其他参数) throws IOException;//异步声明创建一个队列
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;//交换器绑定队列
Queue.BindOk queueBind(String queue, //要绑定的队列名String exchange, //要绑定的交换器名String routingKey, //绑定键Map<String, Object> arguments //其他参数) throws IOException;//异步进行交换器绑定队列
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;//删除队列
Queue.DeleteOk queueDelete(String queue, //要删除的队列名boolean ifUnused, //如果为true&#xff0c;则只有队列没有被消费者使用时才删除成功&#xff0c;如果为false&#xff0c;就不管有没有&#xff0c;直接删除。boolean ifEmpty //如果为true&#xff0c;则只有队列为空队列时才能删除成功) throws IOException;//异步删除队列
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;//接触交换器与队列的绑定
Queue.UnbindOk queueUnbind(String queue, //要解绑的队列名String exchange, //要解绑的交换器名String routingKey, //绑定键 三者才能唯一确定一个绑定Map<String, Object> arguments //其他参数) throws IOException;//清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;//查看队列是否存在&#xff0c;存在就返回一个DeclareOk &#xff0c;不存在或者该队列时排他队列并且创建者不是这个连接&#xff0c;就抛出异常。Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

Message属性参数配置&#xff1a;

//原生API的消息属性参数配置主要由AMQP.BasicProperties类配置Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");AMQP.BasicProperties properties &#61; new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化.contentEncoding("UTF-8") // 编码.expiration("10000") // TTL&#xff0c;过期时间.headers(headers) // 自定义属性.priority(5) // 优先级&#xff0c;默认为5&#xff0c;配合队列的 x-max-priority 属性使用.messageId(String.valueOf(UUID.randomUUID())) //消息的Id.appId("order-service") //消息推送的应用程序.contentType("application/json") //消息的内容类型.replyTo("replyToQueue") //RPC模式下的回调.timestamp(new Date()) //消息发送的时间.userId("15355153") //一般生产者的id.build();//然后在消息发布时&#xff0c;指定properties
channel.basicPublish("", QUEUE_NAME, properties, "aaaa".getBytes());

这里暂时就讲那么多&#xff0c;关于消息的生产消费有很多细节&#xff0c;比如事务&#xff0c;消息确认、回调等等等等。另外开个新章节专门讲这些问题。

下面附上一个普通的消息收发例子&#xff1a;

public class MyProducer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 连接端口factory.setPort(5672);// 用户factory.setUsername("guest");factory.setPassword("guest");//设置这个uri可以代替上面的设置值&#xff0c;协议://用户名:密码&#64;ip:port//factory.setUri("amqp://guest:guest&#64;192.168.18.140:5672");// 虚拟机factory.setVirtualHost("/");//设置连接超时时间factory.setConnectionTimeout(10000);// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel(1000);// 要发送消息String msg &#61; "Hello world, Rabbit MQ";// String exchange 要发送到的交换器名称, String routingKey 消息的路由键, BasicProperties props 消息的额外属性, byte[] body 消息的具体内容channel.basicPublish(EXCHANGE_NAME, "testdemo", null, msg.getBytes());channel.close();conn.close();}
}public class MyConsumer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";private final static String QUEUE_NAME &#61; "SIMPLE_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 默认监听端口factory.setPort(5672);// 虚拟机factory.setVirtualHost("/");// 设置访问的用户factory.setUsername("guest");factory.setPassword("guest");// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel();// 声明交换机// String exchange 交换机名称, String type, boolean durable, boolean autoDelete, Map argumentsAMQP.Exchange.DeclareOk direct &#61; channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);// 声明队列// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" Waiting for message....");// 绑定队列和交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"testdemo");// 创建消费者Consumer consumer &#61; new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg &#61; new String(body, "UTF-8");System.out.println("Received message : &#39;" &#43; msg &#43; "&#39;");System.out.println("consumerTag : " &#43; consumerTag );System.out.println("deliveryTag : " &#43; envelope.getDeliveryTag() );}};// 开始获取消息// String queue, boolean autoAck, Consumer callbackchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

如果第一次&#xff0c;服务器并没有队列和交换器&#xff0c;要先启动消费者再启动生产者&#xff0c;因为队列和交换器的声明都在消费者中完成&#xff0c;如果有了队列和交换器&#xff0c;哪个先启动都没关系。


推荐阅读
  • 本文介绍如何在Spring Boot项目中集成Redis,并通过具体案例展示其配置和使用方法。包括添加依赖、配置连接信息、自定义序列化方式以及实现仓储接口。 ... [详细]
  • 本文详细介绍了优化DB2数据库性能的多种方法,涵盖统计信息更新、缓冲池调整、日志缓冲区配置、应用程序堆大小设置、排序堆参数调整、代理程序管理、锁机制优化、活动应用程序限制、页清除程序配置、I/O服务器数量设定以及编入组提交数调整等方面。通过这些技术手段,可以显著提升数据库的运行效率和响应速度。 ... [详细]
  • 深入解析Java枚举及其高级特性
    本文详细介绍了Java枚举的概念、语法、使用规则和应用场景,并探讨了其在实际编程中的高级应用。所有相关内容已收录于GitHub仓库[JavaLearningmanual](https://github.com/Ziphtracks/JavaLearningmanual),欢迎Star并持续关注。 ... [详细]
  • 深入解析Spring启动过程
    本文详细介绍了Spring框架的启动流程,帮助开发者理解其内部机制。通过具体示例和代码片段,解释了Bean定义、工厂类、读取器以及条件评估等关键概念,使读者能够更全面地掌握Spring的初始化过程。 ... [详细]
  • 在尝试使用C# Windows Forms客户端通过SignalR连接到ASP.NET服务器时,遇到了内部服务器错误(500)。本文将详细探讨问题的原因及解决方案。 ... [详细]
  • 深入解析ESFramework中的AgileTcp组件
    本文详细介绍了ESFramework框架中AgileTcp组件的设计与实现。AgileTcp是ESFramework提供的ITcp接口的高效实现,旨在优化TCP通信的性能和结构清晰度。 ... [详细]
  • 为了解决不同服务器间共享图片的需求,我们最初考虑建立一个FTP图片服务器。然而,考虑到项目是一个简单的CMS系统,为了简化流程,团队决定探索七牛云存储的解决方案。本文将详细介绍使用七牛云存储的过程和心得。 ... [详细]
  • 使用JS、HTML5和C3创建自定义弹出窗口
    本文介绍如何结合JavaScript、HTML5和C3.js来实现一个功能丰富的自定义弹出窗口。通过具体的代码示例,详细讲解了实现过程中的关键步骤和技术要点。 ... [详细]
  • 本文详细介绍了Java中实现异步调用的多种方式,包括线程创建、Future接口、CompletableFuture类以及Spring框架的@Async注解。通过代码示例和深入解析,帮助读者理解并掌握这些技术。 ... [详细]
  • 本文探讨了如何通过一系列技术手段提升Spring Boot项目的并发处理能力,解决生产环境中因慢请求导致的系统性能下降问题。 ... [详细]
  • 本文深入探讨了 PHP 实现计划任务的方法,包括其原理、具体实现方式以及在不同操作系统中的应用。通过详细示例和代码片段,帮助开发者理解和掌握如何高效地设置和管理定时任务。 ... [详细]
  • 请看|间隔时间_Postgresql 主从复制 ... [详细]
  • 本文详细介绍了如何使用 PHP 接收并处理微信支付的回调结果,确保支付通知能够被正确接收和响应。 ... [详细]
  • 本文介绍了如何使用JavaScript的Fetch API与Express服务器进行交互,涵盖了GET、POST、PUT和DELETE请求的实现,并展示了如何处理JSON响应。 ... [详细]
  • Python + Pytest 接口自动化测试中 Token 关联登录的实现方法
    本文将深入探讨 Python 和 Pytest 在接口自动化测试中如何实现 Token 关联登录,内容详尽、逻辑清晰,旨在帮助读者掌握这一关键技能。 ... [详细]
author-avatar
山海
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有