热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ-tutorial1:hellorabbitmq

RabbitMQRabbitMQ是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局,RabbitMQ是邮箱,邮局和邮递员。RabbitMQ

Rabbit MQ

RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局,RabbitMQ是邮箱,邮局和邮递员。

  • Rabbit MQ
    • Tutorial 1 - Hello Rabbit MQ
      • Rabbit MQ安装
      • Rabbit MQ Service运行
      • 实现Publisher(Publisher往Broker发送消息)
      • 实现Consumer(Consumer从Broker接收消息)
      • 运行结果截图

几个重要概念

  • 生产者(Producer):消息发送者即为生产者
  • 消息队列(Queue):本质上是一个消息缓冲区,可以理解成邮箱,容量受限于主机内存和磁盘;多个生产者可以往同一个消息队列发消息,多个消费者可以从同一个消息队列接收消息
  • 消费者(Consumer):消息接收者即为消费者
  • 代理(Broker):中间人,它接受消息,转发消息

通常情况下,Producer,Consumer,Broker它们都不在同一台主机上。

Tutorial 1 - Hello Rabbit MQ

本例使用:rabbitmq_server-3.7.4, Intellij Idea; 并且Producer,Broker,Consumer都在统一电脑上。
(1)Broker:直接使用本机,Rabbit MQ Server安装(见下面安装部分)完成后,直接从开始菜单“RabbitMQ Service - start”启动服务。(2)实现Publisher,通过一个线程,每隔5s发送一条消息到Broker。(3)实现Consumer,从Broker接收消息。

Rabbit MQ安装

下载并安装Erlang
下载安装Rabbit MQ
注意它们俩之间的版本对应关系。

Rabbit MQ Service运行

安装完成之后,在开始菜单上有常用命令的快捷方式:
RabbitMQ Service - start
RabbitMQ Service - stop
RabbitMQ Command Prompt(sbin dir)

通过RabbitMQ Command Prompt(sbin dir)开启management plugin:rabbitmq-plugins enable rabbitmq_management
这里写图片描述

这样就可以通过http://localhost:15672使用RabbitMQ management
使用默认的user=guest,password=guest登录
这里写图片描述

想要了解RabbitMQ使用的相关端口,参见http://www.rabbitmq.com/install-windows.html页面。

通过 RabbitMQ Service - start启动服务,这就相当于把代理(broker)运行起来了,Producer就可以往它发送message,而Consumer也可以从它接收message。

实现Publisher(Publisher往Broker发送消息)

  • 运行Intellij Idea 创建一个maven的quickstart类型的项目
POM.xml依赖如下


<dependencies>
    <dependency>
      <groupId>com.rabbitmqgroupId>
      <artifactId>amqp-clientartifactId>
      <version>5.2.0version>
    dependency>
    <dependency>
      <groupId>org.slf4jgroupId>
      <artifactId>slf4j-apiartifactId>
      <version>1.7.25version>
    dependency>
    <dependency>
      <groupId>org.slf4jgroupId>
      <artifactId>slf4j-log4j12artifactId>
      <version>1.7.25version>
    dependency>
  dependencies>
  • 新加一个Send类和一个线程实现类用于发送消息
    代码实现如下,详细步骤见代码注释
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

//Publisher发送完消息就结束
//The publisher(MySendTask) will connect to RabbitMQ, 
// send message per 5 seconds, 
// until the thread stopped and then exit.
public class MySendTask implements Runnable {
    //define name of the queue
    private final static String QUEUE_NAME = "hello";
    //connection to the server(broker)
    private Connection rbtMqConn;
    //
    private Channel rbtMqChnl;

    private boolean isStop = false;

    public void setIsStop(boolean stop){
        this.isStop = stop;
    }

    @Override
    public void run() {
        try{
            //1.create a connection to the server
            //The connection abstracts the socket connection,
            //and takes care of protocol version negotiation 
            //and authentication and so on for us.
            ConnectionFactory factory = new ConnectionFactory();
            //Here we connect to a broker on the 
            //local machine - hence the localhost.
            //consumer也从这个broker接收消息,也可以使用其它主机,比如172.16.21.10
            factory.setHost("localhost");
            rbtMqCOnn= factory.newConnection();

            //2.we create a channel, which is where most of the 
            //API for getting things done resides.
            rbtMqChnl = rbtMqConn.createChannel();

            //3.To send, we must declare a queue for us to send to; 
            // then we can publish a message to the queue
            //Consumer也需要指定使用该名字的channel
            rbtMqChnl.queueDeclare(QUEUE_NAME, false, false, false, null);

            //message to send
            String message = "Hello Rabbit MQ!";

            //send message per 5s
            while (!isStop){
                rbtMqChnl.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
                Thread.sleep(5000);
            }

            //4.Lastly, we close the channel and the connection;
            rbtMqChnl.close();
            rbtMqConn.close();
        }catch(Exception ex){
            System.out.println(ex.getMessage());
        }
        System.out.println(" Send task stop");
    }
}

