Java ExecutorService的invokeAll()方法中断问题

7
我有一个固定线程池的ExecutorService,宽度为10,并且有一个包含100个Callable的列表,每个Callable等待20秒并记录它们的中断。
我在另一个线程上调用该列表的invokeAll方法,并几乎立即中断该线程。如预期,ExecutorService的执行被中断,但是Callable实际记录到的中断次数远远超过了预期的10次 - 大约是20-40次。如果ExecutorService无法同时执行超过10个线程,那么为什么会这样呢?
完整源代码:(由于并发可能需要运行多次)
@Test
public void interrupt3() throws Exception{
    int callableNum = 100;
    int executorThreadNum = 10;
    final AtomicInteger interruptCounter = new AtomicInteger(0);
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
    for (int i = 0; i < callableNum; ++i) {
        executeds.add(new Waiter(interruptCounter));
    }
    Thread watcher = new Thread(new Runnable() {

        @Override
        public void run(){
            try {
                executorService.invokeAll(executeds);
            } catch(InterruptedException ex) {
                // NOOP
            }
        }
    });
    watcher.start();
    Thread.sleep(200);
    watcher.interrupt();
    Thread.sleep(200);
    assertEquals(10, interruptCounter.get());
}

// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
    private AtomicInteger    interruptCounter;

    public Waiter(AtomicInteger interruptCounter){
        this.interruptCounter = interruptCounter;
    }

    @Override
    public Object call() throws Exception{
        try {
            Thread.sleep(20000);
        } catch(InterruptedException ex) {
            interruptCounter.getAndIncrement();
        }
        return null;
    }
}

使用WinXP 32位操作系统,Oracle JRE 1.6.0_27和JUnit4。


哦...将其转换为一个带有主方法的程序后,我总是得到10...(在Windows上使用Java 7) - Jon Skeet
测试了相同的代码,在1.6.0_27版本的Windows XP系统上得到了37的结果。没有Java 7来进行测试,有人可以确认一下吗? - Alex Abdugafarov
我会在工作中尝试。也许这是Java 6的一个bug... - Jon Skeet
我也一直在这里得到10个(1.6.0_24,Windows 7 64位)。从初步的看法来看,您的逻辑似乎是正确的(但是再说一遍,这是一个并发问题,所以可能不算太多),因此我同意您期望看到10个中断。 - Andrzej Doyle
3个回答

4

我不同意只接收10个中断的假设。

Assume the CPU has 1 core.
1. Main thread starts Watcher and sleeps
2. Watcher starts and adds 100 Waiters then blocks
3. Waiter 1-10 start and sleep in sequence
4. Main wakes and interrupts Watcher then sleeps
5. Watcher cancels Waiter 1-5 then is yielded by the OS   (now we have 5 interrupts)
6. Waiter 11-13 start and sleep
7. Watcher cancels Waiter 6-20 then is yielded by the OS   (now we have 13 interrupts)
8. Waiter 14-20 are "started" resulting in a no-op
9. Waiter 21-24 start and sleep
....

我的观点是,不能保证监视器线程在必须让出时间片并允许ExecutorService的工作线程开始更多Waiter任务之前,能够取消所有100个“Waiter”RunnableFuture实例。

更新:展示来自AbstractExecutorService的代码

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get(); //If interrupted, this is where the InterruptedException will be thrown from
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads
    }
}

包含 f.cancel(true) 的 finally 块是中断会传播到当前正在运行的任务的地方。正如您所看到的,这是一个紧密的循环,但不能保证执行循环的线程能够在一个时间片内迭代遍历所有 Future 实例。


所以你说,invokeAll() 方法的中断并不意味着在中断正在运行的任务之前立即取消所有排队任务?对我来说,这看起来就像是最小惊讶原则的直接破坏。 - Alex Abdugafarov
1
正确。处理任务的工作线程与执行invokeAll()的线程是不同的。在一个线程上调用中断并不意味着其他线程也应该被中断,所以我不会对偶尔收到超过10个工作线程中断而感到惊讶。正如我在发布的带注释代码中提到的那样,只有通过传递给Future.cancel方法的布尔参数才会向处理任务的工作线程发送中断。 - Tim Bender

1
如果您想实现相同的行为
    ArrayList<Runnable> runnables = new ArrayList<Runnable>();
    executorService.getQueue().drainTo(runnables);

在中断线程池之前添加此块。
它将把所有等待队列排放到新列表中。
因此,它只会中断正在运行的线程。

0
PowerMock.mockStatic ( Executors.class );
EasyMock.expect ( Executors.newFixedThreadPool ( 9 ) ).andReturn ( executorService );

Future<MyObject> callableMock = (Future<MyObject>) 
EasyMock.createMock ( Future.class );
EasyMock.expect ( callableMock.get ( EasyMock.anyLong (), EasyMock.isA ( TimeUnit.class ) ) ).andReturn ( ccs ).anyTimes ();

List<Future<MyObject>> futures = new ArrayList<Future<MyObject>> ();
futures.add ( callableMock );
EasyMock.expect ( executorService.invokeAll ( EasyMock.isA ( List.class ) ) ).andReturn ( futures ).anyTimes ();

executorService.shutdown ();
EasyMock.expectLastCall ().anyTimes ();

EasyMock.expect ( mock.getMethodCall ( ) ).andReturn ( result ).anyTimes ();

PowerMock.replayAll ();
EasyMock.replay ( callableMock, executorService, mock );

Assert.assertEquals ( " ", answer.get ( 0 ) );
PowerMock.verifyAll ();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接