我有一个需要并行计算许多小任务的过程,然后按照任务的自然顺序处理结果.为此,我有以下设置:

一个简单的ExecutorService和一个阻塞队列,我将使用它来保持在将Callable提交给执行程序时返回的Future对象:

ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

一些调试代码用于计算已提交的数量和已处理任务的数量,并定期将其写出(请注意,处理在任务代码本身的末尾会增加):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
      }             
},60 * 1000,60 * 1000);

从队列(实际上是生成器)获取任务并将它们提交给执行程序的线程,将生成的Future放入期货队列中(这就是我确保不提交太多任务的内存耗尽的方法) :

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) {l .error("Unexpected Exception",e);}
},"SubmitTasks");
submitThread.start();

然后当前线程从期货队列中完成任务并处理结果:

while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

当我在具有8个内核的服务器上运行它时(注意代码当前使用15个线程),cpu利用率仅达到约60%.我看到我的调试输出如下:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

线程转储显示许多线程池线程阻塞如下:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.reentrantlock$Nonfairsync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.reentrantlock.lockInterruptibly(reentrantlock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

我对调试输出的解释是,在任何给定的时间点,我至少有几百个已提交给执行程序服务但尚未处理的任务(我还可以在堆栈跟踪中确认SubmitTasks线程是在LinkedBlockingQueue.put上被阻止).然而,堆栈跟踪(以及服务器利用率统计信息)向我显示Executor服务在LinkedBlockingQueue.take上被阻止(我假设内部任务队列为空).

我读错了什么?

解决方法

涉及BlockingQueues的线程总是很棘手.只需查看代码而无需使用您所使用的比例运行.我有一些建议.像Jessica Kerr这样的业内许多专家建议你永远不要永远阻止.你可以做的是在LinkedBlockingQueue中使用带有超时的方法.
Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.peek()) != null) {
            boolean success = futures.offer(exec.submit(task),1000,TimeUnit.MILLISECONDS);
            if(success) {
                submitted.incrementAndGet();
                taskQueue.remove(task);
            }
        }
    } catch (Exception e) {l .error("Unexpected Exception","SubmitTasks");
submitThread.start();

还有这里.

while (!futures.isEmpty() || submitThread.isAlive()) {
    Future<MyTask> f = futures.poll(1000,TimeUnit.MILLISECONDS);
    if(f != null) {
        MyTask task = f.get();
    }
    //process result
}

观看此视频由Jessica Kerr于Concurrency tools in JVM

java – 修复线程池线程阻塞,当提交足够的任务时的更多相关文章

  1. ios – 何时使用Semaphore而不是Dispatch Group?

    我会假设我知道如何使用DispatchGroup,为了解问题,我尝试过:结果–预期–是:为了使用信号量,我实现了:并在viewDidLoad方法中调用它.结果是:从概念上讲,dispachGroup和Semaphore都有同样的目的.老实说,我不熟悉:什么时候使用信号量,尤其是在与dispachGroup合作时–可能–处理问题.我错过了什么部分?

  2. ios – 当我的主线程阻塞时,如何获得断点/日志/增加的可见性?

    在对UI响应的永无止境的追求中,我想更多地了解主线程执行阻止操作的情况.我正在寻找某种“调试模式”或额外的代码,或钩子,或任何东西,从而我可以设置一个断点/日志/将被击中的东西,并允许我检查如果我的主要线程“自愿”用于I/O的块(或任何原因,真的),除了在循环结束时空闲.在过去,我已经使用循环观察器观察了跑步循环的时钟周期,这对于查看问题很有价值,但是在你可以检查的时候,为了做一个好主意,为时已晚

  3. ios – 如何在不阻塞主线程的情况下添加SCNNode?

    解决方法我不认为使用dispatchQueue可以解决这个问题.如果我替换其他任务而不是创建SCNNode它按预期工作,所以我认为问题与SceneKit有关.thisquestion的答案表明SceneKit有自己的私有后台线程,它将所有更改批量化.因此,无论我使用什么线程来创建我的SCNNode,它们都会在与渲染循环相同的线程中的同一队列中结束.我正在使用的丑陋的解决方法是在SceneKit的委托渲染器方法中一次添加一些节点,直到它们全部完成.

  4. 在Swift中应用Grand Central Dispatch(上

    在这两篇教程中,你会学到GCD的来龙去脉。起步libdispatch是Apple所提供的在IOS和OSX上进行并发编程的库,而GCD正是它市场化的名字。Swift中的闭包和OC中的块类似甚至于他们几乎就是可交换使用的。但OC中的块可以安全的替换成Swift中的闭包。再一次,这完全取决于GCD。QoS等级表示了提交任务的意图,使得GCD可以决定如何制定优先级。QOS_CLASS_USER_INteraCTIVE:userinteractive等级表示任务需要被立即执行以提供好的用户体验。

  5. 在Swift中应用Grand Central Dispatch 下

    通过使用dispatch_barrrier和dispatch_sync,你做到了让PhotoManager单例在读写照片时是线程安全的。还有,使用dispatch_async异步执行cpu密集型任务,从而为视图控制器初始化过程减负。幸运的是,dispatchgroups就是专为监视多个异步任务的完成情况而设计的。调度组调度组在一组任务都完成后会发出通知。在组内所有事件都完成时,GCDAPI提供了两种方式发送通知。打开PhotoManager.swift,替换downloadPhotosWithComple

  6. swift详解之十六-----------GCD基础部分

    当你了解了调度队列如何为你自己代码的不同部分提供线程安全后,GCD的优点就是显而易见的。这完全取决于GCD。这个队列就是用于发生消息给UIView或发送通知的。GCD的“艺术”归结为选择合适的队列来调度函数以提交你的工作。

  7. Realm Swift

    一旦带有主键的对象被添加到Realm之后,该对象的主键将不可修改。IgnoredProperties重写Object.ignoredProperties()可以防止Realm存储数据模型的某个属性。Realm将不会干涉这些属性的常规操作,它们将由成员变量提供支持,并且您能够轻易重写它们的setter和getter。所有的查询在Realm中都是延迟加载的,只有当属性被访问时,才能够读取相应的数据。

  8. 同步和异步

    如果是同步操作,它会阻塞当前线程并等待Block中的任务执行完毕,然后当前线程才会继续往下运行。并行队列中的任务根据同步或异步有不同的执行方式。同步执行异步执行串行队列当前线程,一个一个执行其他线程,一个一个执行并行队列当前线程,一个一个执行开很多线程,一起执行创建队列:主队列:这是一个特殊的串行队列。传入disPATCH_QUEUE_CONCURRENT表示创建并行队列。

  9. Swift--&gt;GCD,NSThread,NSBlockOperation多线程使用(主线程回调)

    应用程序开发,少不了的多线程,与多线程相关的就是线程同步.本文介绍Swift最简单的多线程使用.推荐阅读:http://www.jianshu.com/p/0b0d9b1f1f19看例子:1:获取线程基本的信息2:子线程的创建方法3:GCD(GrandCentraldispatch)队列的使用4:自定义queue5:NSBlockOperation和NSOperationQueue的使用6:子线程

  10. 完整详解swift GCD系列四dispatch_semaphore信号量

    viewmode=contents一何为信号量?简单来说就是控制访问资源的数量,比如系统有两个资源可以被利用,同时有三个线程要访问,只能允许两个线程访问,第三个应当等待资源被释放后再访问。其中value为信号量的初值,如果小于0则会返回NULL提高信号量copy

随机推荐

  1. 基于EJB技术的商务预订系统的开发

    用EJB结构开发的应用程序是可伸缩的、事务型的、多用户安全的。总的来说,EJB是一个组件事务监控的标准服务器端的组件模型。基于EJB技术的系统结构模型EJB结构是一个服务端组件结构,是一个层次性结构,其结构模型如图1所示。图2:商务预订系统的构架EntityBean是为了现实世界的对象建造的模型,这些对象通常是数据库的一些持久记录。

  2. Java利用POI实现导入导出Excel表格

    这篇文章主要为大家详细介绍了Java利用POI实现导入导出Excel表格,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  3. Mybatis分页插件PageHelper手写实现示例

    这篇文章主要为大家介绍了Mybatis分页插件PageHelper手写实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  4. (jsp/html)网页上嵌入播放器(常用播放器代码整理)

    网页上嵌入播放器,只要在HTML上添加以上代码就OK了,下面整理了一些常用的播放器代码,总有一款适合你,感兴趣的朋友可以参考下哈,希望对你有所帮助

  5. Java 阻塞队列BlockingQueue详解

    本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景,通过实例代码介绍了Java 阻塞队列BlockingQueue的相关知识,需要的朋友可以参考下

  6. Java异常Exception详细讲解

    异常就是不正常,比如当我们身体出现了异常我们会根据身体情况选择喝开水、吃药、看病、等 异常处理方法。 java异常处理机制是我们java语言使用异常处理机制为程序提供了错误处理的能力,程序出现的错误,程序可以安全的退出,以保证程序正常的运行等

  7. Java Bean 作用域及它的几种类型介绍

    这篇文章主要介绍了Java Bean作用域及它的几种类型介绍,Spring框架作为一个管理Bean的IoC容器,那么Bean自然是Spring中的重要资源了,那Bean的作用域又是什么,接下来我们一起进入文章详细学习吧

  8. 面试突击之跨域问题的解决方案详解

    跨域问题本质是浏览器的一种保护机制,它的初衷是为了保证用户的安全,防止恶意网站窃取数据。那怎么解决这个问题呢?接下来我们一起来看

  9. Mybatis-Plus接口BaseMapper与Services使用详解

    这篇文章主要为大家介绍了Mybatis-Plus接口BaseMapper与Services使用详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

  10. mybatis-plus雪花算法增强idworker的实现

    今天聊聊在mybatis-plus中引入分布式ID生成框架idworker,进一步增强实现生成分布式唯一ID,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

返回
顶部