一、时间轮算法简介

为了大家能够理解下文中的代码,我们先来简单了解一下netty时间轮算法的核心原理

时间轮算法名副其实,时间轮就是一个环形的数据结构,类似于表盘,将时间轮分成多个bucket(比如:0-8)。假设每个时间轮轮片的分隔时间段tickDuration=1s(即:指针经过每个格子花费时间是 1 s),当前的时间bucket=3,那么在18秒后需要被执行的任务需要落到((3 18)%8=5取余运算)的5号bucket上。假如有多个需要在该时间段内执行的任务,就会组成一个双向链表。另外针对时间轮我们要有下面的几个认知:

时间轮指针是一个Worker线程,在时间轮整点的时候执行双向链表中的任务。

时间轮算法的并不是精准的延时,它的执行精度取决于每个时间轮轮片的分隔时间段tickDuration

Worker线程是单线程,一个bucket、一个bucket的顺序处理任务。「所以我们的延时任务一定要做成异步任务,否则会影响时间轮后续任务的执行时间。」

二、时间轮hello-world

实现一个延时任务的例子,需求仍然十分的简单:你买了一张火车票,必须在30分钟之内付款,否则该订单被自动取消。「订单30分钟不付款自动取消,这个任务就是一个延时任务。」 我们的火车票订单取消任务,从需求上看并不需要非常精准的延时,所以是可以使用时间轮算法来完成这个任务的。

首先通过maven坐标引入netty

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.45.Final</version>
</dependency>

然后我们创建一个时间轮,如果是Spring的开发环境,我们可以这么做。下文中我们new了一个包含512个bucket的时间轮,每个时间轮的轮片时间间隔是100毫秒。

@Bean("hashedWheelTimer")
public HashedWheelTimer hashedWheelTimer(){
    return new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
}

举例:当用户买火车票下单的时候,向时间轮中添加一个30分钟的延时任务。延时任务将在30分钟之后被执行,下文的lambda表达式部分实现了一个TimerTask(task)延时任务。这个延时任务的函数体内,请一定使用异步任务,即:单独起一个线程或者使用SpringBoot异步任务线程池。因为Worker线程是单线程的,你的任务处理时间长于tickDuration会妨碍后续时间轮轮片上的任务的执行。

//订单下单操作
void order(String orderInfo) {
  //下单的时候,向时间轮中添加一个30分钟的延时任务
  hashedWheelTimer.newTimeout(task -> {
    //注意这里使用异步任务线程池或者开启线程进行订单取消任务的处理
    cancelOrder(orderInfo);
  }, 30, TimeUnit.MINUTES);
}

三、异步任务线程池

我们在上文中已经多次强调,时间轮的任务TimerTask的执行内容要做成异步的。最简单的做法就是接到一个任务之后启动一个线程处理该任务。在Spring环境下其实我们有更好的选择,就是使用Spring的线程池,这个线程池是可以自定义的。比如:下文中的用法是我事先定义了一个名字为test的线程池,然后通过@Async使用即可。

@Async("test")
public void cancelOrder(String orderInfo){
  //查询订单支付信息,如果用户未支付,关闭订单
}

可能有的朋友,还不知道该如何自定义一个Spring线程池,可以参考:我之前写过一个SpringBoot的**「可观测、易配置」**的线程池开源项目,源代码地址:https://gitee.com/hanxt/zimug-monitor-threadpool。我的这个zimug-monitor-threadpool开源项目,可以做到对线程池使用情况的监控,我自己平时用的效果还不错,向大家推荐一下!

四、时间轮优缺点

时间轮算法实现延时任务的优点就是,相对于使用JDK的DelayQueue,其算法上具有优势,执行性能相对好一些。其缺点就是所有的延时任务以及延时触发的管理,都是在单个应用服务的内存中进行的,一旦该应用服务发生故障重启服务,时间轮任务数据将全部丢失。这一缺点和DelayQueue是一样的。为了解决这个问题,我们可以使用redis、RocketMQ等分布式中间件来管理延时任务消息的方式来实现延时任务,这个我会在后续的文章中为大家介绍。

知识点补充

下面主要和大家一起来分析下Netty时间轮调度算法的原理

时间轮状态

