前言

最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。

项目中采用的是Spring Boot2.0集成的MQTT引入的版本为1.2.0,消息发送用的是MessagingGateway的方式实现,不熟悉的朋友可以查看这篇文章Spring boot 集成 MQTT详情

原因分析

出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义

0 最多一次的传输

发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。

1 至少一次的传输

发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.

2 只有一次的传输

 Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。

项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:

当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。

源码分析

关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。

MQTT的Push消息到缓存中时序图

MqttPahoMessageHandler的publish方法

说明: checkConection检查连接后,在发送消息。

MqttAsyncClient的publish方法

ClientComms的internalSend方法

ClientState的send方法

MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0

SaveToken的源码实现如下:

通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。

异步发送消息时序图

说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。

ClientComms的conncect方法

ConnectBG的run方法

CommsSender的run方法

1.从clientState中获取消息 2.通过消息id去hashtable中获取缓存消息 3.消息不为空,执行消息发送 4.调用notifySent方法删除消息,且actualInFlight执行递减操作。

CommsSender的notifySent方法

小结

在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。

//存放待发送消息的Vector数组
volatile private Vector pendingMessages;

解决方案

方案1:发送消息时设置为Qos=1

此方案虽然可以解决此问题,但存在如下的缺点:

  • 网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。
  • 发送消息需要进行消息确认,网络资源消耗过大。

方案2: 修改maxInflight的默认值,例如将其修改为50

```
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setMaxInflight(50);
```

此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。

方案3:将消息配置为多客户端模式

由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。

方案4:升级mqtt版本为1.2.1

在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。

引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。

解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。

mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

总结

本文通过定位MQTT错误,详细的讲解了MQtt消息的发送流程,解决的方案虽然有多种,我们需要结合实际的业务情况来解决问题,关于MQtt如果还有其他疑问,可以随时反馈,大家共同学习,共同进步。

到此这篇关于Spring Boot MQTT Too many publishes in progress错误的解决方案的文章就介绍到这了,更多相关Spring Boot内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Spring Boot MQTT Too many publishes in progress错误的解决方案的更多相关文章

  1. 浅谈HTML5 FileReader分布读取文件以及其方法简介

    本篇文章主要介绍了浅谈HTML5 FileReader分布读取文件以及其方法简介,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  2. ios – 在导航栏下添加进度条

    我是iOS开发的新手.我想知道在iOS7中是否在UINavigationBar下发送消息时,其标题为:发送,有一个进度条正在加载,直到消息成功发送.我的问题是:>那个酒吧是进度条吗?有人可以给我一些关于如何在iOS7和iOS6上创建它的想法吗?

  3. ios – 当应用程序强制退出时如何恢复下载?

    )方法PS:谢谢你的回答我明白我的帖子有多么误导性.我会尝试工作的框架,可以允许恢复下载后,应用程序强制退出

  4. ios – 如何在UIProgressView中设置进度色调

    我想知道如何设置UIProgressView色调颜色.有时,根据颜色,默认高度不允许正确查看进度.如何解决这个问题?解决方法您可以通过此设置曲线和进度条的色调颜色;对于轨道颜色:进度条:希望这可以帮助..:)

  5. ios – OSX上的AWS MQTT

    解决方法由于SSL握手问题,它是失败的.它正在检测到一个无效的证书.报告和解决了类似的问题here,引用相同的错误代码.由于p12文件中有多个身份,该问题被追溯到身份不匹配.在这种情况下,p12文件中有两个证书,但代码只读取第一个.我建议倾销.p12文件的内容,并确认证书.发布在这里审查.

  6. ios – 进度视图未更新 – Swift

    我正在使用Swift中的ProgressView这是我的代码即使完整提供此输出它不会在此更新进度视图解决方法针对Swift3进行了更新对UI的更改应始终发生在主/前台线程上,在这种情况下,它发生在后台队列上.使用:把它移到前台.前Swift3的旧答案对UI的更改应始终发生在主/前台线程上,它发生在后台队列上.使用:把它移到前台.

  7. ios – NSProgress奇怪的行为

    我有一个由几个子任务组成的大任务.我想为这项重大任务添加进度报告.为此,我想使用nsprogress,并根据类文档,我可以通过使用其子–父机制来做这种子任务进展.所以为了简化它,让我说我有一个由一个子任务组成的大任务.这就是我所做的:如您所见,子任务使用背景上下文来运行一些与CoreData相关的代码,而后台上下文使用主上下文作为其父上下文.这会导致进度的“fractionCompleted”属性出现一些奇怪的KVO.这是印刷品:如你所见,打印以1.0,0.5和1.0开始,然后是0.66?

  8. swift爬行篇 滑块,进度条,步进,

    滑块1.创建continous属性为true:滑块在滑动过程中响应事件,为false时则只在滑动后响应事件2.设置各区域的图片3.响应事件和Tag4.获取滑块的值进度条1创建2.设置步进控件1.创建2.响应事件continuous属性UiSlider类似3.获取步进的值

  9. Swift - 进度条UIProgressView的用法

    overridefuncviewDidLoad(){//这里放置步骤二的代码即可}二、在函数中创建进度条控件progress=UIProgressViewprogress.progress=0progress.progresstintColor=UIColor.redColor()progress.trackTintColor=UIColor.blackColor()self.view.addSubviewtimer=NSTimer.scheduledTimerWithTimeIntervaltimer.

  10. Swift UISwitch/UIProgressView/UISlider

    1.UISwitch开关视图,可以让用户快速的开关一个功能,比如蓝牙,wif等.系统默认样式:上面绿色的开启状态,下面的是关闭状态.UISwitch的构成部分:UISwitch的大小是W51H31,这固定在,在设置它的frame的时候可以不设置其大小.为什么是固定的呢?

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部