热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

消息队列(五)RocketMQ消息存储1

问题;部署时如何知道自己是broker还是NameServertopic订阅信息放在哪里broker的作用到底是什么纪录是如何持久化的群发的时候,是如何储存消息的send方法到底有

问题 ;

  • 部署时如何知道自己是 broker 还是 NameServer
  • topic 订阅信息放在哪里
  • broker 的作用到底是什么
  • 纪录是如何持久化的
  • 群发的时候,是如何储存消息的
  • send 方法到底有没有使用多线程发送处理,简单叙述一个 send 的过程
  • transientStorePool 作用到底是什么
    以上几个问题,是在学习中思考的问题,我们将在学习过程中逐渐寻找答案。

概述

本文将会走完文件存储的主体流程,后续分析各个重要的类。

源码分析

消息持久化几个重要的类 :

  • DefaultMessageStore
  • commitLog
  • MappedFile

DefaultMessageStore 持有 commitLog ,commitLog 持有 MappedFileQueue ,MappedFileQueue 管理着多个 MappedFile.

首先我们从 DefaultMessageStore 的 putMessage 方法

    /**
     * 1. 判断是否条件满足条件
     * 2. 调用 this.commitLog.putMessage(msg);
     *
     * @param msg Message instance to store
     * @return
     */
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        // 该 broker 是 slave 模式
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }


        long beginTime = this.getSystemClock().now();
        //写入逻辑
        PutMessageResult result = this.commitLog.putMessage(msg);

        long eclipseTime = this.getSystemClock().now() - beginTime;
        if (eclipseTime > 500) {
            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

CommitLog 的 putMessage 方法

    /**
     * @param msg
     * @return
     */
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 延迟发送,将topic 换成 SCHEDULE_TOPIC , queueId 换成 延迟时间级别
            // 这是并发消息消费重试关键的一步,下一章会重点探讨消息重试机制与定时消息的实现原理。
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config  自旋或是重入锁看存储配置
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            //核心逻辑!!可知 mappedFile是最终写入文件中的实施者
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        handleDiskFlush(result, putMessageResult, msg);
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

总结

我们可以看到 RocketMQ 最主要就是利用映射文件的方法来提高读写效率。

参考资料

  • https://blog.csdn.net/qq496013218/article/details/69397380
  • 《Rocket源码分析》

消息队列(五)--- RocketMQ-消息存储1


推荐阅读
  • 本文分析和介绍了GLo ... [详细]
  • Git(1)
    安装Git完毕(在开始菜单打开的话,打开的不是你想要的路径,切换路径很麻烦)1.D盘新建GitTest文件夹2.打开GitTest,在空白的地方右键,3.单击GitBashHere ... [详细]
  • python基础(二、pycharm安装、卸载)
    3.在Ubuntu中安装PyCharmPyCharm的官方网站地址是:https:www.jetbrains.compycharm注意:安装时不要使用root用户安装,否则后期使用 ... [详细]
  • MyBatis模糊查询和多条件查询一、ISmbmsUserDao层根据姓名模糊查询publicListgetUser();多条件查询publicList ... [详细]
  • 抓取百万知乎用户设计之实体设计
    一.实体的关系实体是根据返回的Json数据来设计的教育经历方面用户可以有很多教育经理,USER和education是一对多的关系,一个education对应一个education一 ... [详细]
  • Xib九宫格应用管理使用xib封装一个自定义view的步骤1新建一个继承UIView的自定义view,假设类名叫做(AppView)2新建一个AppView.xib文件来描述 ... [详细]
  • 【自制小工具】代码生成器
    【自制小工具】代码生成器陆陆续续接触过好几款代码生成工具,发现确实好用,但都会有那么点不完善的地方,所以索性就自己做一个吧。界面非常简单,反正是自己用的,简单点用起来也方便上图:左 ... [详细]
  • kepserver中文手册,kepserver使用教程,kepserver设置
    下面介绍一下KepServer模拟器的使用,以下示例使用服务器随附的Simulator驱动程序来演示创建、配置和运行项目的过程。Simulator驱动程序是基于内存的驱动程序,能为 ... [详细]
  • 看这里,教你如何快速将pdf文件翻译成中文
    因为网上下载的PDF资料,往往掺杂着一些英文,所以中英文翻译是一件很平常的事,毕竟不是每个人的英文都那么好,轻轻松松的就能够看完一篇英文的文件,那么,我们就要寻找翻译工具来帮助我们 ... [详细]
  • 作业迁移
    背景:数据库服务器更换,1、数据库迁移(BACKUPRESTORE);2、数据库登录名用户迁移(注意孤立用户);3、作业迁移数据库迁移,备份数据库、拷贝备份文件到新服务器,还原数据 ... [详细]
  • packagetest;importjava.io.FileInputStream;importjava.io.FileOutputStream;importjava.io.IOE ... [详细]
  • vector:在vc6中,如果要镶嵌使用vector,如vector,后面的两个应该用,空格隔开,否则被编译器认为是移位符string::npos的值为 ... [详细]
  • 猫猫分享,必须精品原文地址:http:blog.csdn.netu013357243articledetails44571163素材地址:http:download.csdn.n ... [详细]
  • 定义:定义两个数论函数\(f\)、\(g\)的Dirichlet卷积为:\[\left(f*g\right)\left(n\right)\sum_{d|n}f\left(d\rig ... [详细]
  • Spark 贝叶斯分类算法
    一、贝叶斯定理数学基础我们都知道条件概率的数学公式形式为即B发生的条件下A发生的概率等于A和B同时发生的概率除以B发生的概率。根据此公式变换,得到贝叶斯公式:即贝叶斯定律是关于随机 ... [详细]
author-avatar
没有变成王子的青蛙
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有