场景: 我们用多线程来做一次搜索,一共有5个搜索引擎可用,我们运行多线程程序,哪个搜索引擎最先返回搜索结果就用哪个结果,剩余的4个结果已经不重要了,可以放弃了。这种场景可用ThreadPoolExecutor.invokeAny方法。
首先看下他的使用方法,见如下代码,代码非常简单,仅需要3分钟就能读完.
使用方法
part1. 任务代码 这个代码的目的是sleep上一个随机的时间
package top.huster.thread.task; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; public class CallbackTask implements Callable<String>{ private String name; public CallbackTask(String name) { this.name = name; } @Override public String call() throws Exception { Random random = new Random(System.nanoTime()); int sleep = random.nextInt(10); System.out.println(name + " will sleep " + sleep); TimeUnit.SECONDS.sleep(sleep); return name + " sleep " + sleep; } }
part2. main程序,重点
package top.huster.thread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import top.huster.thread.task.CallbackTask; public class FirstCompleteTask { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); List<Callable<String>> task = new ArrayList<>(); task.add(new CallbackTask("Jim")); task.add(new CallbackTask("Lucy")); task.add(new CallbackTask("Hanmeimei")); task.add(new CallbackTask("Jack")); try { String result = executor.invokeAny(task); //result2 = executor.invokeAny(task, 1, TimeUnit.SECONDS); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e2) { e2.printStackTrace(); } executor.shutdown(); } }
我们传一个List的task给Executor,然后使用invokeAny他会返回最快完成的执行结果,并且不会等待剩余的任务是否完成。
值得一提的时候,你还可以设置一个等待时间,超过等待时间后,也不会再等,直接返回null.
然而,本文并没有结束,他是怎么实现的呢?
实现分析
这个invokeAny实际上是ExecutorService接口定义的方法,是ThreadPoolExecutor的父类,AbstractExecutorService实现的,这个实现里主要用到了ExecutorCompletionService, 这个类有一个队列用来保存完成的任务,我们看看具体的实现代码,其实很容易理解
/** * the main mechanics of invokeAny. */ private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); // For efficiency, especially in executors with limited // parallelism, check to see if previously submitted tasks are // done before submitting more of them. This interleaving // plus the exception mechanics account for messiness of main // loop. try { // Record exceptions so that if we fail to obtain any // result, we can throw the last exception we got. ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); // Start one task for sure; the rest incrementally // 先塞一个任务进去 futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { // poll不会阻塞,直接返回 Future<T> f = ecs.poll(); if (f == null) { // 并没有任何完成的任务 if (ntasks > 0) { // 如果任务列表里还有其他任务,继续添加进去 --ntasks; futures.add(ecs.submit(it.next())); ++active; } else if (active == 0) // 如果没有active任务,则终止 break; else if (timed) { // 超时了 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else // 否则阻塞等待一个任务完成 f = ecs.take(); } if (f != null) { // 有任务返回了结果 --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); throw ee; } finally { for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }