SwingWorker,在 process() 调用完成之前,done() 就已经执行

20

我使用SwingWorker工作已经有一段时间了,但是我遇到了一个奇怪的行为,至少对我来说很奇怪。我清楚地理解,由于性能原因,publish()方法被多次调用时会合并为一个调用。这对我来说完全合理,我怀疑SwingWorker保留某种类型的队列以处理所有这些调用。

根据教程和API,在SwingWorker结束其执行时,无论是doInBackground()正常完成还是工作线程从外部取消,都会调用done()方法。到目前为止,一切都很好。

但是我有一个示例(类似于教程中显示的示例),其中在执行 done()方法之后进行了 process()方法调用。由于两种方法都在事件分派线程中执行,我希望 done()在所有 process()调用完成后执行。换句话说:

期望值:

Writing...
Writing...
Stopped!

结果:

Writing...
Stopped!
Writing...

示例代码

import java.awt.BorderLayout;
import java.awt.Dimension;
import java.awt.Graphics;
import java.awt.event.ActionEvent;
import java.util.List;
import javax.swing.AbstractAction;
import javax.swing.Action;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;

public class Demo {

    private SwingWorker<Void, String> worker;
    private JTextArea textArea;
    private Action startAction, stopAction;

    private void createAndShowGui() {

        startAction = new AbstractAction("Start writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.startWriting();
                this.setEnabled(false);
                stopAction.setEnabled(true);
            }
        };

        stopAction = new AbstractAction("Stop writing") {
            @Override
            public void actionPerformed(ActionEvent e) {
                Demo.this.stopWriting();
                this.setEnabled(false);
                startAction.setEnabled(true);
            }
        };

        JPanel buttonsPanel = new JPanel();
        buttonsPanel.add(new JButton(startAction));
        buttonsPanel.add(new JButton(stopAction));

        textArea = new JTextArea(30, 50);
        JScrollPane scrollPane = new JScrollPane(textArea);

        JFrame frame = new JFrame("Test");
        frame.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        frame.add(scrollPane);
        frame.add(buttonsPanel, BorderLayout.SOUTH);
        frame.pack();
        frame.setLocationRelativeTo(null);
        frame.setVisible(true);
    }

    private void startWriting() {
        stopWriting();
        worker = new SwingWorker<Void, String>() {
            @Override
            protected Void doInBackground() throws Exception {
                while(!isCancelled()) {
                    publish("Writing...\n");
                }
                return null;
            }

            @Override
            protected void process(List<String> chunks) {
                String string = chunks.get(chunks.size() - 1);
                textArea.append(string);
            }

            @Override
            protected void done() {
                textArea.append("Stopped!\n");
            }
        };
        worker.execute();
    }

    private void stopWriting() {
        if(worker != null && !worker.isCancelled()) {
            worker.cancel(true);
        }
    }

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                new Demo().createAndShowGui();
            }
        });
    }
}

2
似乎是一个已知的 bug http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6826514 - gtgaxiola
1
但也请参阅JDK-6826514 - SwingWorker: done()在doInBackground()返回之前被调用,当取消时,已关闭为“不是问题”。 - Jim Garrison
1
经验证,当工作者的背景延迟超过DELAY时,此效果不会出现。 - trashgod
1
相关链接:https://dev59.com/hG025IYBdhLWcg3wHSGn - kmort
谢谢@kmort,当我提出问题时我没有看到那个话题。很有趣 :-) - dic19
显示剩余2条评论
2个回答

18

简短回答:

这是因为publish()不直接调度process(),它设置了一个定时器,在延迟后会在EDT中触发对process()块的调度。因此,当worker被取消时,仍然有一个定时器等待使用最后一次publish的数据来调度process()。使用定时器的原因是为了实现优化,即可以使用多个publish的组合数据执行单个process。

详细回答:

让我们看看publish()和cancel如何相互作用,为此,让我们深入一些源代码。

