前言

使用springboot,实现以下功能,有两个队列1、2,往里面发送消息,如果处理失败发生异常,可以重试3次,重试3次均失败,那么就将消息发送到死信队列进行统一处理,例如记录数据库、报警等
完整demo项目代码https://gitee.com/daenmax/rabbit-mq-demo

环境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.双击C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat启动MQ服务
2.然后访问http://localhost:15672/,默认账号密码均为guest,
3.手动添加一个虚拟主机为admin_host,手动创建一个用户账号密码均为admin

pom.xml

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.0</version>
        </dependency>

配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: admin_host
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true    #开启失败重试
          max-attempts: 3    #最大重试次数
          initial-interval: 1000  #重试间隔时间 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;


/**
 * Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 * Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 * Queue:消息的载体,每个消息都会被投到一个或多个队列。
 * Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 * Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 * vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 * Producer:消息生产者,就是投递消息的程序.
 * Consumer:消息消费者,就是接受消息的程序.
 * Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 */
@Slf4j
@Component
public class RabbitConfig {
    //业务交换机
    public static final String EXCHANGE_PHCP = "phcp";
    //业务队列1
    public static final String QUEUE_COMPANY = "company";
    //业务队列1的key
    public static final String ROUTINGKEY_COMPANY = "companyKey";
    //业务队列2
    public static final String QUEUE_PROJECT = "project";
    //业务队列2的key
    public static final String ROUTINGKEY_PROJECT = "projectKey";

    //死信交换机
    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
    //死信队列1
    public static final String QUEUE_COMPANY_DEAD = "company_dead";
    //死信队列2
    public static final String QUEUE_PROJECT_DEAD = "project_dead";
    //死信队列1的key
    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
    //死信队列2的key
    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";


//    /**
//     * 解决重复确认报错问题,如果没有报错的话,就不用启用这个
//     *
//     * @param connectionFactory
//     * @return
//     */
//    @Bean
//    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//        return factory;
//    }

    /**
     * 声明业务交换机
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * HeadersExchange :通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     */
    @Bean("exchangePhcp")
    public DirectExchange exchangePhcp() {
        return new DirectExchange(EXCHANGE_PHCP);
    }

     * 声明死信交换机
    @Bean("exchangePhcpDead")
    public DirectExchange exchangePhcpDead() {
        return new DirectExchange(EXCHANGE_PHCP_DEAD);

     * 声明业务队列1
     *
     * @return
    @Bean("queueCompany")
    public Queue queueCompany() {
        Map<String,Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
        //绑定该队列到死信交换机的队列1
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
        return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
     * 声明业务队列2
    @Bean("queueProject")
    public Queue queueProject() {
        //绑定该队列到死信交换机的队列2
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
        return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();

     * 声明死信队列1
    @Bean("queueCompanyDead")
    public Queue queueCompanyDead() {
        return new Queue(QUEUE_COMPANY_DEAD);
     * 声明死信队列2
    @Bean("queueProjectDead")
    public Queue queueProjectDead() {
        return new Queue(QUEUE_PROJECT_DEAD);

     * 绑定业务队列1和业务交换机
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);

     * 绑定业务队列2和业务交换机
    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);

     * 绑定死信队列1和死信交换机
    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);

     * 绑定死信队列2和死信交换机
    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}

生产者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 初始化消息确认函数
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
    }
    /**
     * 发送消息服务器确认函数
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息发送成功"   correlationData);
        } else {
            System.out.println("消息发送失败:"   cause);
        }
    }
    /**
     * 消息发送失败,消息回调函数
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = new String(returnedMessage.getMessage().getBody());
        System.out.println("消息发送失败:"   str);
    }
    /**
     * 处理消息发送到队列1
     * @param str
     */
    public void sendCompany(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
    }
    /**
     * 处理消息发送到队列2
     * @param str
     */
    public void sendProject(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
    }
}

