热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQJAVA客户端调用

1.安装erlang下载地址:http:www.erlang.orgdownloads设置ERLANG环境变量2.安装RabbitMQ下载地址: http:www.rabbitmq

1.安装erlang

  下载地址:http://www.erlang.org/downloads

  设置ERLANG环境变量

2.安装RabbitMQ

  下载地址: http://www.rabbitmq.com/download.html

RabbitMQ JAVA客户端调用

 

输入命令安装各种管理插件:

D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin>rabbitmq-plugins enable rabbitmq_management

RabbitMQ JAVA客户端调用

重启服务

net stop rabbitmq && net start rabbitmq

登录

http://127.0.0.1:15672 默认用户名密码 guest  guest

常用命令(RabbitMQ命令在sbin目录下D:\RabbitMQServer\rabbitmq_server-3.7.10\sbin,记得设置环境变量)

rabbitmqctl delete_vhost test_vhosts 删除虚拟机test_vhosts 

3. RabbitMQ知识整理

来自(https://blog.csdn.net/dreamchasering/article/details/77653512)

什么是MQ?

      MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

      RabbitMQ是MQ的一种。下面详细介绍一下RabbitMQ的基本概念。

      1、队列、生产者、消费者

      队列是RabbitMQ的内部对象,用于存储消息。生产者(下图中的P)生产消息并投递到队列中,消费者(下图中的C)可以从队列中获取消息并消费。

      RabbitMQ JAVA客户端调用

      多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

      RabbitMQ JAVA客户端调用

2、Exchange、Binding

      刚才我们看到生产者将消息投递到队列中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器,下图中的X),再通过Binding将Exchange与Queue关联起来。

      RabbitMQ JAVA客户端调用

3、Exchange Type、Bingding key、routing key

      在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。

      生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。

      RabbitMQ常用的Exchange Type有三种:fanout、direct、topic。

      fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中。

      direct:把消息投递到那些binding key与routing key完全匹配的队列中。

      topic:将消息路由到binding key与routing key模式匹配的队列中。

      附上一张RabbitMQ的结构图:

      RabbitMQ JAVA客户端调用

    

最后来具体解析一下几个问题:

1、可以自动创建队列,也可以手动创建队列,如果自动创建队列,那么是谁负责创建队列呢?是生产者?还是消费者? 

      如果队列不存在,当然消费者不会收到任何的消息。但是如果队列不存在,那么生产者发送的消息就会丢失。所以,为了数据不丢失,消费者和生产者都可以创建队列。那么如果创建一个已经存在的队列呢?那么不会有任何的影响。需要注意的是没有任何的影响,也就是说第二次创建如果参数和第一次不一样,那么该操作虽然成功,但是队列属性并不会改变。

      队列对于负载均衡的处理是完美的。对于多个消费者来说,RabbitMQ使用轮询的方式均衡的发送给不同的消费者。

2、RabbitMQ的消息确认机制

      默认情况下,如果消息已经被某个消费者正确的接收到了,那么该消息就会被从队列中移除。当然也可以让同一个消息发送到很多的消费者。

      如果一个队列没有消费者,那么,如果这个队列有数据到达,那么这个数据会被缓存,不会被丢弃。当有消费者时,这个数据会被立即发送到这个消费者,这个数据被消费者正确收到时,这个数据就被从队列中删除。

     那么什么是正确收到呢?通过ack。每个消息都要被acknowledged(确认,ack)。我们可以显示的在程序中去ack,也可以自动的ack。如果有数据没有被ack,那么:

     RabbitMQ Server会把这个信息发送到下一个消费者。

     如果这个app有bug,忘记了ack,那么RabbitMQServer不会再发送数据给它,因为Server认为这个消费者处理能力有限。

    而且ack的机制可以起到限流的作用(Benefitto throttling):在消费者处理完成数据后发送ack,甚至在额外的延时后发送ack,将有效的均衡消费者的负载。

4.JAVA demo

引入RabbitMQ客户端

<dependency>
    <groupId>com.rabbitmqgroupId>
    <artifactId>amqp-clientartifactId>
    <version>3.6.5version>
dependency>

3.1 使用默认配置直接发送消息到队列

生产者

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 
 * 默认发送,直接将消息发送到某个队列,默认交换机type为direct
 * 
 * @author
 * @date 2019/01/10 11:17:10
 */