首先是简单的部分,cancel(true)

public final boolean cancel(boolean mayInterruptIfRunning) {
    return future.cancel(mayInterruptIfRunning);
}

此取消最终调用以下代码:
boolean innerCancel(boolean mayInterruptIfRunning) {
    for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
        if (compareAndSetState(s, CANCELLED)) // <-----
            break;
    }
    if (mayInterruptIfRunning) {
        Thread r = runner;
        if (r != null)
            r.interrupt(); // <-----
    }
    releaseShared(0);
    done(); // <-----
    return true;
}

SwingWorker的状态被设置为"CANCELLED",线程被中断并调用了"done()"方法,但这不是SwingWorker的"done()"方法,而是在SwingWorker构造函数中实例化变量时指定的"future"的"done()"方法。请注意保留HTML标签。
future = new FutureTask<T>(callable) {
    @Override
    protected void done() {
        doneEDT();  // <-----
        setState(StateValue.DONE);
    }
};

doneEDT() 代码是:

private void doneEDT() {
    Runnable doDone =
        new Runnable() {
            public void run() {
                done(); // <-----
            }
        };
    if (SwingUtilities.isEventDispatchThread()) {
        doDone.run(); // <-----
    } else {
        doSubmit.add(doDone);
    }
}

如果我们处于EDT的情况下,会直接调用SwingWorker的done()。此时,SwingWorker应该停止运行,不再调用publish()。以下修改可以很容易地证明这一点:
while(!isCancelled()) {
    textArea.append("Calling publish\n");
    publish("Writing...\n");
}

然而,我们仍然从process()获得“正在写入…”消息。因此,让我们看看process()是如何被调用的。publish(...)的源代码如下:
protected final void publish(V... chunks) {
    synchronized (this) {
        if (doProcess == null) {
            doProcess = new AccumulativeRunnable<V>() {
                @Override
                public void run(List<V> args) {
                    process(args); // <-----
                }
                @Override
                protected void submit() {
                    doSubmit.add(this); // <-----
                }
            };
        }
    }
    doProcess.add(chunks);  // <-----
}

我们可以看到,Runnable的doProcessrun()最终调用了process(args)。但是这段代码只是调用了doProcess.add(chunks)而没有调用doProcess.run(),并且周围还有一个doSubmit。让我们看看doProcess.add(chunks)
public final synchronized void add(T... args) {
    boolean isSubmitted = true;
    if (arguments == null) {
        isSubmitted = false;
        arguments = new ArrayList<T>();
    }
    Collections.addAll(arguments, args); // <-----
    if (!isSubmitted) { //This is what will make that for multiple publishes only one process is executed
        submit(); // <-----
    }
}

publish() 实际上的作用是将块添加到内部的 ArrayList arguments 中,并调用 submit()。我们刚才看到,submit() 只是调用了 doSubmit.add(this),这是同一个 add 方法,因为 doProcessdoSubmit 都扩展了 AccumulativeRunnable<V>,但是这一次的 VRunnable,而不是 doProcess 中的 String。因此,一个块是调用 process(args) 的可运行对象。然而,submit() 调用是在 doSubmit 类中定义的完全不同的方法:

private static class DoSubmitAccumulativeRunnable
     extends AccumulativeRunnable<Runnable> implements ActionListener {
    private final static int DELAY = (int) (1000 / 30);
    @Override
    protected void run(List<Runnable> args) {
        for (Runnable runnable : args) {
            runnable.run();
        }
    }
    @Override
    protected void submit() {
        Timer timer = new Timer(DELAY, this); // <-----
        timer.setRepeats(false);
        timer.start();
    }
    public void actionPerformed(ActionEvent event) {
        run(); // <-----
    }
}

