如何知道其他线程是否已经完成?

144

我有一个带有名为StartDownload()方法的对象,它会启动三个线程。

如何在每个线程执行完成时收到通知?

是否有一种方式可以知道一个或所有线程是否已经完成或仍在执行中?


1
请查看Java 5 Barrier类 - Fortyrunner
12个回答

250

你可以通过以下几种方式实现此操作:

  1. 在主线程中使用 Thread.join() 以阻塞的方式等待每个线程完成,或者
  2. 以轮询的方式检查 Thread.isAlive() -- 通常不建议使用 -- 等待每个线程完成,或者
  3. 对于每个相关的线程,调用 setUncaughtExceptionHandler 来调用对象中的方法,并在每个线程完成时编程抛出未捕获的异常,或者
  4. 使用 java.util.concurrent 中的锁、同步器或机制,或者
  5. 更传统的方法是,在主线程中创建一个监听器,然后编程每个线程告诉监听器它们已经完成。

如何实现Idea#5? 首先创建一个接口:

public interface ThreadCompleteListener {
    void notifyOfThreadComplete(final Thread thread);
}

然后创建以下类:

public abstract class NotifyingThread extends Thread {
  private final Set<ThreadCompleteListener> listeners
                   = new CopyOnWriteArraySet<ThreadCompleteListener>();
  public final void addListener(final ThreadCompleteListener listener) {
    listeners.add(listener);
  }
  public final void removeListener(final ThreadCompleteListener listener) {
    listeners.remove(listener);
  }
  private final void notifyListeners() {
    for (ThreadCompleteListener listener : listeners) {
      listener.notifyOfThreadComplete(this);
    }
  }
  @Override
  public final void run() {
    try {
      doRun();
    } finally {
      notifyListeners();
    }
  }
  public abstract void doRun();
}

然后,您的每个线程都将扩展NotifyingThread,而不是实现run(),它将实现doRun()。因此,当它们完成时,它们将自动通知等待通知的任何人。

最后,在您的主类中--启动所有线程的类(或至少等待通知的对象)--修改该类以实现ThreadCompleteListener,并在创建每个线程后立即将其添加到侦听器列表中:

NotifyingThread thread1 = new OneOfYourThreads();
thread1.addListener(this); // add ourselves as a listener
thread1.start();           // Start the Thread

然后,当每个线程退出时,您的notifyOfThreadComplete方法将被调用,其中包含刚完成(或崩溃)的线程实例。
请注意,更好的做法是为NotifyingThread实现Runnable而不是扩展Thread,因为在新代码中通常不建议扩展Thread。但我正在按照您的问题编码。如果您将NotifyingThread类更改为实现Runnable,则必须更改一些管理线程的代码,这很容易做到。

4
使用这种方法,notifiyListeners将在run ()中调用,因此它将在线程内部被调用,并且进一步的调用也将在那里完成,是这样吗? - Jordi Puigdellívol
1
@Eddie Jordi 问,是否可以在 run 方法之后调用 notify 方法,而不是在其中调用。 - Tomasz Dzięcielewski
2
问题实际上是:现在如何从辅助线程中退出。我知道它已经结束了,但现在该如何访问线程? - avalancha
avalancha:这取决于你所说的“离开次要线程”的含义。一旦次要线程的doRun()方法结束,该线程就完成了。它不再运行。另一方面,主线程仍在运行。你到底想做什么?@TomasDz:run()方法完成后无法执行任何操作。一旦该方法完成,线程就不再运行,因此它不能引起任何事件发生。 - Eddie
3
这个线程安全吗?看起来 notifyListeners(因此也包括 notifyOfThreadComplete)将在 NotifyingThread 中被调用,而不是在创建 Listener 的线程中被调用。 - Aaron
显示剩余4条评论

14

使用CyclicBarrier的解决方案

public class Downloader {
  private CyclicBarrier barrier;
  private final static int NUMBER_OF_DOWNLOADING_THREADS;

  private DownloadingThread extends Thread {
    private final String url;
    public DownloadingThread(String url) {
      super();
      this.url = url;
    }
    @Override
    public void run() {
      barrier.await(); // label1
      download(url);
      barrier.await(); // label2
    }
  }
  public void startDownload() {
    // plus one for the main thread of execution
    barrier = new CyclicBarrier(NUMBER_OF_DOWNLOADING_THREADS + 1); // label0
    for (int i = 0; i < NUMBER_OF_DOWNLOADING_THREADS; i++) {
      new DownloadingThread("http://www.flickr.com/someUser/pic" + i + ".jpg").start();
    }
    barrier.await(); // label3
    displayMessage("Please wait...");
    barrier.await(); // label4
    displayMessage("Finished");
  }
}

标签0 - 循环屏障的参与方数等于执行线程数量加上主执行线程(即执行startDownload()的线程)的数量。

标签1 - 第n个下载线程进入等待室。

