Java执行器:如何在不阻塞的情况下被通知任务完成?

191
假设我有一个充满任务的队列需要提交给执行器服务,我希望它们一个接一个地被处理。我能想到的最简单的方法是:
  1. 从队列中取出一个任务
  2. 将其提交给执行器
  3. 调用返回的Future的.get()方法并阻塞直到结果可用
  4. 从队列中再取出另一个任务...
然而,我试图完全避免阻塞。如果我有10,000个这样的队列,需要逐个处理它们的任务,我会耗尽堆栈空间,因为大多数队列会占用阻塞的线程。
我想要的是提交一个任务,并提供一个回调函数,在任务完成时调用该回调函数。我将使用该回调通知作为发送下一个任务的标志。(functionaljava和jetlang显然使用这些非阻塞算法,但我无法理解他们的代码)
除了编写自己的执行器服务,如何使用JDK的java.util.concurrent实现这一点?
(提供这些任务的队列本身可能会阻塞,但这是以后需要解决的问题)
12个回答

175

定义一个回调接口来接收你想要传递的任何参数,以便在完成通知时调用它。然后在任务结束时调用它。

你甚至可以编写一个通用的Runnable任务包装器,并将其提交给ExecutorService。或者,查看下面介绍的Java 8中内置的一种机制。

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

Java 8加入了CompletableFuture,提供了更丰富的方式来组合异步且有条件完成的流水线。下面是一个虚构但完整的通知示例。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

1
眨眼间就有三个答案!我喜欢CallbackTask,这是一个简单而直接的解决方案。回想起来似乎很明显。谢谢。 关于其他人对SingleThreadedExecutor的评论:我可能有成千上万个队列,每个队列可能有成千上万个任务。它们中的每一个都需要一次处理一个任务,但不同的队列可以并行操作。这就是为什么我使用单个全局线程池的原因。我对执行器还很陌生,请告诉我是否错误。 - Shahbaz
6
好的模式,然而我会使用Guava的可监听Future API,它提供了很好的实现。 - Pierre-Henri
2
@erickson,您能具体说明一下是哪个“Callback”导入吗?这会非常有帮助。因为有很多,很难找到正确的。 - Zelphir Kaltstahl
2
@Zelphir,这是您声明的“回调”接口;不是来自库。现在我可能会使用RunnableConsumerBiConsumer,具体取决于我需要从任务传递给监听器的内容。 - erickson
1
@Bhargav 这是回调函数的典型应用——外部实体“回调”到控制实体。您是否希望创建任务的线程在任务完成之前阻塞?那么在第二个线程上运行任务有什么意义呢?如果允许线程继续,它将需要重复检查某些共享状态(可能是循环,但取决于您的程序),直到它注意到由真正的回调函数所描述的更新(布尔标志、队列中的新项目等)。然后它可以执行一些额外的工作。 - erickson
显示剩余4条评论

64

在Java 8中,您可以使用CompletableFuture。下面是我代码中的一个示例,我在其中使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个GUI应用程序):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

它异步执行。我使用了两个私有方法:mapUsersToUserViewsupdateView


1
一个人如何使用CompletableFuture和执行器一起使用?(以限制并发/并行实例的数量)这是否是一个提示:cf:submitting-futuretasks-to-an-executor-why-does-it-work - user1767316

54

使用Guava的可监听Future API并添加回调函数。参见自网站:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});

26
您可以扩展FutureTask类,并重写done()方法,然后将FutureTask对象添加到ExecutorService中,这样当FutureTask完成时,done()方法会立即被调用。

请问如何将FutureTask对象添加到ExecutorService中?需要执行以下操作:将FutureTask对象添加到ExecutorService中。 - Gary Gauh
@GaryGauh 点击此处了解更多信息。您可以扩展FutureTask,我们可以称之为MyFutureTask。然后使用ExcutorService提交MyFutureTask,然后MyFutureTask的run方法将运行,当MyFutureTask完成时,将调用您的done方法。这里有一些令人困惑的地方是两个FutureTask,实际上MyFutureTask是一个普通的Runnable。 - lin

17

ThreadPoolExecutor还提供了beforeExecuteafterExecute钩子方法,您可以重写并利用它们。以下是从ThreadPoolExecutorJavadocs中的描述。

钩子方法

该类提供了受保护的可重写的beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)方法,在每个任务执行前后都会调用这些方法。这些方法可用于操作执行环境;例如,重新初始化ThreadLocals,收集统计信息或添加日志条目。此外,方法terminated()可以被覆盖,以执行需要在Executor完全终止时完成的任何特殊处理。如果挂钩或回调方法抛出异常,则内部工作线程可能会失败并突然终止。


6
使用CountDownLatch。这是来自java.util.concurrent的工具,可以等待多个线程执行完成后再继续执行。要达到您寻求的回调效果,需要进行一些额外的工作,即在单独的线程中处理此事并使用CountDownLatch等待,然后通知所需的内容。没有原生支持回调或类似效果的功能。 编辑:现在我进一步理解了您的问题,我认为您正在过度扩展。如果您使用常规的SingleThreadExecutor,将所有任务都给它,它会自动排队。

使用SingleThreadExecutor,如何知道所有线程都已完成?我看到一个例子使用while !executor.isTerminated,但这似乎不太优雅。我为每个工作线程实现了回调功能,并增加了计数器,这很有效。 - Bear

5

如果您想确保没有任务同时运行,请使用SingleThreadedExecutor。任务将按照提交的顺序进行处理。您甚至不需要持有这些任务,只需将它们提交给执行程序即可。


3

使用ExecutorService实现Callback机制的简单代码

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

输出:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

重点注意:

  1. If you want process tasks in sequence in FIFO order, replace newFixedThreadPool(5) with newFixedThreadPool(1)
  2. If you want to process next task after analysing the result from callback of previous task,just un-comment below line

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. You can replace newFixedThreadPool() with one of

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    depending on your use case.

  4. If you want to handle callback method asynchronously

    a. Pass a shared ExecutorService or ThreadPoolExecutor to Callable task

    b. Convert your Callable method to Callable/Runnable task

    c. Push callback task to ExecutorService or ThreadPoolExecutor


2
这是对Pache的答案进行扩展,使用Guava的ListenableFuture。具体来说,Futures.transform()返回ListenableFuture,因此可用于链接异步调用。Futures.addCallback()返回void,因此不能用于链接,但适用于处理异步完成的成功/失败情况。
// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

注意:除了链接异步任务外,Futures.transform()还允许您在单独的执行器上安排每个任务(本示例未显示)。


这看起来相当不错。 - kaiser

1
您可以使用Callable的实现方式,如下所示:
public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

CallbackInterface是一些非常基础的东西,比如

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

现在主类的代码将会如下所示:
ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接