等待多个异步任务完成

17

我正在通过将操作分割为可用核心数量的确切部分,然后启动相同数量的AsyncTask,在不同的数据部分上执行相同的操作来并行化我的操作。

我正在使用executeOnExecutor(AsyncTask.THREAD_POOL_EXECUTOR, ...)来并行执行它们。

我想知道每个线程何时完成其工作,以便组合所有结果并执行进一步的操作。

我该怎么做?


如果你想等待,那么我更喜欢使用线程,因为你可以简单地加入一个线程。使用异步的整个重点是它可以并行执行。当然,你可以通过使用异步的 Future 来等待结果。 - We are Borg
你是否能够控制AsyncTask并提前知道任务数量?如果是这样,你可以在onPostExecute()中简单地计数(整数变量,无需复杂的同步)完成的任务数,并在计数达到零时执行操作。 - dhke
@dhke,在这种情况下,您必须运行一个空闲循环,而我不太喜欢这样。 - Nicholas Allio
@NicholasAllio 这是指我的已删除评论吗?如果是的话:是的,我从你在答案中的评论中意识到你想要收到通知,而不是等待。 - dhke
使用 Rx 合并操作符 - Duna
采用这种方法会引发两个问题:如果出现问题会发生什么?AsyncTask能够处理错误吗? - Duna
5个回答

13
您还可以在onPostExecute中的共享对象中递减计数器。由于onPostExecute在同一线程(即主线程)上运行,因此不必担心同步问题。 更新1 共享对象可能如下所示:
public class WorkCounter {
    private int runningTasks;
    private final Context ctx;

    public WorkCounter(int numberOfTasks, Context ctx) {
        this.runningTasks = numberOfTasks;
        this.ctx = ctx;
    }
    // Only call this in onPostExecute! (or add synchronized to method declaration)
    public void taskFinished() {
        if (--runningTasks == 0) {
            LocalBroadcastManager mgr = LocalBroadcastManager.getInstance(this.ctx);
            mgr.sendBroadcast(new Intent("all_tasks_have_finished"));
        }
    }
}

更新 2

根据这个回答的评论,OP正在寻找一种避免构建新类的解决方案。这可以通过在生成的 AsyncTask 之间共享一个 AtomicInteger 来实现:

// TODO Update type params according to your needs.
public class MyAsyncTask extends AsyncTask<Void,Void,Void> {
    // This instance should be created before creating your async tasks.
    // Its start count should be equal to the number of async tasks that you will spawn.
    // It is important that the same AtomicInteger is supplied to all the spawned async tasks such that they share the same work counter.
    private final AtomicInteger workCounter;

    public MyAsyncTask(AtomicInteger workCounter) {
        this.workCounter = workCounter;
    }

    // TODO implement doInBackground

    @Override
    public void onPostExecute(Void result) {
        // Job is done, decrement the work counter.
        int tasksLeft = this.workCounter.decrementAndGet();
        // If the count has reached zero, all async tasks have finished.
        if (tasksLeft == 0) {
            // Make activity aware by sending a broadcast.
            LocalBroadcastManager mgr = LocalBroadcastManager.getInstance(this.ctx);
            mgr.sendBroadcast(new Intent("all_tasks_have_finished"));    
        }
    }
}

我认为以这种方式,如果多个线程在同一时间完成,可能会出现多个线程对同一变量进行多次访问和修改的问题。 - Nicholas Allio
2
不会,因为onPostExecute在主线程上运行。只是不要在doInBackground中访问该变量。 - Janus Varmarken
那么你的建议是让该活动在空闲循环中等待直到计数器归零?不确定这是否是最佳解决方案。 - Nicholas Allio
你所说的“更方便”是什么意思?内置功能?附加拟议类的示例。在我看来,它很简单。 - Janus Varmarken
我明白了。我有一个备选方案,你可能更喜欢(个人而言,我更喜欢已发布的方案)。我会用另一种策略更新答案,请给我几分钟时间。 - Janus Varmarken
显示剩余6条评论

6

您应该使用CountDownLatch。以下是带有示例的文档: java.util.concurrent.CountDownLatch

基本上,您将CountDownLatch的引用分配给您的线程,每个线程在完成时都会将其递减:

countDownLatch.countDown();

主线程将通过以下方式等待所有线程的终止:

countDownLatch.await();

在CountDownLatch并发访问的情况下,它的行为是什么? - Nicholas Allio
它是线程安全的,因为CountDownLatch是一种同步工具。 - Pouriya Zarbafian
这个解决方案对我来说非常有趣,但问题在于只有在没有要传递给您的AsyncTask参数时才能使用它(在我的情况下,我向我的任务传递了几个整数,而我不能同时传递CountDownLatch并在onPostExecute中引用它不起作用。 - Nicholas Allio
也许你可以扩展AsyncTask并创建一个带有额外参数的构造函数,然后通过调用'super.onPostExecute()'和'countDownLatch.countDown()'来重写onPostExecute。 - Pouriya Zarbafian
1
“_主线程_将使用countDownLatch.await()等待所有线程终止。您实际上需要专门为此await调用分配一个单独的线程,而不是主线程,否则您将锁定GUI(从而打败了AsyncTask的目的,即在主线程之外执行短暂的后台作业)。在超越await调用之后,这个专门用于等待的线程将通过发送广播或发布到与主线程关联的Handler来向主线程发出完成作业的信号。” - Janus Varmarken

4
首先,将此类添加到您的项目中。
public abstract class MultiTaskHandler {
    private int mTasksLeft;
    private boolean mIsCanceled = false;

    public MultiTaskHandler(int numOfTasks) {
        mTasksLeft = numOfTasks;
    }

    protected abstract void onAllTasksCompleted();

    public void taskComplete()  {
        mTasksLeft--;
        if (mTasksLeft==0 && !mIsCanceled) {
            onAllTasksCompleted();
        }
    }

    public void reset(int numOfTasks) {
        mTasksLeft = numOfTasks;
        mIsCanceled=false;
    }

    public void cancel() {
        mIsCanceled = true;
    }
}

然后:
int totalNumOfTasks = 2; //change this to the number of tasks that you are running
final MultiTaskHandler multiTaskHandler = new MultiTaskHandler(totalNumOfTasks) {
    @Override
    protected void onAllTasksCompleted() {
       //put the code that runs when all the tasks are complete here
    }
};

然后在每个任务完成后,添加以下语句:multiTaskHandler.taskComplete();

例如:

(new AsyncTask<Void,Void,Void>() {

    @Override
    protected Void doInBackground(Void... voids) {
        // do something...
        return null;
    }

    @Override
    protected void onPostExecute(Void aVoid) {
        multiTaskHandler.taskComplete();
    }
}).execute();

如果您想取消所有任务完成时运行的代码,可以使用 multiTaskHandler.cancel()。例如 - 如果您有一个错误(不要忘记还要取消所有其他任务)。

* 此解决方案不会暂停主线程!


1
这个解决方案简直完美。 - MurifoX
非常适合我的个人问题,我根据自己的需求进行了调整。点赞! - Bogdan Android

0

RX Merge 操作符是你的好朋友。

摆脱 AsyncTark,它比 RX 更慢,而且你无法处理错误。


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接