标签3 - NUMBER_OF_DOWNLOADING_THREADS个下载线程已经进入等待室。主执行线程释放它们以便在相同或几乎相同时刻开始它们的下载任务。

标签4 - 主执行线程进入等待室。这是代码中最棘手的部分。无论哪个线程最后进入等待室并不重要,重要的是最后一个进入等待室的线程确保所有其他下载线程都完成了它们的下载任务。

标签2 - 第n个下载线程已经完成了其下载任务并进入等待室。如果它是最后一个,也就是说已经有NUMBER_OF_DOWNLOADING_THREADS个线程(包括主执行线程)进入了等待室,那么当所有其他线程完成下载时,主执行线程才会继续执行。


9
你应该优先选择使用 java.util.concurrent 解决方案。请查看 Josh Bloch 或 Brian Goetz 的相关内容。
如果你没有使用 java.util.concurrent.*,而是直接使用线程,则应该使用 join() 来知道线程何时完成。这里有一个超级简单的回调机制,请先扩展 Runnable 接口以具备回调功能:
public interface CallbackRunnable extends Runnable {
    public void callback();
}

然后创建一个执行器,它将执行您的可运行对象,并在完成时回调您。
public class CallbackExecutor implements Executor {

    @Override
    public void execute(final Runnable r) {
        final Thread runner = new Thread(r);
        runner.start();
        if ( r instanceof CallbackRunnable ) {
            // create a thread to perform the callback
            Thread callerbacker = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // block until the running thread is done
                        runner.join();
                        ((CallbackRunnable)r).callback();
                    }
                    catch ( InterruptedException e ) {
                        // someone doesn't want us running. ok, maybe we give up.
                    }
                }
            });
            callerbacker.start();
        }
    }

}

另一件需要添加到您的CallbackRunnable接口中的明显事情是处理任何异常的方法,因此可以在其中放置一个public void uncaughtException(Throwable e);行,在您的执行器中安装Thread.UncaughtExceptionHandler将您发送到该接口方法。
但是,如果您的项目允许,执行所有这些操作真的开始像java.util.concurrent.Callable。您应该确实考虑使用java.util.concurrent。

我对于这个回调机制相比于直接调用runner.join()并在之后执行任何代码的优势还有些不太清楚,因为你已经知道线程已经完成了。难道只是因为你可以将该代码定义为可运行对象的属性,所以你可以为不同的可运行对象定义不同的代码吗? - Stephen
2
是的,runner.join() 是最直接的等待方式。我假设 OP 不想阻塞他们的主调用线程,因为他们要求在每个下载完成时“通知”,这提供了一种异步通知的方法。 - broc.seib

4
在过去的6年中,多线程方面发生了许多变化。现在可以使用ExecutorService的invokeAll() API,而不是使用join()和lock API。
1.ExecutorService invokeAll() API
执行给定的任务,并在所有任务完成时返回一个Future列表,其中包含它们的状态和结果。

2.CountDownLatch

一个同步辅助对象,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

CountDownLatch 对象被初始化为一个给定的计数。await 方法会阻塞,直到当前计数由于调用 countDown() 方法而达到零,此后所有等待的线程都将被释放,任何随后的 await 调用都会立即返回。这是一次性现象——计数无法被重置。如果需要一个可以重置计数的版本,请考虑使用 CyclicBarrier。

3.ForkJoinPool 或者在 Executors 中使用 newWorkStealingPool() 也是另一种方式。

4.遍历从 ExecutorService 提交的所有 Future 任务,并使用阻塞调用 get() 检查 Future 对象的状态。

请参阅相关 SE 问题:

如何等待一个生成自己线程的线程?

执行者:如果任务是递归创建的,如何同步等待所有任务完成?


4
您还可以使用 Executors 对象来创建一个 ExecutorService 线程池。然后使用 invokeAll 方法运行每个线程并检索 Futures。这将阻塞直到所有线程都执行完毕。您的另一个选择是使用线程池执行每个线程,然后调用 awaitTermination 阻塞,直到线程池执行完成。只需确保在添加任务完成后调用 shutdown()。

4

如果您希望等待它们完成,则可以使用Join方法。

如果您只是想检查它,还有isAlive属性。


