正文

就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的故事就这么展开了。

这节我们研究研究,消息的发送流程。也就是说,消息孩子从进门到坐到message queue座位上都经历了啥。

父母把消息孩子送到码头之后,门口的门童defaultMQProducerImpl.send()接过孩子,进入到MQ房子内部,然后引导孩子进入Broker候船大厅内的message queue座位上就坐。这就是消息发送的流程了。

而且孩子在刚被门童接到之后,就被规定了能在候船大厅待多久,默认是3秒。也就是说,要是再小房子内等了三秒没走,就离开吧,你怕是没想明白自己来干啥的。这就是消息的超时时间。

读源码

1 调用defaultMQProducerImpl.send()

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

2 设置过期时间

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

3 执行defaultMQProducerImpl.sendDefaultImpl()方法

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

这里看看这几个参数,

  • communicationMode 是通信模式,同步异步还是单向
  • sendCallback 是针对异步模式的,异步模式需要设置发送完成后的回调。

sendDefaultImpl是发送消息的核心方法。

这里消息孩子进到第一个卡口,先要检查送孩子来的家长是否还能联系上,若是能联系到,就继续。要是联系不到,这孩子岂不是被抛弃了,不敢接不敢接,送到孤儿院吧。

然后需要检查消息孩子了,首先是检查孩子还在不在,别扔个衣服跑了。
然后看看孩子指定的这个topic,不能说我想去内个topic哈,必须是实实在在的名字。而且上头也规定了,这个topic的名字也不能太长,也不能包含特殊字符。已有的一些领导定过的也不能用哈。
接下来就是检查孩子的body了,之前说body就是孩子的技能,首先,技能为空,不行不行,啥都不会是不行的。再者太长也不行,你唱首歌两年,这没法玩。

检查message不为null

检查topic

  • topic不能为空
  • topic不能太长
  • 不能包含特殊字符

检查话题的名字是否被系统已占用

检查body

  • 检查是否为空
  • 检查长度是否过长,最大为4MB 这样

下边我们看看sendDefaultImpl这个方法。给他拆成一段一段的看。

1 两个校验

//校验生产者服务是ok的,可以联系到的
this.makeSureStateOK();
//校验消息的参数
Validators.checkMessage(msg, this.defaultMQProducer);
  • 第一个检查,检查生产者服务是否是正常工作的,若是不正常工作,就抛出异常。
private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
              this.serviceState
              FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}
  • 第二个检查,检查消息本身是否为空,检查topic,检查消息的body
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // 这里校验Topic的时候,校验了不能为空,长度和特殊字符
    Validators.checkTopic(msg.getTopic());
    //这里则校验了一些不允许使用的topic名字
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body不为空
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    // body长度不为0
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    // body 长度不能过长
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: "   defaultMQProducer.getMaxMessageSize());
    }
}

2 获取topic路由信息

嗯,这里孩子终于通过了检查,服务人员开始带着他去找自己指定的topic区域,指定是自己指定,划分还是工作人员划分的。咱总得知道这个topic区域在哪吧。

先去缓存笔记里找,有没有这个区域的信息,若是没有这个topic,就新建一个,然后更新到缓存笔记里边。若有topic但是不知道在哪,就找name server大脑去申请这个topic在哪的信息。

执行tryToFindTopicPublishInfo方法去获取Topic的路由信息,若是不存在就新建,若是有topic但是缓存中没有路由信息,则通过name server获取路由信息。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //获取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //不存在
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //新建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //包含路由信息就直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //不包含路由信息则向name server申请,修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3 计算重试次数

这就是计算消息孩子可以尝试去找地方坐几次,没坐上,欸,我又来了,没坐上,欸,我又来了。

这行代码就是计算重试次数的,根据communicationMode传入的值,同步异步还是单向的来决定重试次数是几次。 很明显,若是同步的,就会尝试三次。若是异步的或者单向的就只发送一次。

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1   this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

4 执行队列选择方法

我们之前说了,Broker类似于候船大厅,为了均分压力,每次都要进与上次不同的候船大厅。

执行selectOneMessageQueue方法通过Queue将消息发送到与上次不同的一个Broker。也可以通过 sendLatencyFaultEnable判断是否启用延迟容错开关

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

5 发送消息

这就是走过巷道坐到属于自己的座位上了

然后就通过sendKernelImpl发送消息了,这是发送消息的核心方法。会准备通信层的入参,并将请求发送给通信层,内部实现是基于Netty的。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

以上就是RocketMQ消息发送流程源码剖析的详细内容,更多关于RocketMQ消息发送流程的资料请关注Devmax其它相关文章!

RocketMQ消息发送流程源码剖析的更多相关文章

  1. vitejs预构建理解及流程解析

    这篇文章主要为大家介绍了vitejs预构建理解及流程解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  2. 一文了解Python 流程控制

    这篇文章主要介绍了一文了解Python 流程控制,Python 中有while和for两种循环机制,其中while循环是条件循环,文章通过展开循环内容展开控制流程详情,需要的小伙伴可以参考一下

  3. SpringBoot集成RocketMQ发送事务消息的原理解析

    RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致,这篇文章主要介绍了SpringBoot集成RocketMQ发送事务消息,需要的朋友可以参考下

  4. RocketMQ延迟消息简明介绍

    这篇文章主要介绍了RocketMQ延迟消息,延迟消息是个啥?顾名思义,就是等一段时间再消费的消息。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  5. RocketMQ特性Broker存储事务消息实现

    这篇文章主要为大家介绍了RocketMQ特性Broker存储事务消息实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  6. Spring Boot 整合RocketMq实现消息过滤功能

    这篇文章主要介绍了Spring Boot 整合RocketMq实现消息过滤,本文讲解了RocketMQ实现消息过滤,针对不同的业务场景选择合适的方案即可,需要的朋友可以参考下

  7. php+ajax发起流程和审核流程(以请假为例)

    这篇文章主要介绍了php+ajax发起流程和审核流程(以请假为例) ,需要的朋友可以参考下

  8. Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,本篇我们了解如何实现顺序消息的发送与消费

  9. vue原理Compile从新建实例到结束流程源码

    这篇文章主要为大家介绍了vue原理Compile从新建实例到结束流程源码,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. RocketMQ Namesrv架构工作原理详解

    这篇文章主要为大家介绍了RocketMQ Namesrv架构工作原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部