热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Rabbitmq的原生javaAPI讲解

说明实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Ra

说明

实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Rabbitmq的特性和对springboot封装的组件的使用原理有一定了解。


关键的类


ConnectionFactory

从字面上看是连接工厂的意思,实际上该对象就是一个连接工厂对象,主要用于创建一个连接到Rabbitmq服务器的tcp连接。

基本的方法:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //设置连接的服务器的ip
factory.setPort(5672); //设置端口,如果使用AMQP协议进行通信的话,这里就写该协议的通信端口。
factory.setUsername("guest"); //用户名
factory.setPassword("guest"); //密码
//设置这个uri可以代替上面的设置值,协议://用户名:密码@ip:port
//factory.setUri("amqp://guest:guest@192.168.18.140:5672");
factory.setVirtualHost("/"); //设置连接的虚拟主机
//设置连接超时时间
factory.setConnectionTimeout(10000);Connection conn = factory.newConnection(); //创建一个TCP连接。

Connection

一个与rabbitmq服务器的TCP连接对象,因为Rabbitmq为了节省连接频繁创建和销毁的资源消耗,使用了channel来进行对Rabbitmq服务器的操作。所以Connection对象不会进行对Rabbitmq服务器的操作,该操作要交给Channel对象,而Connection对象的主要目的是为了创建一个Channel对象。

Connection conn = factory.newConnection();
Channel channel = conn.createChannel(); //创建一个channel对象,用于操作服务器。
Channel channel = conn.createChannel(1); //创建一个channel对象,并手动指定一个通道号码。上面的方法是自动为我们分配号码。范围1-2047
Optional<Channel> channel &#61; conn.openChannel(); //创建一个channel对象&#xff0c;把他封装在Optional对象中。
Optional<Channel> channel &#61; conn.openChannel(1);//创建一个channel对象&#xff0c;把他封装在Optional对象中。并手动指定通道号码。范围1-2047conn.close(); //关闭连接

Channel

对Rabbitmq服务器的操作基本都在这里了。

//交换器相关操作

