简介

RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。

原理

RocketMQ事务消息通过异步确保方式,保证事务的最终一致性。设计的思想可以借鉴两个阶段提交事务。其执行流程图如下:

图片.png

  • 发送方向MQ服务端发送消息。
  • MQ Server将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  • 发送方开始执行本地事务逻辑。
  • 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  • 在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
  • 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  • 发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。

具体实现

消费者

@Component
public class TransactionProduce
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendTransactionMessage(String msg)
    {
        logger.info("start sendTransMessage hashKey:{}",msg);
       
         Message message =new Message();
         message.setBody("this is tx message".getBytes());
         TransactionSendResult result=rocketMQTemplate.sendMessageInTransaction("test-tx-rocketmq", 
                 MessageBuilder.withPayload(message).build(), msg);
         
         //发送状态
         String sendStatus = result.getSendStatus().name();
         // 本地事务执行状态
         String localTxState = result.getLocalTransactionState().name();
         logger.info("send tx message sendStatus:{},localTXState:{}",sendStatus,localTxState);
    } 
}

说明:发送事务消息采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等。

消费者

@Component
@RocketMQMessageListener(consumerGroup="test-txRocketmq-group",topic="test-tx-rocketmq", messageModel = MessageModel.CLUSTERING)
public class TransactionConsumer implements RocketMQListener<String>
{
    private Logger logger =LoggerFactory.getLogger(getClass());
    @Override
    public void onMessage(String message)
    {
        logger.info("send transaction mssage parma is:{}", message);
    }
}

说明:发送事务消息的消费者与普通的消费者一样没有太大的区别。

生产者消息监听器

发送事务消息除了生产者和消费者以外,我们还需要创建生产者的消息监听器,来监听本地事务执行的状态和检查本地事务状态。

@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener
{
    private Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * 执行本地事务
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //处理业务
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

    /**
     * 检查本地事务的状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg)
    {
        logger.info("start check Local rocketMQ transaction");
        
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("check trans msg content:{}",jsonStr);
        }
        catch (Exception e)
        {
            resultState  = RocketMQLocalTransactionState.ROLLBACK;
        }
        return resultState;
    }
}

说明:RocketMQ本地事务状态由如下几种:

  • RocketMQLocalTransactionState.COMMIT:提交事务,允许消费者消费此消息。
  • RocketMQLocalTransactionState.ROLLBACK: 回滚事务,消息将被删除,不允许被消费。
  • RocketMQLocalTransactionState.UNKNOWN:中间状态,代表需要进行检查来确定状态。

注意:Spring Boot2.0的版本之后,@RocketMQTransactionListener 已经没有了txProducerGroup属性,且sendMessageInTransaction方法也将其移除。所以在同一项目中只能有一个@RocketMQTransactionListener,不能出现多个,否则会报如下错误:

java.lang.IllegalStateException: rocketMQTemplate already exists RocketMQLocalTransactionListener

消息事务测试

正常测试

c.s.fw.mq.produce.TransactionProduce - product start sendTransMessage msg:{"userId":"zhangsann"}
c.s.f.m.p.TransactionMsgListener - start invoke local rocketMQ transaction
c.s.f.m.p.TransactionMsgListener - invoke local transaction msg content:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}
c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:COMMIT_MESSAGE
c.s.f.m.consumer.TransactionConsumer - send transaction mssage parma is:{"topic":null,"flag":0,"properties":null,"body":"dGhpcyBpcyB0eCBtZXNzYWdl","transactionId":null,"keys":null,"tags":null,"delayTimeLevel":0,"waitStoreMsgOK":true,"buyerId":null}

说明:通过日志我们可以看出,执行的流程与上述的一致,执行成功后,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为COMMIT_MESSAGE。

异常测试

如果在执行本地消息时出现异常,那么执行结果会是怎样?修改下本地事务执行的方法,让其出现异常。

代码调整

  @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
            Object obj)
    {
        logger.info("start invoke local rocketMQ transaction");
        RocketMQLocalTransactionState resultState = RocketMQLocalTransactionState.COMMIT;
        
        try
        {
            //处理业务
            String jsonStr = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            logger.info("invoke local transaction msg content:{}",jsonStr);
             int c=1/0;
        }
        catch (Exception e)
        {
            logger.error("invoke local mq trans error",e);
            resultState = RocketMQLocalTransactionState.UNKNOWN;
        }
        
        return resultState;
    }

执行结果

c.s.fw.mq.produce.TransactionProduce - send tx message sendStatus:SEND_OK,localTXState:UNKNOW

从执行的结果可以看出,消息执行成功返回的结果为SEND_OK,本地事务执行的状态为:UNKNOW.所以消费端无法消费此消息。

总结

到此这篇关于SpringBoot集成RocketMQ发送事务消息的文章就介绍到这了,更多相关SpringBoot集成RocketMQ事务消息内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

SpringBoot集成RocketMQ发送事务消息的原理解析的更多相关文章

  1. SpringBoot本地磁盘映射问题

    这篇文章主要介绍了SpringBoot本地磁盘映射问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  2. java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

    这篇文章主要介绍了java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源),文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  3. SpringBoot整合Javamail实现邮件发送的详细过程

    日常开发过程中,我们经常需要使用到邮件发送任务,比方说验证码的发送、日常信息的通知等,下面这篇文章主要给大家介绍了关于SpringBoot整合Javamail实现邮件发送的详细过程,需要的朋友可以参考下

  4. SpringBoot详细讲解视图整合引擎thymeleaf

    这篇文章主要分享了Spring Boot整合使用Thymeleaf,Thymeleaf是新一代的Java模板引擎,类似于Velocity、FreeMarker等传统引擎,关于其更多相关内容,需要的小伙伴可以参考一下

  5. Springboot集成mybatis实现多数据源配置详解流程

    在日常开发中,若遇到多个数据源的需求,怎么办呢?通过springboot集成mybatis实现多数据源配置,简单尝试一下,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  6. SpringBoot使用Minio进行文件存储的实现

    本文主要介绍了SpringBoot使用Minio进行文件存储的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  7. 解析SpringBoot中使用LoadTimeWeaving技术实现AOP功能

    这篇文章主要介绍了SpringBoot中使用LoadTimeWeaving技术实现AOP功能,AOP面向切面编程,通过为目标类织入切面的方式,实现对目标类功能的增强,本文给大家介绍的非常详细,需要的朋友可以参考下

  8. 详解springboot测试类注解

    这篇文章主要介绍了springboot测试类注解,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  9. SpringBoot实现过滤器和拦截器的方法

    大家应该都晓得实现过滤器需要实现 javax.servlet.Filter 接口,而拦截器会在处理指定请求之前和之后进行相关操作,配置拦截器需要两步,本文通过实例代码给大家介绍SpringBoot 过滤器和拦截器的相关知识,感兴趣的朋友一起看看吧

  10. SpringBoot中使用@scheduled定时执行任务的坑

    本文主要介绍了SpringBoot中使用@scheduled定时执行任务的坑,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部