前言

在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题。多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作。在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮助大家理解线程池的核心原理!!!

线程池给我们提供的功能

我们首先来看一个使用线程池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class Demo01 {
 
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 100; i  ) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < 100; i  ) {
            System.out.println(Thread.currentThread().getName()   " print "   i);
          }
        }
      });
    }
  }
}

在上面的例子当中,我们使用Executors.newFixedThreadPool去生成来一个固定线程数目的线程池,在上面的代码当中我们是使用5个线程,然后通过execute方法不断的去向线程池当中提交任务,大致流程如下图所示:

线程池通过execute函数不断的往线程池当中的任务队列加入任务,而线程池当中的线程会不断的从任务队列当中取出任务,然后进行执行,然后继续取任务,继续执行....,线程的执行过程如下:

while (true) {
  Runnable runnable = taskQueue.take(); // 从任务队列当中取出任务
  runnable.run(); // 执行任务
}

根据上面所谈到的内容,现在我们的需求很清晰了,首先我们需要有一个队列去存储我们所需要的任务,然后需要开启多个线程不断的去任务队列当中取出任务,然后进行执行,然后重复取任务执行任务的操作。

工具介绍

在我们前面提到的线程池实现的原理当中有一个非常重要的数据结构,就是ArrayBlockingQueue阻塞队列,它是一个并发安全的数据结构,我们首先先简单介绍一下这个数据结构的使用方法。(如果你想深入了解阻塞队列的实现原理,可以参考这篇文章JDK数组阻塞队列源码剖析)

我们主要用的是ArrayBlockingQueue的下面两个方法:

put函数,这个函数是往线程当中加入数据的。我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。

take函数,从队列当中取出数据,但是当队列为空的时候需要将调用这个方法的线程阻塞。当队列当中有数据的时候,就可以从队列当中取出数据。

需要注意的是,如果一个线程被上面两个任何一个线程阻塞之后,可以调用对应线程的interrupt方法终止线程的执行,同时还会抛出一个异常。

下面是一份测试代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
 
public class QueueTest {
 
  public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5); // 队列的容量为5
    Thread thread = new Thread(() -> {
      for (int i = 0; i < 10; i  ) {
        try {
          queue.put(i);
          System.out.println("数据 "   i   "被加入到队列当中");
        } catch (InterruptedException e) {
          System.out.println("出现了中断异常");
          // 如果出现中断异常 则退出 线程就不会一直在 put 方法被挂起了
          return;
        }finally {
        }
      }
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);
    thread.interrupt();
  }
}

上面代码输出结果:

数据 0被加入到队列当中
数据 1被加入到队列当中
数据 2被加入到队列当中
数据 3被加入到队列当中
数据 4被加入到队列当中
出现了中断异常

上面代码的执行顺序是:

线程thread会将0-4这5个数据加入到队列当中,但是在加入第6个数据的时候,阻塞队列已经满了,因此在加入数据的时候线程thread会被阻塞,然后主线程在休息一秒之后中断了线程thread,然后线程thread发生了中断异常,然后被捕获进入catch代码块,然后函数返回,线程thread就不会一直被阻塞了,这一点在我们后面写线程池非常重要!!!

Worker设计

在前文当中我们已经提到了我们的线程需要不断的去任务队列里面取出任务然后执行,我们设计一个Worker类去做这件事!

首先在类当中肯定需要有一个线程池的任务队列,因为worker需要不断的从阻塞队列当中取出任务进行执行。

我们用一个isStopped变量表示线程是否需要终止了,也就是线程池是否需要关闭,如果线程池需要关闭了,那么线程也应该停止了。

我们还需要有一个变量记录执行任务的线程,因为当我们需要关闭线程池的时候需要等待任务队列当中所有的任务执行完成,那么当所有的任务都执行执行完成的时候,队列肯定是空的,而如果这个时候有线程还去取任务,那么肯定会被阻塞,前面已经提到了ArrayBlockingQueue的使用方法了,我们可以使用这个线程的interrupt的方法去中断这个线程的执行,这个线程会出现异常,然后这个线程捕获这个异常就可以退出了,因此我们需要知道对那个线程执行interrupt方法!

Worker实现的代码如下:

import java.util.concurrent.ArrayBlockingQueue;
 
public class Worker implements Runnable {
 
  // 用于保存任务的队列
  private ArrayBlockingQueue<Runnable> tasks;
  // 线程的状态 是否终止
  private volatile boolean isStopped;
 
  // 保存执行 run 方法的线程
  private volatile Thread thisThread;
 
  public Worker(ArrayBlockingQueue<Runnable> tasks) {
    // 这个参数是线程池当中传入的
    this.tasks = tasks;
  }
 
  @Override
  public void run() {
    thisThread = Thread.currentThread();
    while (!isStopped) {
      try {
        Runnable task = tasks.take();
        task.run();
      } catch (InterruptedException e) {
        // do nothing
      }
    }
  }
    // 注意是其他线程调用这个方法 同时需要注意是 thisThread 这个线程在执行上面的 run 方法
  // 其他线程调用 thisThread 的 interrupt 方法之后 thisThread 会出现异常 然后就不会一直阻塞了
  // 会判断 isStopped 是否为 true 如果为 true 的话就可以退出 while 循环了
  public void stop() {
    isStopped = true;
    thisThread.interrupt(); // 中断线程 thisThread
  }
 
  public boolean isStopped(){
    return isStopped;
  }
}

线程池设计

首先线程池需要可以指定有多少个线程,阻塞队列的最大长度,因此我们需要有这两个参数。

线程池肯定需要有一个队列去存放通过submit函数提交的任务。

需要有一个变量存储所有的woker,因为线程池关闭的时候需要将这些worker都停下来,也就是调用worker的stop方法。

