从ThreadPoolExecutor的invokeAll方法说开去:如何等待多个并发任务结束并收集结果

ThreadPoolExecutor的invokeAny方法会返回第一个完成的任务的结果,与之对应的是invokeAll方法,返回的是所有任务的执行结果。主线程会阻塞,一直到所有任务都执行完毕。 我们总结下,想要等待所有任务都完成并且开始处理结果到底有几种办法。

方法一、理解shutdown并使用shutdown

要想知道行不行,就得先理解shutdown方法的含义。shutdown被调用之后,会拒绝接受新的submit,另外会将已经submit的所有任务执行完毕,才会结束所有线程,那么主线程才会结束。

用代码说明,首先是一个任务

 

package top.huster.thread.task;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class CallbackTask implements Callable<String>{

    private String name;

    public CallbackTask(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        Random random = new Random(System.nanoTime());
        int sleep = random.nextInt(10);
        System.out.println(name + " will sleep " + sleep);
        TimeUnit.SECONDS.sleep(sleep);
        return name + " sleep " + sleep;
    }
}

 

然后是主程序

 

ExecutorService executorService = Executors.newFixedThreadPool(1);
    List<Future> futureList = new ArrayList<>();
    long start = System.currentTimeMillis();
    for (int i=0; i<2; i++) {
        futureList.add(executorService.submit(new CallbackTask("task" + i)));
    }
    //futureList.add(executorService.submit(new LongTimeCostTask()));
    System.out.println("after submit time cost  " + (System.currentTimeMillis() - start));
    executorService.shutdown();
    System.out.println("after call shutdown time cost  " + (System.currentTimeMillis() - start));
    for (Future f : futureList) {
        try {
            System.out.println(" begin check task time cost  " + (System.currentTimeMillis() - start));
            System.out.println(" is done " + f.isDone());
            System.out.println(" after check task time cost  " + (System.currentTimeMillis() - start));
            /**
            System.out.println(" begin get task result time cost  " + (System.currentTimeMillis() - start));
            System.out.println(" task result is " + f.get());
            System.out.println(" after get task result time cost  " + (System.currentTimeMillis() - start));
             **/
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

这段代码输出如下:

task0 will sleep 2
after submit time cost 5
after call shutdown time cost 6
begin check task time cost 6
is done false
after check task time cost 7
begin check task time cost 7
is done false
after check task time cost 7
task1 will sleep 4

Process finished with exit code 0

这个输出最后等待了4s左右,主线程才结束,这说明,当所有任务完成之后,我们的主线程才退出。尽管在

System.out.println(" after check task time cost " + (System.currentTimeMillis() - start));

执行完上面的代码之后,主线程什么都没做,仅仅是等待所有线程完成。

需要注意的是,我们这里特意设置了线程池里只有一个线程,这说明调用shutdown之后,只是不接受新的任务,旧的任务无论是否已经开始执行,他都会把所有的submit执行完毕,尽管有的任务并未真正的开始。

我们修改下主代码,如果在主代码里加入了Future.get方法呢?输入如下:

 

加入如下代码:

System.out.println(" begin get task result time cost  " + (System.currentTimeMillis() - start));
System.out.println(" task result is " + f.get());
System.out.println(" after get task result time cost  " + (System.currentTimeMillis() - start));

输出结结果为:

task2 will sleep 4
after submit time cost 13
task0 will sleep 4
task1 will sleep 0
task4 will sleep 5
task3 will sleep 7
after call shutdown time cost 14
begin check task time cost 14
is done false
after check task time cost 14
begin get task result time cost 14
task result is task0 sleep 4
after get task result time cost 4017
begin check task time cost 4017
is done true
after check task time cost 4018
begin get task result time cost 4018
task result is task1 sleep 0
after get task result time cost 4018
begin check task time cost 4018
is done true
after check task time cost 4018
begin get task result time cost 4018
task result is task2 sleep 4
after get task result time cost 4018
begin check task time cost 4018
is done false
after check task time cost 4018
begin get task result time cost 4018
task result is task3 sleep 7
after get task result time cost 7017
begin check task time cost 7017
is done true
after check task time cost 7018
begin get task result time cost 7018
task result is task4 sleep 5
after get task result time cost 7018

Process finished with exit code 0

 

 

这个也符合我们的预期,因为Futuer.get方法会阻塞当前线程,一直等待当前任务完成,所以当经过了7s后,后面的任务都已经执行完毕了,也就不耗时了。

 

shutdown和shutdownNow的区别

shutdownNow和shutdown的区别是,shutdownNow会立即终止所有当前正在运行的线程,并且将等待运行的线程取消掉(假设任务比线程数多得多)。需要注意的是,他试图去终止线程调用的是线程的interrupt方法,这个方法能不能终止线程,则不一定。

 

Thread的interrupt方法

需要注意的是,调用Thread的interrupt的方法,仅仅是将Thread的interrupted标志位置为true,至于线程是否会终止,则要看业务方怎么写了。如果业务方监控了interrupted状态位,并且一旦发现状态为为ture的时候,及时的终止线程,这个线程才会真正的终止,否则,即使你调用了这个方法,线程该干嘛还是干嘛。

另外需要注意的是,如果当前线程处于等待中(Sleep,wait等),那么调用这个方法会立即触发一个InterruptedException, 止于此时线程会发生什么,还是要看业务方捕获了这个异常之后干了什么。

测试代码如下,我们加一个耗时较长的任务,这个任务会sleep一段时间,然后写入文件。

package top.huster.thread.task;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class LongTimeCostTask implements Callable<String> {
    @Override
    public String call() throws IOException {
        try {
            TimeUnit.SECONDS.sleep(30);
            File file = new File("/tmp/task");
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            fileOutputStream.write("time now is ".getBytes("utf-8"));
        } catch (InterruptedException e) {
            File file = new File("/tmp/exception");
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            fileOutputStream.write(e.getMessage().getBytes("utf-8"));
        }
        return "long time task";
    }
}

 

 

把这个长耗时任务加入任务执行队列,当调用shutdown方法之后,他会等待30s才结束主线程,并且/tmp目录下会有一个task文件

而如果调用的是shutdownNow,如你所愿会有一个/tmp/exception文件,内容是:sleep interrupted

 

利用awaitTermination(long timeOut, TimeUnit unit)方法

其实有了shutdown方法,我们已经能够让线程执行完所有任务并且停止了。但是,shutdown之后,线程的结束时间取决于最长耗时的那个任务。如果你的任务仅仅是让所有提交的任务执行完毕就可以了(任务没有关联,结束一个处理一个),那么调用shutdown完全够用。如果你的任务一定要等待所有的任务完成之后(一定要等待所有的任务完成,再去处理返回结果),再去做某个事情,那就要结合awaitTermination方法了。

awaitTermination可以每隔一段时间就去检测是否所有的任务已完成,如果完成了则返回true,否则返回false。

 

  System.out.println("after submit time cost  " + (System.currentTimeMillis() - start));
        executorService.shutdown();
        System.out.println("after call shutdown time cost  " + (System.currentTimeMillis() - start));
        try {
            while (executorService.awaitTermination(1, TimeUnit.SECONDS)) {
                //do something
            }
        } catch (InterruptedException e) {

        }

 

ThreadPoolExecutor的invokeAll方法

最后一个方法当然是调用invokeAll方法了,这个方法会阻塞当前线程直到所有的任务执行完毕,代码省略。

 

 

总结下就是:

  1. shutdown和shutdownNow的区别是,前者一定会等所有任务执行完毕,才会停止主线程,后者是立即停止并且会向线程发送一个interrupt信号。
  2. 线程被调用interrupt方法之后,是否中断要看业务方怎么处理
  3. Future的get方法调用之后,会阻塞住,直到任务完毕这个方法才会返回
  4. 当任务一个个处理的时候,shutdown就足够用了。而如果非要等到所有任务完成才做某件事情的话,需要结合awaitTermination每隔一段时间去检测状态位。
  5. 还能使用ThreadPoolExecutor的invokeAll方法来等待所有任务完成,当这个方法返回之后,所有future的isDone都是true.

 

 

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注