热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

javaweb消息中间件——rabbitmq入门

概念:RabbitMQ是一款开源的消息中间件系统,由erlang开发,是AMQP的实现。架构图大概如上。broker是消息队列的服务器,比如在linux上,我们安装的rabbitmq就是一

概念:RabbitMQ是一款开源的消息中间件系统,由erlang开发,是AMQP的实现。

架构图大概如上。

broker是消息队列的服务器,比如在linux上,我们安装的rabbitmq就是一个broker,可以通过url+username+password连接。

每个消息服务器可以创建多个vhost,默认的vhost是“/”,linux中通过rabbitmqctl add_vhost  创建vhost,再给指定用户授权即可。

生产者首先通过创建channel与broker连接,类似于创建一个会话,这样可以与消息主机通信发送消息。

消息生产者将消息发送到定义的exchange上,exchange通过不同的转发路由规则将消息转发到相应的队列,消费者选择一个队列监听,如果有多个消费者监听同一个队列,默认是轮询方式,保证每个连接有相同的收到消息的概率。

一个简单的rabbitmq程序:

public class Producer {
    private static final String TEST_VHOST = "testvhost";
    private static final String TEST_QUEUE_NAME = "task_queue";

    private static Connection connection;
    private static Channel channel;

    public static void main(String[] args) throws IOException, TimeoutException, RabbitmqConnectionException {
        try {
            //create connectionFactory with host, username, password, vhost.
            ConnectionFactory cOnnectionFactory= new ConnectionFactory();
            connectionFactory.setUsername("test");
            connectionFactory.setPassword("test");
            connectionFactory.setHost("localhost");
            connectionFactory.setVirtualHost(TEST_VHOST);
            //get connection from connectionFactory
            cOnnection= connectionFactory.newConnection();
            //create an session to communicate with mq host
            channel = connection.createChannel();
            //declare a queue(if not exists, create it)
            channel.queueDeclare(TEST_QUEUE_NAME, true, false, false, null);
            String message = "Hello world";
            System.out.println("sending message : " + message);
            //publish message to the declaring queue
            channel.basicPublish("", TEST_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        }catch (Exception e) {
            throw new RabbitmqConnectionException("Error connection");
        } finally {
            channel.close();
            connection.close();
        }


    }
}

  

public class Consumer {
    private static final String TEST_VHOST = "testvhost";
    private static final String TEST_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost(TEST_VHOST);
        factory.setUsername("test");
        factory.setPassword("test");
        final Connection cOnnection= factory.newConnection();
        final Channel channel = connection.createChannel();

        //declaring a queue to listen
        channel.queueDeclare(TEST_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages...");

        //a piece message per time
        channel.basicQos(1);

        final com.rabbitmq.client.Consumer cOnsumer= new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println("Received : '" + message + "'");
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TEST_QUEUE_NAME, false, consumer);
    }
}

 在spring中:



       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:mvc="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    rabbitmq 连接服务配置

    package="com.battery.rabbitMq"/>

    
    

    

    
    

    
    class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

    
    

    
    
        
            
        
    

    

    
    class="com.battery.rabbitMq.QueueListener"/>

    
    
        
    

 
 
@Service
public class MQProducerImpl implements MQProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

private final static Logger LOGGER = LoggerFactory.getLogger(MQProducerImpl.class);

/**
* convertAndSend:将Java对象转换为消息发送到匹配Key的交换机中Exchange,由于配置了JSON转换,这里是将Java对象转换成JSON字符串的形式。
* 原文:Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.
**/
@Override
public void sendDataToQueue(Object object) {
try {
rabbitTemplate.convertAndSend(object);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
@Component
public class QueueListener implements ChannelAwareMessageListener {

    private static Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            String ackMessage = new String(message.getBody(), "utf-8");
            System.out.print(ackMessage);
            logger.debug("接收到:" + new String(message.getBody(), "utf-8"));
        } catch (Exception e) {
            System.out.print(e.getMessage());
        }
    }
}

 

 

 

  


推荐阅读
  • RabbitMq的最终一致性分布式事务
    RabbitMq的最终一致性分布式事务使用rabbitmq的步骤1.运行安装在服务器上的rabbit服务2.在项目中安装依赖3.编写对应的配置文件4.创建对应配置并加上启动注解5. ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • OpenStack 的 Nova 和 Glance 组件
    简单回顾一下OpenStack三大组件的用途:OpenStackCompute(Nova),为云组织的控制器,它提供一个工具来部署云&#x ... [详细]
  • RabbitMQ消息中间件快速入门:SpringBoot整合生产者与消费者
    前言本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢?我们的生产者发送一条消息,投递到RabbitMQ集群也就是Broker。 ... [详细]
  • 下图|通用型_企业用户如何选择合适的云服务器配置?
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了企业用户如何选择合适的云服务器配置?相关的知识,希望对你有一定的参考价值。随着网络飞速发展,企业上云已成为 ... [详细]
  • 篇首语:本文由编程笔记#小编为大家整理,主要介绍了架构文摘:消息队列设计精要相关的知识,希望对你有一定的参考价值。消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具 ... [详细]
  • springboot+rabbitmq路由模式
    路由模式是把队列通过rout绑定到交换机上首先是POMorg.springframework.boot ... [详细]
  • CentOs 7.3中搭建RabbitMQ 3.6单机多实例服务的步骤与使用
    CentOs7.3中搭建RabbitMQ3.6单机多实例服务的步骤与使用-RabbitMQ简介RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • [译]技术公司十年经验的职场生涯回顾
    本文是一位在技术公司工作十年的职场人士对自己职业生涯的总结回顾。她的职业规划与众不同,令人深思又有趣。其中涉及到的内容有机器学习、创新创业以及引用了女性主义者在TED演讲中的部分讲义。文章表达了对职业生涯的愿望和希望,认为人类有能力不断改善自己。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 一句话解决高并发的核心原则
    本文介绍了解决高并发的核心原则,即将用户访问请求尽量往前推,避免访问CDN、静态服务器、动态服务器、数据库和存储,从而实现高性能、高并发、高可扩展的网站架构。同时提到了Google的成功案例,以及适用于千万级别PV站和亿级PV网站的架构层次。 ... [详细]
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 模块化区块链生态系统的优势概述及其应用案例
    本文介绍了相较于单体区块链,模块化区块链生态系统的优势,并以Celestia、Dymension和Fuel等模块化区块链项目为例,探讨了它们解决可扩展性和部署问题的方案。模块化区块链架构提高了区块链的可扩展性和吞吐量,并提供了跨链互操作性和主权可扩展性。开发人员可以根据需要选择执行环境,并获得奖学金支持。该文对模块化区块链的应用案例进行了介绍,展示了其在区块链领域的潜力和前景。 ... [详细]
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
author-avatar
小池子的思密达
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有