public class ProducterDirectDemo {
    public static void main(String[] args) throws IOException, TimeoutException {

        String queneName = "testQuene";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            // 创建与RabbitMQ服务器的TCP连接
            cOnnection= factory.newConnection();
            // 创建一个频道
            channel = connection.createChannel();
            // 声明默认的队列
            channel.queueDeclare(queneName, true, false, true, null);
            while (true) {
                channel.basicPublish("", queneName, null, UUID.randomUUID().toString().getBytes());
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

消费者

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 接收默认消息
 * 
 * @author
 * @date 2019/01/10 11:14:32
 */
public class ConsumerDirectDemo {
    public static void main(String[] args) {
        String queneName = "testQuene";
        Connection connection = null;
        Channel channel = null;
        try {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                }
            };
            // channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queneName, true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

3.2 设置交换器,队列,路由发送消息

生产者

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 指定交换机,队列,路由key方式
 * 
 * @author
 * @date 2019/01/10 11:19:38
 */
public class ProducterAllDemo {
    public static void main(String[] args) throws IOException, TimeoutException {

        String queneName = "firstQueue";
        String exchangeName = "amq.fanout";
        String routingKey = "test1";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");

            // 创建与RabbitMQ服务器的TCP连接
            cOnnection= factory.newConnection();
            // 创建一个频道
            channel = connection.createChannel();
            // 声明交换机类型
            channel.exchangeDeclare("amq.fanout", "fanout", true);
            // 声明默认的队列 (也可不申明队列,使用默认队列)
            channel.queueDeclare(queneName, true, false, true, null);
            // String queue = channel.queueDeclare().getQueue();
            // 将队列与交换机绑定
            channel.queueBind(queneName, exchangeName, routingKey);
            // 指定一个队列
            // channel.queueDeclare(queneName, false, false, false, null);
            while (true) {
                channel.basicPublish(exchangeName, routingKey, null, UUID.randomUUID().toString().getBytes());
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        }

    }
}

消费者

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 
 * @author
 * @date 2019/01/10 11:19:42
 */
public class ConsumerAllDemo {
    public static void main(String[] args) {
        String queneName = "firstQueue";
        String exchangeName = "amq.fanout";
        String routingKey = "test1";
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            factory.setVirtualHost("test_vhosts");
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 声明交换机类型
            channel.exchangeDeclare(exchangeName, "fanout", true);
            // 声明默认的队列(也可不申明队列,使用默认队列)
            channel.queueDeclare(queneName, true, false, true, null);
            // String queue = channel.queueDeclare().getQueue();
            // 将队列与交换机绑定
            channel.queueBind(queneName, exchangeName, routingKey);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(envelope.getExchange() + "," + envelope.getRoutingKey() + "," + message);
                }
            };
            // channel绑定队列、消费者,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queneName, true, consumer);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

 


推荐阅读
  • 深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用
    深入解析Spring Boot启动过程中Netty异步架构的工作原理与应用 ... [详细]
  • RocketMQ在秒杀时的应用
    目录一、RocketMQ是什么二、broker和nameserver2.1Broker2.2NameServer三、MQ在秒杀场景下的应用3.1利用MQ进行异步操作3. ... [详细]
  • 在 CentOS 7 上部署和配置 RabbitMQ 消息队列系统时,首先需要安装 Erlang,因为 RabbitMQ 是基于 Erlang 语言开发的。具体步骤包括:安装必要的依赖项,下载 Erlang 源码包(可能需要一些时间,请耐心等待),解压源码包,解决可能出现的错误,验证安装是否成功,并将 Erlang 添加到环境变量中。接下来,下载 RabbitMQ 的 tar.xz 压缩包,并进行解压和安装。确保每一步都按顺序执行,以保证系统的稳定性和可靠性。 ... [详细]
  • JUC(三):深入解析AQS
    本文详细介绍了Java并发工具包中的核心类AQS(AbstractQueuedSynchronizer),包括其基本概念、数据结构、源码分析及核心方法的实现。 ... [详细]
  • 双指针法在链表问题中应用广泛,能够高效解决多种经典问题,如合并两个有序链表、合并多个有序链表、查找倒数第k个节点等。本文将详细介绍这些应用场景及其解决方案。 ... [详细]
  • 探讨Redis的最佳应用场景
    本文将深入探讨Redis在不同场景下的最佳应用,包括其优势和适用范围。 ... [详细]
  • Unity与MySQL连接过程中出现的新挑战及解决方案探析 ... [详细]
  • 如何在PHP中准确获取服务器IP地址?
    如何在PHP中准确获取服务器IP地址? ... [详细]
  • 本文深入解析了通过JDBC实现ActiveMQ消息持久化的机制。JDBC能够将消息可靠地存储在多种关系型数据库中,如MySQL、SQL Server、Oracle和DB2等。采用JDBC持久化方式时,数据库会自动生成三个关键表:`activemq_msgs`、`activemq_lock`和`activemq_ACKS`,分别用于存储消息数据、锁定信息和确认状态。这种机制不仅提高了消息的可靠性,还增强了系统的可扩展性和容错能力。 ... [详细]
  • C++ 异步编程中获取线程执行结果的方法与技巧及其在前端开发中的应用探讨
    本文探讨了C++异步编程中获取线程执行结果的方法与技巧,并深入分析了这些技术在前端开发中的应用。通过对比不同的异步编程模型,本文详细介绍了如何高效地处理多线程任务,确保程序的稳定性和性能。同时,文章还结合实际案例,展示了这些方法在前端异步编程中的具体实现和优化策略。 ... [详细]
  • 如何利用Java 5 Executor框架高效构建和管理线程池
    Java 5 引入了 Executor 框架,为开发人员提供了一种高效管理和构建线程池的方法。该框架通过将任务提交与任务执行分离,简化了多线程编程的复杂性。利用 Executor 框架,开发人员可以更灵活地控制线程的创建、分配和管理,从而提高服务器端应用的性能和响应能力。此外,该框架还提供了多种线程池实现,如固定线程池、缓存线程池和单线程池,以适应不同的应用场景和需求。 ... [详细]
  • 小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限
    小王详解:内部网络中最易理解的NAT原理剖析,挑战你的认知极限 ... [详细]
  • 作为140字符的开创者,Twitter看似简单却异常复杂。其简洁之处在于仅用140个字符就能实现信息的高效传播,甚至在多次全球性事件中超越传统媒体的速度。然而,为了支持2亿用户的高效使用,其背后的技术架构和系统设计则极为复杂,涉及高并发处理、数据存储和实时传输等多个技术挑战。 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 一文了解消息中间件RabbitMQ
    消息中间件---RabbitMQ1消息中间件的作用2.常用的消息中间件3消息中间件RabbitMQ3.1RabbitMQ介绍3.3RabbitMQ的队列模式3.3RabbitMQ的 ... [详细]
author-avatar
喜欢上爱的感觉了
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有