它创建一个计时器,延迟一定时间后触发actionPerformed代码。一旦事件被触发,该代码将被排队到EDT中,EDT将调用内部的run(),最终调用doProcessrun(flush())并执行process(chunk),其中chunk是arguments ArrayList的刷新数据。我省略了一些细节,"run"调用链如下:

  • doSubmit.run()
  • doSubmit.run(flush()) //实际上是runnables的循环,但只有一个(*)
  • doProcess.run()
  • doProcess.run(flush())
  • process(chunk)

(*)布尔值isSubmitedflush()(重置此布尔值)使得对publish的额外调用不会添加doProcess可运行项到doSubmit.run(flush())中,但它们的数据不会被忽略。因此,在计时器生命周期内调用任意数量的发布,仅执行单个处理过程。

总之,publish("Writing...") 的作用是在EDT中安排调用 process(chunk),并且会有一个延迟。这就解释了为什么即使我们取消了线程并且不再发布任何内容,仍然会出现一次处理执行,因为在我们取消工作线程的同时,(很可能)有一个计时器会在已经安排了 done() 后安排一个 process()
为什么要使用这个计时器来代替直接使用 invokeLater(doProcess) 在EDT中安排 process() 呢?这是为了实现 docs 中所解释的性能优化。

Because the process method is invoked asynchronously on the Event Dispatch Thread multiple invocations to the publish method might occur before the process method is executed. For performance purposes all these invocations are coalesced into one invocation with concatenated arguments. For example:

 publish("1");
 publish("2", "3");
 publish("4", "5", "6");

might result in:
 process("1", "2", "3", "4", "5", "6")
我们现在知道这个方法是有效的,因为所有在 DELAY 时间内发生的发布都将它们的 args 添加到了我们看到的那个内部变量 arguments 中,process(chunk) 会一次性执行所有数据。 这是一个 Bug 吗?还是可以解决的问题? 很难说这是否是一个 Bug,也许处理后台线程发布的数据是有意义的,因为工作实际上已经完成,您可能对尽可能多地更新 GUI 感兴趣(例如,如果 process() 正在执行此操作)。然后,如果 done() 需要处理所有数据和/或在 done() 之后调用 process() 会创建数据/GUI 不一致,则可能没有意义。
如果您不希望在 done() 之后执行任何新的 process(),则有一个明显的解决方法,即在 process 方法中也检查 worker 是否已取消!
@Override
protected void process(List<String> chunks) {
    if (isCancelled()) return;
    String string = chunks.get(chunks.size() - 1);
    textArea.append(string);
}

让done()在最后一个process()之后执行会更加棘手,例如,done可以使用定时器,在>DELAY后安排实际的done()工作。虽然我认为这不是一个常见情况,因为如果您取消了它,错过一个process()并不重要,因为我们知道我们实际上正在取消所有未来的执行。


抱歉我说了这么多话 :P - DSquare
5
哇……真是一篇非常好的解释!真的没有想到会这么好!非常感谢您花时间揭示SwingWorker背后的秘密 :) - dic19
3
非常好的解释...感谢你不仅分析了来源,而且花费时间用简单易懂的英语解释。我认为这不是一个bug,只是一种不太令人满意的实现方式。别误解我的意思,我认为SwingWorker很棒,经常使用它,但我认为应该有更好的可见性(一个或两个有用的句柄)来控制这个内部机制,从而使用户能够等待所有发布的块都“完成”。 - mike rodent
我还注意到(这就是我来到这里的原因),似乎java.awt.Robot在其方法waitForIdle()中没有考虑SW.publish/process的内容。我在单元测试中发现了这一点,我经常使用Robot - mike rodent

0

阅读了DSquare的出色答案,并从中得出需要一些子类化的结论后,我为任何需要确保所有已发布块在移动之前都已在EDT中处理的人提出了这个想法。

注意:我尝试用Java而不是Jython(我的首选语言,也是世界上最好的语言)编写它,但它有点复杂,因为例如publishfinal,所以您必须发明另一个方法来调用它,并且还因为您必须在Java中使用泛型(呵欠)对所有内容进行参数化。

