一个错误:多线程使用单一消费者

下图显现了一种错误的使用KafkaConsumer的方法

  • 创建多个线程用来消费kafka数据
  • 多线程使用同一个KafkaConsumer对象
  • 在单线程中使用这个KafkaConsumer对象,完成数据拉取、处理、提交偏移量。

在这里插入图片描述

这种方式之所以错误的原因是:KafkaConsumer是线程不安全的,可能出现把同一批数据既给线程A处理,也交给线程B处理重复消费的问题。

一个误区:多线程就是消费者组

下图中体现的是一种正常的KafkaConsumer使用方式

  • 使用一个KafkaConsumer拉取数据
  • 拉取数据后将一个批次的数据交给一个线程去处理

在这里插入图片描述

这个处理方式不是错误,但是他只是一个消费者在消费kafka消息队列中的数据,不是消费者组的方式消费数据。无法充分利用kafka分区提升消息处理的吞吐量。

常规正确做法:使用线程池实现消费者组

下面的方法是常规的正确实现方式

在这里插入图片描述

  • 因为KafkaConsumer是线程不安全的,所以不能跨线程使用KafkaConsumer
  • 每个线程持有一个KafkaConsumer对象
  • 多个线程的实现可以使用线程池,线程池的线程数量等于消费者组内消费者的数量
public class MyConsumerGroup {
    public void groupConsumer(){
        ExecutorService executorService = Executors.newFixedThreadPool(6);
        for (int i = 0; i < 6; i  ) {
            MyConsumer myConsumer = new MyConsumer();
            executorService.execute(myConsumer);
        }
    }
}

MyConsumer方法需要实现Runnable接口,并在run方法中调用MyConsumer#pollData。MyConsumer的代码参考本专栏的《消费者Java实现》( 集成apache kafka-clients实现数据消费者)

@Override
public void run() {
    MyConsumer myConsumer = new MyConsumer();
    myConsumer.pollData();
}

到此这篇关于结合线程池实现apache kafka消费者组的文章就介绍到这了,更多相关apache kafka消费者组内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

结合线程池实现apache kafka消费者组的误区及解决方法的更多相关文章

  1. IOs Cordova长按显示文本选择放大镜即使禁用文本选择,如何删除?

    是否有任何可能导致此问题的插件?任何帮助深表感谢.Cordova插件:>com.mbppower.camerapreview>cordova-plugin-statusbar>cordova-plugin-whitelist>离子插件键盘>org.apache.cordova.camera>org.apache.cordova.console>org.apache.cordova.device>org.apache.cordova.dialogs>org.apache.cordova.file>org.a

  2. android – org.apache.cordova.api不存在. PhoneGap 3.0

    我正在尝试将VideoPlayer插件(https://github.com/macdonst/VideoPlayer)添加到我的phonegapAndroid应用程序中.在编译时遇到问题:第25行:解决方法将您的导入更改为:

  3. 如何将android客户端连接到我的笔记本电脑内的Apache服务器(php)的localhost?

    我的笔记本电脑中的localhost-127.0.0.1或android10.0.0.1中的localhost?>那么,如果我想从android访问localhost来调用PHP来运行?哪个ip地址/url我需要放在Android应用程序?我需要在httpconfig中为XAMPP修改任何内容吗?解决方法使用ipconfig在笔记本电脑中找到您的IP地址.在手机中使用该地址而不是127.0.0.1.

  4. android – 在android工作室中的proguard错误

    我想在我的应用程序中使用proguard,我启用它但是当我想生成apk文件时,它给了我这个错误:我正在使用最新版本的sdk23,这是我的gradle文件:怎么了?我在这段代码中做错了什么?谢谢解决方法只需在proguard上添加:

  5. 无法修复Android Proguard返回错误代码1错误

    当我尝试在我的Android应用程序中使用proguard时只需添加到我的project.properties文件,APK导出失败并显示消息Proguard返回错误代码1这是我的project.properties文件这是错误堆栈:解决方法将这些行添加到proguard配置文件(proguard-android.txt)见ProguardTroubleshooting请注意,如果您使用您的配置文件

  6. Android – 线程池策略和Loader可以用来实现吗?

    )>如果不是什么是创建AsyncTask池的好方法,是否可能正在实施它?

  7. Phonegap 2.4 Android Proguard配置

    有人有主意吗???

  8. 如何将Android App用作Kafka的“制作客户”?

    使用Android应用作为ApacheKafka的“制作客户端”是否可行/是否有意义?

  9. android – 如何在sharedPreferences中分析ANR

    在sharedPreferences中遇到ANR,不知道如何定位问题.以下是trace的三个部分,其他大多数线程都是“WAIT”或“TIMED_WAIT”.由于countdownlatch.await(),“主”线程被阻止.第二个线程“pool-1-thread-1”等待fsync.最后一个是试图读一些东西.我认为第二个线程已经阻塞了主线程,因为如果这个无法完成,它将不会调用countdownla

  10. Android无法访问org.apache.http.client.HttpClient

    我正在使用androidstudio创建一个向服务器发出GET请求的应用程序.我的代码是这样的:问题是AndroidStudio标记了这一行有错误:说“无法访问org.apache.http.client.HttpClient”这是我的gradle文件:解决方法在AndroidSDK23中不推荐使用HttpClient,因为它推断,您可以在HttpURLConnection中迁移代码https:/

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部