如何覆盖executorService的shutdown方法

4
我正在创建自己的线程池和未来对象,可以并行执行可调用接口。Executor提供了shutdown方法来停止所有工作线程的运行。如果我像下面这样创建一个线程池,我应该如何实现shutdown方法以在所有线程执行完毕后停止?
我的自定义线程池看起来像this
class MyThreadPool implements java.util.concurrent.Executor 
{
    private final java.util.concurrent.BlockingQueue<Callable> queue;

    public MyThreadPool(int numThreads) {
        queue = new java.util.concurrent.LinkedBlockingQueue<>();
        for (int i=0 ; i<numThreads ; i++) {
            new Thread(new Runnable(){
                @Override
                public void run() {
                    while(true) {
                        queue.take().call();
                    }
                }
            }).start();
        }
    }

  @Override
  public <T> Future<T> submit(Callable<T> callable) {
    FutureTask<T> future = new FutureTask(callable);
    queue.put(future);
    return future;
  }

  public void shutdown(){ }
}

我想不到一种方法来保持线程列表,然后检查它们是否空闲?

1个回答

1
你一定要持有你创建的线程的引用。例如,设置一个类型为List<Thread>的字段threads,并在构造函数中将线程添加到此列表中。
之后,你可以借助Thread#join()实现shutdown()
public void shutdown() {
    for (Thread t : threads) {
        try {
            t.join();
        } catch (InterruptedException e) { /* NOP */ }
    }
}

不要忘记用适当的条件替换while (true)(在shutdown()中切换),考虑使用{{link1:BlockingQueue#poll(long, TimeUnit)}}而不是take()

编辑:类似于:

public class MyThreadPool implements Executor {

    private List<Thread> threads = new ArrayList<>();
    private BlockingDeque<Callable> tasks = new LinkedBlockingDeque<>();
    private volatile boolean running = true;

    public MyThreadPool(int numberOfThreads) {
        for (int i = 0; i < numberOfThreads; i++) {
            Thread t = new Thread(() -> {
                while (running) {
                    try {
                        Callable c = tasks.poll(5L, TimeUnit.SECONDS);
                        if (c != null) {
                            c.call();
                        }
                    } catch (Exception e) { /* NOP */ }
                }
            });
            t.start();
            threads.add(t);
        }
    }

    public void shutdown() {
        running = false;
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) { /* NOP */ }
        }
    }

    // ...

}

我相信原因是线程在等待从队列中获取可调用对象,因此它不会终止。如果我错了,请纠正我。 - krs8888
是的,这个方法可以工作,尽管我不想为我的队列轮询。但是我在关闭方法中使用了毒丸方法。我的做法是在关闭方法中添加一个毒丸对象到队列中,当我从队列中取出它时,我会检查它是否是毒丸。如果是,我就跳出循环。 - krs8888
@krs8888 很高兴能够帮到你。如果你的问题已经解决,请随意接受答案。 - beatngu13

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接