这段代码应该可以被任何Java人理解:只是为了帮助,使用self.publication_counter.get(),当结果为0时,它将评估为False

# this is how you say Worker... is a subclass of SwingWorker in Python/Jython
class WorkerAbleToWaitForPublicationToFinish( javax.swing.SwingWorker ):

    # __init__ is the constructor method in Python/Jython
    def __init__( self ):

        # we just add an "attribute" (here, publication_counter) to the object being created (self) to create a field of the new object
        self.publication_counter = java.util.concurrent.atomic.AtomicInteger()

    def await_processing_of_all_chunks( self ):
        while self.publication_counter.get():
            time.sleep( 0.001 )

    # fully functional override of the Java method     
    def process( self, chunks ):
        for chunk in chunks:
            pass
            # DO SOMETHING WITH EACH CHUNK

        # decrement the counter by the number of chunks received
        # NB do this AFTER dealing with the chunks 
        self.publication_counter.addAndGet( - len( chunks ) )

    # fully functional override of the Java method     
    def publish( self, *chunks ):
        # increment the counter by the number of chunks received
        # NB do this BEFORE publishing the chunks
        self.publication_counter.addAndGet( len( chunks ))
        self.super__publish( chunks )

所以在你的调用代码中,你可以放置类似下面的内容:

    engine.update_xliff_task.get()
    engine.update_xliff_task.await_processing_of_all_chunks()

PS:像这样使用while子句(即轮询技术)并不优雅。我查看了可用的java.util.concurrent类,例如CountDownLatchPhaser(都带有线程阻塞方法),但我认为它们都不适合此目的...

稍后

我对一个名为CounterLatch的适当并发类进行了调整(用Java编写,在Apache网站上找到)。他们的版本在达到AtomicLong计数器的值时,在await()处停止线程。如果。我的版本允许您执行以下操作:要么这样做,要么相反:在抬起闩之前说“等待直到计数器达到某个值”:

NB使用AtomicLong作为signalAtomicBoolean作为released:因为在原始Java中它们使用volatile关键字。我认为使用原子类将实现相同的目的。

class CounterLatch():
    def __init__( self, initial = 0, wait_value = 0, lift_on_reached = True ):
        self.count = java.util.concurrent.atomic.AtomicLong( initial )
        self.signal = java.util.concurrent.atomic.AtomicLong( wait_value )

        class Sync( java.util.concurrent.locks.AbstractQueuedSynchronizer ):
            def tryAcquireShared( sync_self, arg ):
                if lift_on_reached:
                    return -1 if (( not self.released.get() ) and self.count.get() != self.signal.get() ) else 1
                else:
                    return -1 if (( not self.released.get() ) and self.count.get() == self.signal.get() ) else 1
            def tryReleaseShared( self, args ):
                return True

        self.sync = Sync()
        self.released = java.util.concurrent.atomic.AtomicBoolean() # initialised at False

    def await( self, *args ):
        if args:
            assert len( args ) == 2
            assert type( args[ 0 ] ) is int
            timeout = args[ 0 ]
            assert type( args[ 1 ] ) is java.util.concurrent.TimeUnit
            unit = args[ 1 ]
            return self.sync.tryAcquireSharedNanos(1, unit.toNanos(timeout))
        else:
            self.sync.acquireSharedInterruptibly( 1 )

    def count_relative( self, n ):
        previous = self.count.addAndGet( n )
        if previous == self.signal.get():
            self.sync.releaseShared( 0 )
        return previous

所以我的代码现在看起来像这样:

在 SwingWorker 构造函数中:

self.publication_counter_latch = CounterLatch() 

在SW.publish中:

self.publication_counter_latch.count_relative( len( chunks ) )
self.super__publish( chunks )

在等待块处理停止的线程中:

worker.publication_counter_latch.await()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接