//对交换器的操作&#xff0c;挑出来的都是重载方法中最全参数的重载方法&#xff0c;其他都是这个方法的删减用于使用一些默认值。
//声明一个交换器&#xff0c;这个是重载方法中参数最全的方法了&#xff0c;其他重载方法是对该方法的一些删减以使用某个默认值。DeclareOk 返回值主要用于查看一些创建信息。如果声明失败返回null。
//如果交换器已经存在&#xff0c;那么声明的参数一定要与已存在的交换器的参数一致&#xff0c;否则会报错。
Exchange.DeclareOk exchangeDeclare(String exchange, //要声明的交换器名称BuiltinExchangeType type, //要声明的交换器类型&#xff0c;BuiltinExchangeType 是一个枚举类&#xff0c;对交换器四种类型进行一个枚举。boolean durable, //是否持久化队列boolean autoDelete, //是否自动删除&#xff0c;具体含义可以看博主的另一篇文章有讲解boolean internal, //是否内部交换器Map<String, Object> arguments) throws IOException; //其他属性参数。//异步创建交换器&#xff0c;上面的是同步等待创建成功&#xff0c;这个异步&#xff0c;参数与上面一致。
void exchangeDeclareNoWait(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;//对交换机进行绑定交换器
Exchange.BindOk exchangeBind(String destination, //设置目标交换器名称。String source, //设置源交换器名称&#xff0c;也就是消息先到该交换器&#xff0c;再路由到目标交换器。String routingKey, //绑定键&#xff0c;到时与消息的路由键进行匹配看是否路由到绑定的交换机或队列中去。Map<String, Object> arguments) throws IOException; //其他参数//异步对交换机绑定交换机&#xff0c;参数与上面一致。
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//删除一个交换器
Exchange.DeleteOk exchangeDelete(String exchange, //要删除的交换器名称boolean ifUnused //如果为true&#xff0c;则交换器没有被生产者使用时才可以删除&#xff0c;否则删除失败&#xff0c;另一个重载方法使用ifUnused &#61;false为默认值) throws IOException;//异步删除一个交换器&#xff0c;参数与上面方法一致。
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;//对交换器与交换器进行接触绑定
Exchange.UnbindOk exchangeUnbind(String destination, //目标交换器名称String source, //源交换器名称&#xff0c;在这边来解绑。String routingKey, //绑定键&#xff0c;destination、source、routingKey三者才能确定一个绑定关系。Map<String, Object> arguments //其他参数) throws IOException;//异步对交换器与交换器进行接触绑定&#xff0c;参数与上面一致
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//查看交换器是否存在&#xff0c;存在就正常返回&#xff0c;不存在就抛出异常
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

//队列相关操作

//队列相关操作
//声明创建一个队列。
Queue.DeclareOk queueDeclare(String queue, //要创建的队列名boolean durable, //是否持久化队列boolean exclusive, //是否排他队列&#xff0c;如果true&#xff0c;那么该队列只能在声明该队列的Connection下的Channel中使用&#xff0c;其他Connection创建的Channel不能使用&#xff0c;连接断开时队列自动删除boolean autoDelete,//是否自动删除Map<String, Object> arguments //其他参数) throws IOException;//异步声明创建一个队列
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;//交换器绑定队列
Queue.BindOk queueBind(String queue, //要绑定的队列名String exchange, //要绑定的交换器名String routingKey, //绑定键Map<String, Object> arguments //其他参数) throws IOException;//异步进行交换器绑定队列
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;//删除队列
Queue.DeleteOk queueDelete(String queue, //要删除的队列名boolean ifUnused, //如果为true&#xff0c;则只有队列没有被消费者使用时才删除成功&#xff0c;如果为false&#xff0c;就不管有没有&#xff0c;直接删除。boolean ifEmpty //如果为true&#xff0c;则只有队列为空队列时才能删除成功) throws IOException;//异步删除队列
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;//接触交换器与队列的绑定
Queue.UnbindOk queueUnbind(String queue, //要解绑的队列名String exchange, //要解绑的交换器名String routingKey, //绑定键 三者才能唯一确定一个绑定Map<String, Object> arguments //其他参数) throws IOException;//清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;//查看队列是否存在&#xff0c;存在就返回一个DeclareOk &#xff0c;不存在或者该队列时排他队列并且创建者不是这个连接&#xff0c;就抛出异常。Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

Message属性参数配置&#xff1a;

//原生API的消息属性参数配置主要由AMQP.BasicProperties类配置Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");AMQP.BasicProperties properties &#61; new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化.contentEncoding("UTF-8") // 编码.expiration("10000") // TTL&#xff0c;过期时间.headers(headers) // 自定义属性.priority(5) // 优先级&#xff0c;默认为5&#xff0c;配合队列的 x-max-priority 属性使用.messageId(String.valueOf(UUID.randomUUID())) //消息的Id.appId("order-service") //消息推送的应用程序.contentType("application/json") //消息的内容类型.replyTo("replyToQueue") //RPC模式下的回调.timestamp(new Date()) //消息发送的时间.userId("15355153") //一般生产者的id.build();//然后在消息发布时&#xff0c;指定properties
channel.basicPublish("", QUEUE_NAME, properties, "aaaa".getBytes());

这里暂时就讲那么多&#xff0c;关于消息的生产消费有很多细节&#xff0c;比如事务&#xff0c;消息确认、回调等等等等。另外开个新章节专门讲这些问题。

下面附上一个普通的消息收发例子&#xff1a;

public class MyProducer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 连接端口factory.setPort(5672);// 用户factory.setUsername("guest");factory.setPassword("guest");//设置这个uri可以代替上面的设置值&#xff0c;协议://用户名:密码&#64;ip:port//factory.setUri("amqp://guest:guest&#64;192.168.18.140:5672");// 虚拟机factory.setVirtualHost("/");//设置连接超时时间factory.setConnectionTimeout(10000);// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel(1000);// 要发送消息String msg &#61; "Hello world, Rabbit MQ";// String exchange 要发送到的交换器名称, String routingKey 消息的路由键, BasicProperties props 消息的额外属性, byte[] body 消息的具体内容channel.basicPublish(EXCHANGE_NAME, "testdemo", null, msg.getBytes());channel.close();conn.close();}
}public class MyConsumer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";private final static String QUEUE_NAME &#61; "SIMPLE_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 默认监听端口factory.setPort(5672);// 虚拟机factory.setVirtualHost("/");// 设置访问的用户factory.setUsername("guest");factory.setPassword("guest");// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel();// 声明交换机// String exchange 交换机名称, String type, boolean durable, boolean autoDelete, Map argumentsAMQP.Exchange.DeclareOk direct &#61; channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);// 声明队列// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" Waiting for message....");// 绑定队列和交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"testdemo");// 创建消费者Consumer consumer &#61; new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg &#61; new String(body, "UTF-8");System.out.println("Received message : &#39;" &#43; msg &#43; "&#39;");System.out.println("consumerTag : " &#43; consumerTag );System.out.println("deliveryTag : " &#43; envelope.getDeliveryTag() );}};// 开始获取消息// String queue, boolean autoAck, Consumer callbackchannel.basicConsume(QUEUE_NAME, true, consumer);}
}

如果第一次&#xff0c;服务器并没有队列和交换器&#xff0c;要先启动消费者再启动生产者&#xff0c;因为队列和交换器的声明都在消费者中完成&#xff0c;如果有了队列和交换器&#xff0c;哪个先启动都没关系。


推荐阅读
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 如何自行分析定位SAP BSP错误
    The“BSPtag”Imentionedintheblogtitlemeansforexamplethetagchtmlb:configCelleratorbelowwhichi ... [详细]
  • 本文介绍了闭包的定义和运转机制,重点解释了闭包如何能够接触外部函数的作用域中的变量。通过词法作用域的查找规则,闭包可以访问外部函数的作用域。同时还提到了闭包的作用和影响。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 目录实现效果:实现环境实现方法一:基本思路主要代码JavaScript代码总结方法二主要代码总结方法三基本思路主要代码JavaScriptHTML总结实 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • ZSI.generate.Wsdl2PythonError: unsupported local simpleType restriction ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
author-avatar
山海
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有