时间轮有以下三种状态:

  • WORKER_STATE_INIT:初始化状态,此时时间轮内的工作线程还没有开启
  • WORKER_STATE_STARTED:运行状态,时间轮内的工作线程已经开启
  • WORKER_STATE_SHUTDOWN:终止状态,时间轮停止工作

状态转换如下:

构造函数

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: "   tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: "   ticksPerWheel);
        }

        // 初始化时间轮数组,时间轮大小为大于等于 ticksPerWheel 的第一个 2 的幂,和 HashMap 类似
        wheel = createWheel(ticksPerWheel);
        // 取模用,用来定位数组中的槽
        mask = wheel.length - 1;

        // 为了保证精度,时间轮内的时间单位为纳秒
        long duration = unit.toNanos(tickDuration);

        // 时间轮内的时钟拨动频率不宜太大也不宜太小
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }

        // 创建工作线程
        workerThread = threadFactory.newThread(worker);


        // 非守护线程且 leakDetection 为 true 时检测内存是否泄漏
        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        // 初始化最大等待任务数
        this.maxPendingTimeouts = maxPendingTimeouts;

        // 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

构造函数中的参数相当重要,当自定义时间轮时,我们应该根据业务的范围设置合理的参数:

  • threadFactory:创建时间轮任务线程的工厂,通过这个工厂可以给我们的线程自定义一些属性(线程名、异常处理等)
  • tickDuration:时钟多长时间拨动一次,值越小,时间轮精度越高
  • unit:tickDuration 的单位
  • ticksPerWheel:时间轮数组大小
  • leakDetection:是否检测内存泄漏
  • maxPendingTimeouts:时间轮内最大等待的任务数

时间轮的时钟拨动时长应该根据业务设置恰当的值,如果设置的过大,可能导致任务触发时间不准确。如果设置的过小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮对象大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

到此这篇关于Java利用Netty时间轮实现延时任务的文章就介绍到这了,更多相关Java Netty时间轮内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Java利用Netty时间轮实现延时任务的更多相关文章

  1. ios – Netty Channel关闭检测

    我正在使用netty和ios构建服务器客户端应用程序,当用户在他/她的ios设备上关闭WiFi时,我遇到问题,netty服务器不了解它.服务器需要知道为该用户进行清理并将其设置为离线状态,但是当用户再次尝试连接时,服务器只是告诉他他/她已经在线.解决方法如果我正确地理解了您的问题:您想要监听服务器端的客户端通道关闭事件,并进行一些会话清理,在Netty有两种方式来收听频道封闭事件:1)如果您的服务

  2. Android MINA vs netty for Android

    在here,MINA和netty之间有一个非常翔实的比较当平台是Android时,我想知道您的偏好!>我有一个主机应该接受连接以及建立与Android设备的连接.>该主机为其操作实现了Boost.ASIO.我需要为android方面选择一个简单的框架.>基于几个小时的谷歌搜索,我,相当新的java,缩小到MINA和netty.两者似乎都很好,虽然netty似乎更容易.>当我读到一些关于在android中使用netty的bug报告时,我感到很困惑.>连接到主机的Android模拟器数量可以增长到很多.所以问

  3. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  4. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  5. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  6. Java实现世界上最快的排序算法Timsort的示例代码

    Timsort 是一个混合、稳定的排序算法,简单来说就是归并排序和二分插入排序算法的混合体,号称世界上最好的排序算法。本文将详解Timsort算法是定义与实现,需要的可以参考一下

  7. Java日期工具类的封装详解

    在日常的开发中,我们难免会对日期格式化,对日期进行计算,对日期进行校验,为了避免重复写这些琐碎的逻辑,我这里封装了一个日期工具类,方便以后使用,直接复制代码到项目中即可使用,需要的可以参考一下

  8. Java设计模式之模板方法模式Template Method Pattern详解

    在我们实际开发中,如果一个方法极其复杂时,如果我们将所有的逻辑写在一个方法中,那维护起来就很困难,要替换某些步骤时都要重新写,这样代码的扩展性就很差,当遇到这种情况就要考虑今天的主角——模板方法模式

  9. Java 中 Class Path 和 Package的使用详解

    这篇文章主要介绍了Java 中 Class Path和Package的使用详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下

  10. java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

    这篇文章主要介绍了java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源),文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部