业务消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * 监听业务交换机
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumer {
    /**
     * 监听业务队列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company")
    public void company(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数"   message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息" s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:" e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    /**
     * 监听业务队列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project")
    public void project(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次数"   message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("处理消息" s);
            //下面两行是尝试手动抛出异常,用来测试重试次数和发送到死信交换机
            //String str = null;
            //str.split("1");
            //处理成功,确认应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("处理消息时发生异常:" e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("异常重试次数已到达设置次数,将发送到死信交换机");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即将返回队列处理重试");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

死信消费者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * 监听死信交换机
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumerDead {
    /**
     * 处理死信队列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company_dead")
    public void company_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信" s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:" e.getMessage());
        }
    }
    /**
     * 处理死信队列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project_dead")
    public void project_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("处理死信" s);
            //在此处记录到数据库、报警之类的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收异常:" e.getMessage());
        }
    }
}

测试

MqController

package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
    @Resource
    private RabbltProducer rabbltProducer;
    
    @RequestMapping("/handleCompany")
    public void handleCompany(@RequestBody String jsonStr){
        rabbltProducer.sendCompany(jsonStr);
    }
}

到此这篇关于SpringBoot整合RabbitMQ实战附加死信交换机的文章就介绍到这了,更多相关SpringBoot整合RabbitMQ死信交换机内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

SpringBoot整合RabbitMQ实战教程附死信交换机的更多相关文章

  1. iOS7中可能存在UISwitch错误?

    我正在使用UISwitch调用子视图来在我的应用中进行屏幕显示.但是,交换机仅在大约60%的时间内工作.为了测试我的代码,我将交换机连接到另一个IBAction,以将交换机的状态写入控制台.两个功能在某些时间都没有响应开关的状态,即两个功能在某些时候同时忽略开关的状态.在iOS7中有没有其他人在UISwitches中遇到过这种行为?

  2. ios – 切换到Swift – 交换机中的Case标签应该至少有一个可执行语句

    解决方法swiftswitch语句中没有隐含的漏洞,因此您必须明确设置:没有它,每个案例都有隐含的突破.请注意,swift要求每个switchcase包含至少一个语句–如果没有语句,则必须使用显式中断

  3. 如何在Swift中为交换机案例创建noop块?

    Swift强制您在您的案例下至少有一个可执行语句,包括默认值。我试着放一个空{},但Swift不会接受。这意味着Swift的开关情况不能完全在if-else之间翻译,反之亦然,因为在if-else你允许在条件内有空代码。例如苹果在thisarticle谈论这个关键字。

  4. swift – 交换机:枚举交换机问题:不是int的成员

    我在Swift中编写了我的第一个项目,由于某种原因,我无法找出为什么我的枚举和开关不工作属性开关我得到的错误是枚举大小写“viewmodeFavourite”不是“Int!”类型的成员所以我改为这个,因为它不是一个Int!forStoryboard)然后我得到枚举大小写模式不能匹配非枚举类型“Int”的值你必须将contactviewmode声明为Contactviewmode而不是Int。如果您真的希望它是Int,那么您必须通过将变量与枚举情况的rawValue属性进行比较来更改交换机中的大小写:但除非

  5. SpringBoot本地磁盘映射问题

    这篇文章主要介绍了SpringBoot本地磁盘映射问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

  6. java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

    这篇文章主要介绍了java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源),文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  7. SpringBoot整合Javamail实现邮件发送的详细过程

    日常开发过程中,我们经常需要使用到邮件发送任务,比方说验证码的发送、日常信息的通知等,下面这篇文章主要给大家介绍了关于SpringBoot整合Javamail实现邮件发送的详细过程,需要的朋友可以参考下

  8. SpringBoot详细讲解视图整合引擎thymeleaf

    这篇文章主要分享了Spring Boot整合使用Thymeleaf,Thymeleaf是新一代的Java模板引擎,类似于Velocity、FreeMarker等传统引擎,关于其更多相关内容,需要的小伙伴可以参考一下

  9. Springboot集成mybatis实现多数据源配置详解流程

    在日常开发中,若遇到多个数据源的需求,怎么办呢?通过springboot集成mybatis实现多数据源配置,简单尝试一下,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

  10. SpringBoot使用Minio进行文件存储的实现

    本文主要介绍了SpringBoot使用Minio进行文件存储的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

随机推荐

  1. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  2. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  3. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  4. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  5. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  6. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  7. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  8. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  9. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. Spring JdbcTemplate执行数据库操作详解

    JdbcTemplate是Spring框架自带的对JDBC操作的封装,目的是提供统一的模板方法使对数据库的操作更加方便、友好,效率也不错,这篇文章主要介绍了Spring JdbcTemplate执行数据库操作,需要的朋友可以参考下

返回
顶部