//message publisher
public class Send {
    public static void main(String[] argv) throws Exception {
        MySendTask sendTask = new MySendTask();
        Thread thread = new Thread(sendTask);
        thread.start();
        //let the thread run 60 seconds
        Thread.sleep(60000);
        sendTask.setIsStop(true);
    }
}

实现Consumer(Consumer从Broker接收消息)

和Publisher类似,Consumer也要打开链接和通道(channel),并且要和Publisher匹配。步骤见如下代码。

import com.rabbitmq.client.*;
import java.io.IOException;

//consumer和publisher不一样,它一直运行,监听接收消息
//Our consumer is pushed messages from RabbitMQ, 
// so unlike the publisher which publishes some messages and stop, 
// we'll keep it running to listen for messages and print them out.
public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //we open a connection and a channel,
        // and declare the queue from which we're going to consume.
        // Note this matches up with the queue that send publishes to.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection cOnnection= factory.newConnection();
        Channel channel = connection.createChannel();

        //Note that we declare the queue here, as well.
        // Because we might start the consumer before the publisher,
        // we want to make sure the queue exists before we try to consume messages from it.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //Since it will push us messages asynchronously,
        // we provide a callback in the form of an object
        // that will buffer the messages until we're ready to use them.
        // That is what a DefaultConsumer subclass does.
        Consumer cOnsumer= new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

运行结果截图

这里写图片描述


推荐阅读
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 基于Axis、XFire、CXF的webservice客户端调用示例
    本文介绍了如何使用Axis、XFire、CXF等工具来实现webservice客户端的调用,以及提供了使用Java代码进行调用的示例。示例代码中设置了服务接口类、地址,并调用了sayHello方法。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 本文介绍了解决二叉树层序创建问题的方法。通过使用队列结构体和二叉树结构体,实现了入队和出队操作,并提供了判断队列是否为空的函数。详细介绍了解决该问题的步骤和流程。 ... [详细]
  • Tomcat/Jetty为何选择扩展线程池而不是使用JDK原生线程池?
    本文探讨了Tomcat和Jetty选择扩展线程池而不是使用JDK原生线程池的原因。通过比较IO密集型任务和CPU密集型任务的特点,解释了为何Tomcat和Jetty需要扩展线程池来提高并发度和任务处理速度。同时,介绍了JDK原生线程池的工作流程。 ... [详细]
  • 栈和队列的共同处和不同处
    本文主要介绍了栈和队列的共同处和不同处。栈和队列都是由几个数据特性相同的元素组成的有限序列,也就是线性表。队列是限定仅在表的一端插入元素、在另一端删除元素的线性表,遵循先进先出的原则。栈是限定仅在表尾进行插入或删除操作的线性表,遵循后进先出的原则。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 本文介绍了栈和队列的区别及其特点。栈是一种先进后出的线性表,只能在表的一端进行插入和删除操作;队列是一种先进先出的线性表,只能在表的一端进行插入和在另一端进行删除操作。栈和队列是两种广泛使用的线性数据结构,它们的基本操作具有特殊性。栈的遍历需要遍历整个栈才能取出数据,并需要为数据开辟临时空间,而队列基于地址指针进行遍历,可以从头或尾部开始遍历,但不能同时遍历,且无需开辟临时空间。栈和队列在程序设计中具有重要应用。 ... [详细]
  • STL迭代器的种类及其功能介绍
    本文介绍了标准模板库(STL)定义的五种迭代器的种类和功能。通过图表展示了这几种迭代器之间的关系,并详细描述了各个迭代器的功能和使用方法。其中,输入迭代器用于从容器中读取元素,输出迭代器用于向容器中写入元素,正向迭代器是输入迭代器和输出迭代器的组合。本文的目的是帮助读者更好地理解STL迭代器的使用方法和特点。 ... [详细]
author-avatar
-独享你的温情
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有