想使用高级消费者api来实施延迟消费者

大意:

>通过键生成消息(每个msg包含创建时间戳),这样可以确保每个分区已按生成时间排序消息.
> auto.commit.enable = false(将在每个消息处理后显式提交)
>消费消息
>检查消息时间戳,并检查是否有足够的时间过去
>进程消息(此操作永远不会失败)
> commit 1 offset

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can Now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

对此实施有一些担忧:

>提交每个偏移可能会减慢ZK
可以让consumer.commitout丢弃异常吗?如果是,我将消费两次相同的消息(可以使用幂等消息解决)
>问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,进程和提交(ZK会话超时?)
> ZK会话如何在没有提交新的偏移量的情况下保持活动? (设置一个蜂巢zookeeper.session.timeout.ms可以在死消费者中解决而不识别它)
>任何其他的问题?

谢谢!

解决方法

有一种方法可以使用不同的主题来推送所有要延迟的消息.如果所有延迟的消息都应该在相同的时间延迟之后处理,这将是相当简单的:
while(it.hasNext()) {
    val message = it.next().message()

    if(shouldbedelayed(message)) {
        val delay = 24 hours
        val delayTo = getCurrentTime() + delay
        putMessageOnDelayedQueue(message,delay,delayTo)
    }
    else {
       process(message)
    }

    consumer.commitOffset()
}

所有常规邮件现在都将尽快处理,而那些需要延迟的邮件将被放在另一个主题上.

好的是,我们知道延迟主题的头部的消息是应该首先处理的消息,因为它的delayTo值将是最小的.因此,我们可以设置另一个读取头信息的消费者,检查时间戳是否在过去,如果处理消息并提交偏移量.如果不是,它不会提交偏移量,而是直到那时睡觉:

while(it.hasNext()) {
    val delayedMessage = it.peek().message()
    if(delayedMessage.delayTo < getCurrentTime()) {
        val readMessage = it.next().message
        process(readMessage.originalMessage)
        consumer.commitOffset()
    } else {
        delayProcessingUntil(delayedMessage.delayTo)
    }
}

如果有不同的延迟时间,您可以对延迟进行分区(例如24小时,12小时,6小时).如果延迟时间比这更复杂,那么延迟时间就会更加动态.您可以通过引入两个延迟主题来解决这个问题.读取所有消息关闭延迟主题A并处理所有延迟过去的消息.其他的你只是找到一个最接近的delayTo,然后把它们放在主题B上.睡觉,直到最近的一个应该被处理,并且完全相反,即处理来自主题B的消息,并将一次不应该被处理回主题A.

回答您的具体问题(有些已在您的问题的评论中解决)

  1. commit each offset might slow ZK down

您可以考虑切换到Kafka中存储偏移量(可从0.8.2获取的功能,检查消费者配置中的offsets.storage属性)

  1. can consumer.commitOffsets throw an exception? if yes i will consume the same message twice (can solve with idempotent messages)

我相信如果它不能与偏移量存储进行通信,例如.正如你所说,使用幂等信息解决了这个问题.

  1. problem waiting long time without committing the offset,for example delay period is 24 hours,will get next from iterator,sleep for 24 hours,process and commit (ZK session timeout ?)

这不会是上述概述的解决方案的问题,除非消息本身的处理超过会话超时.

  1. how can ZK session keep-alive without commit new offsets ? (setting a hive zookeeper.session.timeout.ms can resolve in dead consumer without recognising it)

再次用上面的方法,你不需要设置一个很长的会话超时.

  1. any other problems im missing?

总是有)

