springcloud Stream

什么是springcloud Stream

  现在市面上有很多的消息中间件,每一个公司使用的都有所不同,为了减少学习的成本,springcloud Stream可以让我们不再关注消息中间件MQ的具体细节,我们只需要通过适配绑定的方式即可实现不同MQ之间的切换,但是遗憾的是springcloud Stream目前只支持RabbitMQ和Kafka。

  SpringCloud Stream是一个构建消息驱动微服务的框架,应用程序通过inputs或者 outputs来与SpringCloud Stream中的binder进行交互,我们可以通过配置来binding ,而 SpringCloud Stream 的binder负责与中间件交互,所以我们只需要搞清楚如何与Stream交互就可以很方便的使用消息驱动了!

什么是Binder

  Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离,可以动态的改变消息的destinations(对应于 Kafka的topic,RabbitMQ的exchanges),这些都可以通过外部配置项来做到,甚至可以任意的改变中间件的类型但是不需要修改一行代码

为什么使用Stream

  比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移;这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这衬候springcloud Stream给我们提供了一种解耦合的方式。

Stream使用案例

前置知识

Stream处理消息的架构

  Source、Sink: 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介。Binder: 消息的生产者和消费者中间层,实现了应用程序与消息中间件细节之间的隔离

  通过以上两张图片可知,消息的处理流向是:消息生产者处理完业务逻辑之后消息到达source中,接着前往Channel通道进行排队,然后通过binder绑定器将消息数据发送到底层mq,然后又通过binder绑定器接收到底层mq发送来的消息数据,接着前往Channel通道进行排队,由Sink接收到消息数据,消息消费者拿到消息数据执行相应的业务逻辑

Stream常用注解

消息生产者8801模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: 业务层service代码编写,注意:这里实现类注入的对象由之前的dao层对象换成了channel通道对象,详细的发送由实现类的第12完成

public interface IMessageProviderService {
    /**
     * 定义消息的推送管道
     *
     * @return
     */
    String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    /**
     * 消息发送管道/信道
     */
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("*****serial: "   serial);
        return serial;
    }
}

  第五步: controller接口

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

消息消费者8802模块搭建

  第一步: 创建一个maven模块,引入相关依赖,最主要的就是stream整合rabbitmq的依赖

<!--stream的rabbitmq依赖-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 配置文件的编写,与生产者的区别就在于bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程序类

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("启动成功");
    }
}

  第四步: controller接口,使用url请求生产者8801,即可在消费者8802端接收到8801发送的消息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消费者1号 ----> port:"   serverPort   "\t从8801接受到的消息是:"   message.getPayload());
    }
}

  两个模块搭建完成进行测试,首先启动注册中心7001,然后分别启动消息生产者8801和消息消费者8802,通过url请求访问8001的发送消息请求,会向指定管道中发送一条消息,如果此时这个管道中有消费者即可接收到这条消息。而如何指定消息的管道归属呢,就是通过配置文件中的indings.input.destination来指定,命名相同的服务就会处在同一条管道中

Stream带来的问题

重复消费问题

  按照之前的使用,会带来重复消费问题: 也就是说一个通道上有不止一个消息消费者,stream上默认每一个消费者都属于不同的组,这样的话就会导致这个消息被多个组的消费者重复消费

  知道了问题出现的原因就很容易解决了,只要我们自定义配置分组,将这些消费者都分配到同一个组中就能避免重复消费的问题出现了(同一个组间的消费者是竞争关系,不管组间有多少的消费者都只会消费一次)

自定义分组

  只需要在配置文件修改一处配置即可实现自定义组名并且自定义分组,组名相同的服务会被分配到同一组,通道内的消息数据会被该组中的所有消费者轮询消费

持久化问题

  上面自定义分组使用的group配置除了可以自定义分组和分组名之外,还可以实现消息的持久化,也就是说使用group配置自定义分组和分组名的消息消费者,就算在消息生产者发送消息的时候挂掉了,等这个消费者重启之后依然是能够消费之前发送的消息

这里一个生产者和两个消费者存在以下十三种情况(生产者发送四次消息):

1、都使用group分组的两个不同组成员,在生产者生产的时候

  • 都没挂(各消费四次)
  • 挂了其中一个(各消费四次)
  • 都挂了(各消费四次)

2、都使用group分组的两个同组成员,在生产者生产的时候

  • 都没挂(各消费两次)
  • 挂了其中一个(没挂的把四次消费完)
  • 都挂了(各消费两次)

3、其中一个使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • group的挂了(各消费四次)
  • 没group的挂了(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

4、都不使用group分组的两个成员,在生产者生产的时候

  • 都挂了(都不消费)
  • 挂了其中一个(没挂的消费四次,挂的由于没有持久化所以不消费)
  • 都没挂(各消费四次)

  总之一句话,通道里的消息会持久化给使用group配置的消息消费者(每一组都有一份),就算发送消息的时候这些消费者挂了,如果同组的消费者有没挂的就会把这些消息竞争消费完;如果同组没有消费者,等他重启之后还是会消费这些消息

到此这篇关于Springcloud Stream消息驱动工具使用介绍的文章就介绍到这了,更多相关Springcloud Stream内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Springcloud Stream消息驱动工具使用介绍的更多相关文章

  1. SpringCloud超详细讲解微服务网关Zuul基础

    这篇文章主要介绍了SpringCloud Zuul微服务网关,负载均衡,熔断和限流,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. SpringCloud gateway+zookeeper实现网关路由的详细搭建

    这篇文章主要介绍了SpringCloud gateway+zookeeper实现网关路由,本文通过图文实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. SpringCloud OpenFeign 服务调用传递 token的场景分析

    这篇文章主要介绍了SpringCloud OpenFeign 服务调用传递 token的场景分析,本篇文章简单介绍 OpenFeign 调用传递 header ,以及多线程环境下可能会出现的问题,其中涉及到 ThreadLocal 的相关知识,需要的朋友可以参考下

  4. 浅析Node.js 中 Stream API 的使用

    这篇文章给大家浅析node.js中stream api的使用,本文介绍的非常详细,涉及到node.js api,node.js stream相关知识,感兴趣的朋友可以参考下

  5. 使用IntelliJ IDEA调式Stream流的方法步骤

    本文主要介绍了使用IntelliJ IDEA调式Stream流的方法步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  6. Nodejs Buffer的使用及Stream流和事件机制详解

    这篇文章主要为大家介绍了Nodejs Buffer的使用及Stream流和事件机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  7. Springcloud Stream消息驱动工具使用介绍

    SpringCloud Stream由一个中间件中立的核组成,应用通过SpringCloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是发送消息到队列中的)通道与外界交流

  8. SpringCloud中Gateway的使用教程详解

    SpringCloud Gateway是Spring体系内的一个全新项目,它旨在为微服务架构提供一种简单有效的统一的API路由管理方式。本文就来为大家详细讲讲Gateway的使用教程,需要的可以参考一下

  9. 深入nodejs中流(stream)的理解

    本篇文章主要介绍了深入nodejs中流(stream)的理解,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  10. 浅谈Node.js:理解stream

    本篇文章主要介绍了Node.js:stream,Stream在node.js中是一个抽象的接口,具有一定的参考价值,有需要的可以了解一下。

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部