3
注意,如果线程尚未开始执行,则isAlive返回false(即使您自己的线程已经调用了它的start方法)。 - Tom Hawtin - tackline
@TomHawtin-tackline你对此非常确定吗?这将与Java文档 ("如果线程已启动并尚未死亡,则线程是活动的" - https://docs.oracle.com/javase/6/docs/api/java/lang/Thread.html#isAlive%28%29) 相矛盾。这也会与这里的答案相矛盾 (https://dev59.com/uGQm5IYBdhLWcg3w2R5J)。 - Stephen
@Stephen 很久以前我写过这个,但它似乎是真实的。我想它会引起其他人的问题,这些问题在九年前还历历在目。可观察到的内容将取决于实现方式。你告诉一个 Thread 开始运行,线程确实开始了,但调用立即返回。isAlive 应该是一个简单的标志测试,但当我谷歌搜索时,发现这个方法是 native 的。 - Tom Hawtin - tackline

4
你可以使用getState()方法来查询线程实例,它会返回Thread.State枚举的一个实例,其中有以下几个值之一:
*  NEW
  A thread that has not yet started is in this state.
* RUNNABLE
  A thread executing in the Java virtual machine is in this state.
* BLOCKED
  A thread that is blocked waiting for a monitor lock is in this state.
* WAITING
  A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
* TIMED_WAITING
  A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
* TERMINATED
  A thread that has exited is in this state.

然而,我认为更好的设计是有一个主线程等待3个子线程完成,当其他3个线程完成后,主线程将继续执行。


等待3个子进程退出可能不适合使用范式。如果这是一个下载管理器,他们可能想要启动15个下载并从状态栏中删除状态或在下载完成时向用户发出警报,在这种情况下回调函数会更好。 - digitaljoel

3
我想最简单的方法是使用ThreadPoolExecutor类。
  1. 它有一个队列,您可以设置同时工作的线程数。
  2. 它有很好的回调方法:

钩子方法

该类提供了受保护的可重写的beforeExecute(java.lang.Thread, java.lang.Runnable)afterExecute(java.lang.Runnable, java.lang.Throwable)方法,在每个任务执行之前和之后调用。这些可以用于操纵执行环境;例如,重新初始化ThreadLocals、收集统计信息或添加日志条目。此外,可以覆盖方法terminated()以执行任何需要在执行器完全终止后完成的特殊处理。

这正是我们需要的。我们将覆盖afterExecute()以在每个线程完成后获取回调,并将覆盖terminated()以知道何时所有线程都完成。

所以这是你应该做的:

  1. Create an executor:

    private ThreadPoolExecutor executor;
    private int NUMBER_OF_CORES = Runtime.getRuntime().availableProcessors();    
    
    
    
    private void initExecutor() {
    
    executor = new ThreadPoolExecutor(
            NUMBER_OF_CORES * 2,  //core pool size
            NUMBER_OF_CORES * 2, //max pool size
            60L, //keep aive time
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>()
    ) {
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
                //Yet another thread is finished:
                informUiAboutProgress(executor.getCompletedTaskCount(), listOfUrisToProcess.size());
            }
        }
    
    };
    
        @Override
        protected void terminated() {
            super.terminated();
            informUiThatWeAreDone();
        }
    
    }
    
  2. And start your threads:

    private void startTheWork(){
        for (Uri uri : listOfUrisToProcess) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    doSomeHeavyWork(uri);
                }
            });
        }
        executor.shutdown(); //call it when you won't add jobs anymore 
    }
    

在方法informUiThatWeAreDone();内,当所有线程都完成时,可以执行任何需要执行的操作,例如更新UI。

注意: 不要忘记使用synchronized方法,因为您要并行地进行工作,并且如果您决定从另一个synchronized方法调用synchronized方法,则必须非常小心!这经常导致死锁。

希望这可以帮助你!


2
我建议查看Thread类的javadoc。
您有多种线程操作机制:
  • 您的主线程可以串行地join()三个线程,直到所有三个线程完成后才继续执行。

  • 定期轮询已生成的线程的状态。

  • 将所有生成的线程放入单独的ThreadGroup中,并轮询ThreadGroup上的activeCount()并等待其变为0。

  • 设置自定义回调或监听类型的接口以进行线程间通信。

我相信还有许多其他我尚未提及的方法。

1
这里有一个简单、短小、易于理解且完美适用于我的解决方案。我需要在另一个线程结束时向屏幕绘制,但无法实现,因为主线程控制着屏幕。所以:
(1)我创建了全局变量:boolean end1 = false; 当线程结束时,将其设置为true。主线程通过“postDelayed”循环捕获该变量,并进行响应。
(2)我的线程包含:
void myThread() {
    end1 = false;
    new CountDownTimer(((60000, 1000) { // milliseconds for onFinish, onTick
        public void onFinish()
        {
            // do stuff here once at end of time.
            end1 = true; // signal that the thread has ended.
        }
        public void onTick(long millisUntilFinished)
        {
          // do stuff here repeatedly.
        }
    }.start();

}

(3) 幸运的是,“postDelayed”在主线程中运行,因此我们可以每秒检查另一个线程。当另一个线程结束时,我们就可以开始下一步操作。

Handler h1 = new Handler();

private void checkThread() {
   h1.postDelayed(new Runnable() {
      public void run() {
         if (end1)
            // resond to the second thread ending here.
         else
            h1.postDelayed(this, 1000);
      }
   }, 1000);
}

(4) 最后,在代码中的某个地方调用以下代码来启动整个过程:

void startThread()
{
   myThread();
   checkThread();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接