java – Kafka – 使用高级消费者的延迟队列实现的更多相关文章

  1. HTML5实现直播间评论滚动效果的代码

    这篇文章主要介绍了HTML5实现直播间评论滚动效果的代码,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. 前端监听websocket消息并实时弹出(实例代码)

    这篇文章主要介绍了前端监听websocket消息并实时弹出,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  3. HTML5之消息通知的使用(Web Notification)

    通知可以说是web中比较常见且重要的功能,私信、在线提问、或者一些在线即时通讯工具我们总是希望第一时间知道对方有了新的反馈。本篇文章主要介绍了HTML5之消息通知的使用(Web Notification),感兴趣的小伙伴们可以参考一下

  4. HTML5中的Web Notification桌面通知功能的实现方法

    这篇文章主要介绍了HTML5中的Web Notification桌面通知功能的实现方法,需要的朋友可以参考下

  5. HTML5仿微信聊天界面、微信朋友圈实例代码

    小编最近开发一个基于html5开发的一个微信聊天前端界面,功能很全面,下面小编给大家分享实例代码,需要的朋友参考下

  6. HTML5的postMessage的使用手册

    HTML5提出了一个新的用来跨域传值的方法,即postMessage,这篇文章主要介绍了HTML5的postMessage的使用手册的相关资料,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  7. ios – Testflight无法安装应用程序

    我有几个测试人员注册了testflight并连接了他们的设备……他们有不同的ios型号……但是所有这些都有同样的问题.当他们从“safari”或“testflight”应用程序本身单击应用程序的安装按钮时……达到约90%并出现错误消息…

  8. xcode找不到匹配的配置文件

    我有一个AdhociOS应用程序,它给了我“在xcode6中找不到匹配的配置文件”,我创建了一个Adhoc配置文件,下载它,双击它并在General–Identity下选择了一个团队.但我接着得到了那条消息,并尝试使用“修复问题”按钮没有帮助.在构建设置–供应配置文件–发布我有“自动”.任何人都可以帮助我,我完全迷失了……

  9. ios – Reactive Cocoa – 以编程方式设置文本时不会调用UITextView的rac_textSignal

    我正在实现一个聊天UI,并使用ReactiveCocoa根据用户的类型调整聊天气泡的大小.目前,我正在基于textview的rac_textSignal更新UI的布局.一切都很好–除了一点:当用户发送消息时,我以编程方式清除文本字段:…我是否需要拥有一个持有currentTypedString的Nsstring,并在该字符串更新时驱动UI更改?

  10. ios – 当我关闭应用程序时,我从调试器获得消息:由于信号15而终止

    我怎么能解决这个问题,我不知道这个链接MypreviousproblemaboutCoredata对我的问题有影响吗?当我cmd应用程序的Q时,将出现此消息.Messagefromdebugger:Terminatedduetosignal15如果谁知道我以前的问题的解决方案,请告诉我.解决方法>来自调试器的消息:每当用户通过CMD-Q(退出)或STOP手动终止应用程序(无论是在iOS模拟器中还是

随机推荐

  1. java – 合并多个相同的Kafka Streams主题

    )添加自定义存储,以使重复过滤容错.

  2. java – 我在哪里可以找到kafka的maven存储库?

    我想尝试kafka0.8.但是我在哪里可以找到kafkamaven存储库.我应该添加哪些额外的存储库URL?我找到了一些博客但它不起作用.我正在寻找合适的maven依赖.或者我应该从git中检出它并部署在我们的内部神器中?

  3. java – 为kafka主题配置ACL

    我有一个不安全的kafka实例,有2个代理,一切运行正常,直到我决定为主题配置ACL,在ACL配置后,我的消费者停止从Kafka轮询数据,并且在获取具有相关ID的元数据时我不断收到警告错误,我的代理属性看起来像下面:-我的客户端配置如下所示:–我用下面的命令来配置ACL在我启动消费者后完成所有上述配置后,它停止接收消息.有人可以指出我在误解的地方.提前致谢.解决方法我们成功使用ACL,但没有使用P

  4. java – kafka如何平衡分区加载?

    我在kafka遇到了负载均衡的问题.所以,我创建了一个包含10个分区的主题并创建了2个消费者.这10个分区被划分并分配给这些消费者,并且工作正常.有时第一个消费者工作,有时第二.但是,在某一时刻,我们可能面临一种情况,例如第二个消费者收到消息并且需要时间来处理此消息.那么,我的问题是kafka将如何决定将消息存储在哪个分区中?

  5. java – Kafka Connect实现错误

    我在这里运行教程:http://kafka.apache.org/documentation.html#introduction当我进入“步骤7:使用KafkaConnect导入/导出数据”并尝试启动两个连接器时,我收到以下错误:这是教程的一部分:Next,we’llstarttwoconnectorsrunninginstandalonemode,whichmeanstheyruninasing

  6. java – Storm-Kafka多个出口,如何共享负载?

    提前感谢你的时间.更新响应答案:现在在卡夫卡使用多分区(即5)以下是使用的代码:builder.setspout;在每个分区上用800MB数据进行淹水测试,花费22秒完成阅读.再次使用parallelism_hint=1的代码即builder.setspout;现在需要更多23秒!可以看到,这里的经纪人可以使用hosts.add添加,而且在新的KafkaConfig.StaticHosts代码片段中将partion号指定为4.如何提及卡夫卡喷嘴的并行提示?我是新来的风暴和java!!!!

  7. java Kafka生产者错误

    我做了kafkajava生产者.但控制台说错误.kafka服务器在aws上.和制作人在我的Mac上.然而kara服务器是可以访问的.当我从制作人发送消息时,kafka服务器显示“已接受的连接…”

  8. java – Kafka – 使用高级消费者的延迟队列实现

    如果是,我将消费两次相同的消息>问题等待很长时间没有提交偏移量,例如延迟时间是24小时,将从迭代器获取下一个,睡眠24小时,进程和提交(ZK会话超时?)>ZK会话如何在没有提交新的偏移量的情况下保持活动?

  9. java – 如何以健壮的方式处理kafka发布失败

    我正在使用Kafka,我们有一个用例来构建一个容错系统,甚至连一条消息都不会错过.所以这就是问题所在:如果由于任何原因(ZooKeeperdown,Kafkabroker等)向Kafka发布失败,我们如何能够有效地处理这些消息并在事情再次恢复后重播它们.正如我所说的那样,即使单个消息失败也无法承受.另一个用例是我们还需要在任何给定时间点知道有多少消息由于任何原因而无法发布到Kafka,例如计数器功

  10. java – Kafka快速入门:我需要哪些依赖项?

    接下来我该怎么办?我需要其他依赖项吗?解决方法问题是kafkabetawasbuiltinawaythatpomgeneratedwithajarisn’tvalidandmavencouldnotrecognizeitandparseproperly,从而获取传递依赖.我们已经设法通过在我们的pom定义中从该pom中获取所有依赖项来缓解此问题.我们正在等待kafka的下一个beta版本,其中将解决问题.完整依赖列表如下.请注意,您必须相应地更改scala版本依赖关系到kafka工件的后缀.至于Maybe

返回
顶部