实际生产中一般是使用springboot封装的Template等组件来操作Rabbitmq的,但是如果了解了原生的API的使用,可以更好的理解Rabbitmq的特性和对springboot封装的组件的使用原理有一定了解。
从字面上看是连接工厂的意思,实际上该对象就是一个连接工厂对象,主要用于创建一个连接到Rabbitmq服务器的tcp连接。
基本的方法:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //设置连接的服务器的ip
factory.setPort(5672); //设置端口,如果使用AMQP协议进行通信的话,这里就写该协议的通信端口。
factory.setUsername("guest"); //用户名
factory.setPassword("guest"); //密码
//设置这个uri可以代替上面的设置值,协议://用户名:密码@ip:port
//factory.setUri("amqp://guest:guest@192.168.18.140:5672");
factory.setVirtualHost("/"); //设置连接的虚拟主机
//设置连接超时时间
factory.setConnectionTimeout(10000);Connection conn = factory.newConnection(); //创建一个TCP连接。
一个与rabbitmq服务器的TCP连接对象,因为Rabbitmq为了节省连接频繁创建和销毁的资源消耗,使用了channel来进行对Rabbitmq服务器的操作。所以Connection对象不会进行对Rabbitmq服务器的操作,该操作要交给Channel对象,而Connection对象的主要目的是为了创建一个Channel对象。
Connection conn = factory.newConnection();
Channel channel = conn.createChannel(); //创建一个channel对象,用于操作服务器。
Channel channel = conn.createChannel(1); //创建一个channel对象,并手动指定一个通道号码。上面的方法是自动为我们分配号码。范围1-2047
Optional<Channel> channel &#61; conn.openChannel(); //创建一个channel对象&#xff0c;把他封装在Optional对象中。
Optional<Channel> channel &#61; conn.openChannel(1);//创建一个channel对象&#xff0c;把他封装在Optional对象中。并手动指定通道号码。范围1-2047conn.close(); //关闭连接
对Rabbitmq服务器的操作基本都在这里了。
//交换器相关操作
//对交换器的操作&#xff0c;挑出来的都是重载方法中最全参数的重载方法&#xff0c;其他都是这个方法的删减用于使用一些默认值。
//声明一个交换器&#xff0c;这个是重载方法中参数最全的方法了&#xff0c;其他重载方法是对该方法的一些删减以使用某个默认值。DeclareOk 返回值主要用于查看一些创建信息。如果声明失败返回null。
//如果交换器已经存在&#xff0c;那么声明的参数一定要与已存在的交换器的参数一致&#xff0c;否则会报错。
Exchange.DeclareOk exchangeDeclare(String exchange, //要声明的交换器名称BuiltinExchangeType type, //要声明的交换器类型&#xff0c;BuiltinExchangeType 是一个枚举类&#xff0c;对交换器四种类型进行一个枚举。boolean durable, //是否持久化队列boolean autoDelete, //是否自动删除&#xff0c;具体含义可以看博主的另一篇文章有讲解boolean internal, //是否内部交换器Map<String, Object> arguments) throws IOException; //其他属性参数。//异步创建交换器&#xff0c;上面的是同步等待创建成功&#xff0c;这个异步&#xff0c;参数与上面一致。
void exchangeDeclareNoWait(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments) throws IOException;//对交换机进行绑定交换器
Exchange.BindOk exchangeBind(String destination, //设置目标交换器名称。String source, //设置源交换器名称&#xff0c;也就是消息先到该交换器&#xff0c;再路由到目标交换器。String routingKey, //绑定键&#xff0c;到时与消息的路由键进行匹配看是否路由到绑定的交换机或队列中去。Map<String, Object> arguments) throws IOException; //其他参数//异步对交换机绑定交换机&#xff0c;参数与上面一致。
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//删除一个交换器
Exchange.DeleteOk exchangeDelete(String exchange, //要删除的交换器名称boolean ifUnused //如果为true&#xff0c;则交换器没有被生产者使用时才可以删除&#xff0c;否则删除失败&#xff0c;另一个重载方法使用ifUnused &#61;false为默认值) throws IOException;//异步删除一个交换器&#xff0c;参数与上面方法一致。
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;//对交换器与交换器进行接触绑定
Exchange.UnbindOk exchangeUnbind(String destination, //目标交换器名称String source, //源交换器名称&#xff0c;在这边来解绑。String routingKey, //绑定键&#xff0c;destination、source、routingKey三者才能确定一个绑定关系。Map<String, Object> arguments //其他参数) throws IOException;//异步对交换器与交换器进行接触绑定&#xff0c;参数与上面一致
void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;//查看交换器是否存在&#xff0c;存在就正常返回&#xff0c;不存在就抛出异常
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
//队列相关操作
//队列相关操作
//声明创建一个队列。
Queue.DeclareOk queueDeclare(String queue, //要创建的队列名boolean durable, //是否持久化队列boolean exclusive, //是否排他队列&#xff0c;如果true&#xff0c;那么该队列只能在声明该队列的Connection下的Channel中使用&#xff0c;其他Connection创建的Channel不能使用&#xff0c;连接断开时队列自动删除boolean autoDelete,//是否自动删除Map<String, Object> arguments //其他参数) throws IOException;//异步声明创建一个队列
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;//交换器绑定队列
Queue.BindOk queueBind(String queue, //要绑定的队列名String exchange, //要绑定的交换器名String routingKey, //绑定键Map<String, Object> arguments //其他参数) throws IOException;//异步进行交换器绑定队列
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;//删除队列
Queue.DeleteOk queueDelete(String queue, //要删除的队列名boolean ifUnused, //如果为true&#xff0c;则只有队列没有被消费者使用时才删除成功&#xff0c;如果为false&#xff0c;就不管有没有&#xff0c;直接删除。boolean ifEmpty //如果为true&#xff0c;则只有队列为空队列时才能删除成功) throws IOException;//异步删除队列
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;//接触交换器与队列的绑定
Queue.UnbindOk queueUnbind(String queue, //要解绑的队列名String exchange, //要解绑的交换器名String routingKey, //绑定键 三者才能唯一确定一个绑定Map<String, Object> arguments //其他参数) throws IOException;//清空队列
Queue.PurgeOk queuePurge(String queue) throws IOException;//查看队列是否存在&#xff0c;存在就返回一个DeclareOk &#xff0c;不存在或者该队列时排他队列并且创建者不是这个连接&#xff0c;就抛出异常。Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
Message属性参数配置&#xff1a;
//原生API的消息属性参数配置主要由AMQP.BasicProperties类配置Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");Map<String, Object> headers &#61; new HashMap<String, Object>();headers.put("name", "gupao");headers.put("level", "top");AMQP.BasicProperties properties &#61; new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化.contentEncoding("UTF-8") // 编码.expiration("10000") // TTL&#xff0c;过期时间.headers(headers) // 自定义属性.priority(5) // 优先级&#xff0c;默认为5&#xff0c;配合队列的 x-max-priority 属性使用.messageId(String.valueOf(UUID.randomUUID())) //消息的Id.appId("order-service") //消息推送的应用程序.contentType("application/json") //消息的内容类型.replyTo("replyToQueue") //RPC模式下的回调.timestamp(new Date()) //消息发送的时间.userId("15355153") //一般生产者的id.build();//然后在消息发布时&#xff0c;指定properties
channel.basicPublish("", QUEUE_NAME, properties, "aaaa".getBytes());
这里暂时就讲那么多&#xff0c;关于消息的生产消费有很多细节&#xff0c;比如事务&#xff0c;消息确认、回调等等等等。另外开个新章节专门讲这些问题。
下面附上一个普通的消息收发例子&#xff1a;
public class MyProducer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 连接端口factory.setPort(5672);// 用户factory.setUsername("guest");factory.setPassword("guest");//设置这个uri可以代替上面的设置值&#xff0c;协议://用户名:密码&#64;ip:port//factory.setUri("amqp://guest:guest&#64;192.168.18.140:5672");// 虚拟机factory.setVirtualHost("/");//设置连接超时时间factory.setConnectionTimeout(10000);// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel(1000);// 要发送消息String msg &#61; "Hello world, Rabbit MQ";// String exchange 要发送到的交换器名称, String routingKey 消息的路由键, BasicProperties props 消息的额外属性, byte[] body 消息的具体内容channel.basicPublish(EXCHANGE_NAME, "testdemo", null, msg.getBytes());channel.close();conn.close();}
}public class MyConsumer {private final static String EXCHANGE_NAME &#61; "SIMPLE_EXCHANGE";private final static String QUEUE_NAME &#61; "SIMPLE_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory &#61; new ConnectionFactory();// 连接IPfactory.setHost("192.168.18.140");// 默认监听端口factory.setPort(5672);// 虚拟机factory.setVirtualHost("/");// 设置访问的用户factory.setUsername("guest");factory.setPassword("guest");// 建立连接Connection conn &#61; factory.newConnection();// 创建消息通道Channel channel &#61; conn.createChannel();// 声明交换机// String exchange 交换机名称, String type, boolean durable, boolean autoDelete, Map
}
如果第一次&#xff0c;服务器并没有队列和交换器&#xff0c;要先启动消费者再启动生产者&#xff0c;因为队列和交换器的声明都在消费者中完成&#xff0c;如果有了队列和交换器&#xff0c;哪个先启动都没关系。