需要有一个shutDown函数表示关闭线程池。

需要有一个函数能够停止所有线程的执行,因为关闭线程池就是让所有线程的工作停下来。

线程池实现代码:

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
 
public class MyFixedThreadPool {
  // 用于存储任务的阻塞队列
  private ArrayBlockingQueue<Runnable> taskQueue;
 
  // 保存线程池当中所有的线程
  private ArrayList<Worker> threadLists;
 
  // 线程池是否关闭
  private boolean isShutDown;
 
  // 线程池当中的线程数目
  private int numThread;
 
  public MyFixedThreadPool(int i) {
    this(Runtime.getRuntime().availableProcessors()   1, 1024);
  }
 
  public MyFixedThreadPool(int numThread, int maxTaskNumber) {
    this.numThread = numThread;
    taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 创建阻塞队列
    threadLists = new ArrayList<>();
    // 将所有的 worker 都保存下来
    for (int i = 0; i < numThread; i  ) {
      Worker worker = new Worker(taskQueue);
      threadLists.add(worker);
    }
    for (int i = 0; i < threadLists.size(); i  ) {
      new Thread(threadLists.get(i),
              "ThreadPool-Thread-"   i).start(); // 让worker开始工作
    }
  }
    
  // 停止所有的 worker 这个只在线程池要关闭的时候才会调用
  private void stopAllThread() {
    for (Worker worker : threadLists) {
      worker.stop(); // 调用 worker 的 stop 方法 让正在执行 worker 当中 run 方法的线程停止执行
    }
  }
 
  public void shutDown() {
    // 等待任务队列当中的任务执行完成
    while (taskQueue.size() != 0) {
      // 如果队列当中还有任务 则让出 CPU 的使用权
      Thread.yield();
    }
    // 在所有的任务都被执行完成之后 停止所有线程的执行
    stopAllThread();
  }
 
  public void submit(Runnable runnable) {
    try {
      taskQueue.put(runnable); // 如果任务队列满了, 调用这个方法的线程会被阻塞
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

测试代码:

public class Test {
 
  public static void main(String[] args) {
    MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 开启5个线程 任务队列当中最多只能存1024个任务
    for (int i = 0; i < 1000000; i  ) {
      pool.submit(() -> {
        System.out.println(Thread.currentThread().getName()); // 提交的任务就是打印线程自己的名字
      });
    }
    pool.shutDown();
  }
}

上面的代码是可以正常执行并且结束的,这个输出太长了这里只列出部分输出结果:

ThreadPool-Thread-0
ThreadPool-Thread-4
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-1
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-4

从上面的输出我们可以看见线程池当中只有5个线程,这5个线程在不断从队列当中取出任务然后执行,因为我们可以看到同一个线程的名字输出了多次。

总结

在本篇文章当中主要介绍了线程池的原理,以及我们应该去如何设计一个线程池,同时也介绍了在阻塞队列当中一个非常重要的数据结构ArrayBlockingQueue的使用方法。

线程池当中有一个阻塞队列去存放所有被提交到线程池当中的任务。

所有的Worker会不断的从任务队列当中取出任务然后执行。

线程池的shutDown方法其实比较难思考该怎么实现的,首先在我们真正关闭线程池之前需要将任务队列当中所有的任务执行完成,然后将所有的线程停下来。

在所有的任务执行完成之后,可能有的线程就会阻塞在take方法上(从队列当中取数据的方法,如果队列为空线程会阻塞),好在ArrayBlockingQueue在实现的时候就考虑到了这个问题,只需要其他线程调用了这个被阻塞线程的interrupt方法的话,线程就可以通过捕获异常恢复执行,然后判断isStopped,如果需要停止了就跳出while循环,这样的话我们就可以完成所有线程的停止操作了。

到此这篇关于Java实现手写线程池的示例代码的文章就介绍到这了,更多相关Java线程池内容请搜索Devmax以前的文章或继续浏览下面的相关文章希望大家以后多多支持Devmax!

Java实现手写线程池的示例代码的更多相关文章

  1. Android – 线程池策略和Loader可以用来实现吗?

    )>如果不是什么是创建AsyncTask池的好方法,是否可能正在实施它?

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  4. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  5. Java实现世界上最快的排序算法Timsort的示例代码

    Timsort 是一个混合、稳定的排序算法,简单来说就是归并排序和二分插入排序算法的混合体,号称世界上最好的排序算法。本文将详解Timsort算法是定义与实现,需要的可以参考一下

  6. Java日期工具类的封装详解

    在日常的开发中,我们难免会对日期格式化,对日期进行计算,对日期进行校验,为了避免重复写这些琐碎的逻辑,我这里封装了一个日期工具类,方便以后使用,直接复制代码到项目中即可使用,需要的可以参考一下

  7. Java设计模式之模板方法模式Template Method Pattern详解

    在我们实际开发中,如果一个方法极其复杂时,如果我们将所有的逻辑写在一个方法中,那维护起来就很困难,要替换某些步骤时都要重新写,这样代码的扩展性就很差,当遇到这种情况就要考虑今天的主角——模板方法模式

  8. Java 中 Class Path 和 Package的使用详解

    这篇文章主要介绍了Java 中 Class Path和Package的使用详解,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的朋友可以参考一下

  9. java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源)

    这篇文章主要介绍了java SpringBoot 分布式事务的解决方案(JTA+Atomic+多数据源),文章围绕主题展开详细的内容介绍,具有一定的参考价值,感兴趣的小伙伴可以参考一下

  10. Java一维数组和二维数组元素默认初始化值的判断方式

    这篇文章主要介绍了Java一维数组和二维数组元素默认初始化值的判断方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部