认识ExecutorCompletionService返回最快的结果

场景: 我们用多线程来做一次搜索,一共有5个搜索引擎可用,我们运行多线程程序,哪个搜索引擎最先返回搜索结果就用哪个结果,剩余的4个结果已经不重要了,可以放弃了。这种场景可用ThreadPoolExecutor.invokeAny方法。

首先看下他的使用方法,见如下代码,代码非常简单,仅需要3分钟就能读完.

使用方法

part1. 任务代码 这个代码的目的是sleep上一个随机的时间

package top.huster.thread.task;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class CallbackTask implements Callable<String>{

    private String name;

    public CallbackTask(String name) {
        this.name = name;
    }

    @Override
    public String call() throws Exception {
        Random random = new Random(System.nanoTime());
        int sleep = random.nextInt(10);
        System.out.println(name + " will sleep " + sleep);
        TimeUnit.SECONDS.sleep(sleep);
        return name + " sleep " + sleep;
    }
}

 

 

part2. main程序,重点

package top.huster.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import top.huster.thread.task.CallbackTask;

public class FirstCompleteTask {

    public static void main(String[] args) {
        ExecutorService executor  = Executors.newCachedThreadPool();
        List<Callable<String>> task = new ArrayList<>();
        task.add(new CallbackTask("Jim"));
        task.add(new CallbackTask("Lucy"));
        task.add(new CallbackTask("Hanmeimei"));
        task.add(new CallbackTask("Jack"));
        try {
            String result = executor.invokeAny(task);
            //result2 = executor.invokeAny(task, 1, TimeUnit.SECONDS);
            System.out.println(result);
        }  catch (InterruptedException e) {
            e.printStackTrace();
        }  catch (ExecutionException e2) {
            e2.printStackTrace();
        }
        executor.shutdown();
    }
}

 

我们传一个List的task给Executor,然后使用invokeAny他会返回最快完成的执行结果,并且不会等待剩余的任务是否完成。

 

值得一提的时候,你还可以设置一个等待时间,超过等待时间后,也不会再等,直接返回null.

然而,本文并没有结束,他是怎么实现的呢?

 

实现分析

这个invokeAny实际上是ExecutorService接口定义的方法,是ThreadPoolExecutor的父类,AbstractExecutorService实现的,这个实现里主要用到了ExecutorCompletionService, 这个类有一个队列用来保存完成的任务,我们看看具体的实现代码,其实很容易理解

 

 /**
     * the main mechanics of invokeAny.
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
           // 先塞一个任务进去
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
               // poll不会阻塞,直接返回
                Future<T> f = ecs.poll();
                if (f == null) {
// 并没有任何完成的任务
                    if (ntasks > 0) {
// 如果任务列表里还有其他任务,继续添加进去
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
// 如果没有active任务,则终止
                        break;
                    else if (timed) {
// 超时了
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
// 否则阻塞等待一个任务完成
                        f = ecs.take();
                }
                if (f != null) {
// 有任务返回了结果
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

 

 

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注