交替运行两个线程的最佳方式是什么?

8

更新:请查看问题底部,以获得完整的答案。

我想运行一个辅助线程,使我的主线程和辅助线程交替执行操作(不,我不想在主线程中执行所有操作,这是为了一个单元测试)。

我想出了两种不同的解决方案,但我不知道哪个是最好的,并且对第一个解决方案有一些疑问:

使用 Exchanger

我想到了使用Exchanger,虽然我不只想交换一个对象。

@Test
public void launchMyTest() {
    /**
     * An anonymous class to set some variables from a different thread
     */
    class ThreadTest extends Thread {
        //declare some various attributes that will be set
        //NOT DECLARED VOLATILE
        ...

        public final Exchanger<Integer> exchanger = new Exchanger<Integer>();

        @Override
        public void run() {
            try {
                //start of the synchronization 
                int turn = 1;
                while (turn != 2) {
                    turn = this.exchanger.exchange(turn);
                }

                //do some work and set my various variables
                ...

                //main thread's turn
                turn = 1;
                this.exchanger.exchange(turn);
                //wait for this thread's turn
                while (turn != 2) {
                    turn = this.exchanger.exchange(turn);
                }

                //redo some other work and reset the various variables
                ...

                //main thread's turn
                turn = 1;
                this.exchanger.exchange(turn);

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } 
        }
    }


    try {
        //some work in the main thread
        ....

        //launch the job in the second thread
        ThreadTest test = new ThreadTest();
        test.start();
        //start of the synchronization
        int turn = 2;
        test.exchanger.exchange(turn);
        //wait for this thread's turn
        while (turn != 1) {
            turn = test.exchanger.exchange(turn);
        }

        //run some tests using the various variables of the anonymous class
        ....

        //now, relaunch following operations in the second thread
        turn = 2;
        test.exchanger.exchange(turn);
        //wait for this thread's turn
        while (turn != 1) {
            turn = test.exchanger.exchange(turn);
        }

        //do some other tests using the various variables of the anonymous class
        //...

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

问题:

  • 我是否正确理解exchange方法与使用Lock一样具有内存同步的功能?

使用条件

另一种使用条件的解决方案:

@Test
public void launchMyTest() {
    /**
     * An anonymous class to set some variables from a different thread
     */
    class ThreadTest extends Thread {
        //declare some various attributes that will be set
        //NOT DECLARED VOLATILE
        ...

        public final Lock lock = new ReentrantLock();
        public final Condition oneAtATime = lock.newCondition();
        public int turn = 1;

        @Override
        public void run() {
            this.lock.lock();
            try {
                //do some work and set my various variables
                ...

                //main thread's turn
                this.turn = 1;
                this.oneAtATime.signal();

                //wait for this thread's turn
                while (this.turn != 2) {
                    this.oneAtATime.await();
                }

                //redo some other work and reset the various variables
                ...

                //main thread's turn
                this.turn = 1;
                this.oneAtATime.signal();

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                this.lock.unlock();
            }
        }
    }


    ThreadTest test = new ThreadTest();
    test.lock.lock();
    try {
        //some work in the main thread
        ....

        //launch the job in the second thread
        test.turn = 2;
        test.start();
        //wait for this thread's turn
        while (test.turn != 1) {
            test.oneAtATime.await();
        }

        //run some tests using the various variables of the anonymous class
        ....

        //now, relaunch following operations in the second thread
        test.turn = 2;
        test.oneAtATime.signal();
        //wait for this thread's turn
        while (test.turn != 1) {
            test.oneAtATime.await();
        }

        //do some other tests using the various variables of the anonymous class
        //...

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        test.lock.unlock();
    }
}

对我而言,这似乎有点复杂。

结论

你认为哪种解决方案最好?我是做对了,还是错过了另一个明显的解决方案?

我没有使用 CountDownLatch,因为我想交替运行多个操作,而 CountDownLatch 不能被重置。而且我并没有发现使用 CyclicBarrier 会使代码更简单...(实际上,我并没有完全理解如何使用它,但它看起来不比使用 ExchangerCondition 更简单)

谢谢。

更新

@Clément MATHIEU 在其已接受的答案的评论中提供了不同的实现示例,请参见:https://gist.github.com/cykl/5131021

这里有三个示例,一个使用 CyclicBarrier,另一个使用 Exchanger,最后一个使用 2 个 Semaphore。虽然他说“最表达清晰的是基于信号量的”,但我选择使用 Exchanger 来简化操作。我的单元测试变成了:

@Test
public void launchMyTest() {
    /**
     * An anonymous class to set some variables from a different thread
     */
    class ThreadTest extends Thread {
        //declare some various attributes that will be set
        //NOT DECLARED VOLATILE
        ...
        public final Exchanger<Integer> exchanger = new Exchanger<Integer>();

        @Override
        public void run() {
            try {
                //do some work and set my various variables
                ...

                //main thread's turn
                this.exchanger.exchange(null);
                //wait for this thread's turn
                this.exchanger.exchange(null);

                //redo some other work and reset the various variables
                ...

                //main thread's turn
                this.exchanger.exchange(null);

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } 
        }
    }


    try {
        //some work in the main thread
        ....

        //launch the job in the second thread
        ThreadTest test = new ThreadTest();
        test.start();
        //wait for this thread's turn
        test.exchanger.exchange(null);

        //run some tests using the various variables of the anonymous class
        ....

        //now, relaunch following operations in the second thread
        test.exchanger.exchange(null);
        //wait for this thread's turn
        test.exchanger.exchange(null);

        //do some other tests using the various variables of the anonymous class
        //...

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

2
看一下 Executors。 - Thorbjørn Ravn Andersen
不要看 Executors,或者不要执行任何建议的解决方案? :p - FBB
如果我正确理解Exchanger,它的作用是阻止两个线程之一访问它,直到另一个尝试交换,然后它们交换项目,然后再次运行。所以它绝对不会做你想做的事情。我猜你实际上想要一对普通的信号量。也就是说,当线程T1完成操作时,它会发出S2信号,在S1上等待,反之亦然。 - millimoose
这似乎是大学级别的教材可能会涉及到的内容。可以在操作系统教科书中查找一下? - millimoose
可以这么做。但我曾经读过一次,它就像是在汇编语言中编程一样。 - FBB
显示剩余2条评论
4个回答

2

1

我是否正确,交换方法执行内存同步,就像使用锁一样?

你是对的。Javadoc指定了存在 happen-before 关系:

"内存一致性效果:对于每一对成功通过 Exchanger 交换对象的线程,在一个线程中 exchange() 之前的操作 happen-before 于另一个线程中从相应的 exchange() 返回后的操作。"

你认为什么是最好的解决方案?

两种方法都是等价的。你应该以表达能力为目标。我发现基于同步/锁/监视器的解决方案比基于交换的解决方案更具表现力。但是,如果你将此代码抽象成一个专用类,那么这并不重要。

我做得对吗,还是我错过了另一个明显的解决方案?

据我所知没有。如果你不想重新发明轮子。

请注意,基于 ReentrantLock 的解决方案也可以使用普通的同步或 Guava 中的 Monitor 编写。

请参考:http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/Monitor.html 进行比较。

我并没有发现CyclicBarrier能使代码更简单...(实际上我不完全明白如何使用它,但是它似乎并不比使用

CyclicBarrier不适合您的需求。它不是为互斥设计的;它允许一组线程定义一个共同的屏障。线程将并发执行,并在某个点等待彼此,然后才移动到下一步。


1
感谢您提供完整的答案。“每个线程中exchange()之前的操作都发生在之前”=>这是否意味着我不需要//同步开始下面的两个代码块?我认为我需要定义一种“同步起点”,就像我们在Condition上等待之前获取锁定一样。如果没有定义一个,这对我来说似乎很奇怪。 - FBB
1
我快速编写了三个示例,使用Exchanger、CyclicBarrier和两个Semaphores,以确保您掌握了概念。我没有编写使用ReentrantLock或Monitor的示例,但它应该非常相似。所有这些实现都应该可以工作,并且非常相似。在我看来,最干净和最具表现力的是基于信号量的实现。请参见:https://gist.github.com/cykl/5131021 - Clément MATHIEU
1
您不必调用reset(也不能)。当当前代满时,屏障会自动重置。这就是为什么屏障是循环的原因。请参见OpenJDK实现中的dowaitnextGeneration。由于您想要严格的轮询行为,即使启用了公平行为标志,我也没有看到只使用一个信号量的任何强大解决方案。我更新了gist以添加SingleSemaphoreHack,它是一个丑陋的hack,一旦达到Integer.MAX_VALUE,它就会崩溃。它仅用于教育目的,请尝试理解为什么您必须玩弄许可数量。 - Clément MATHIEU
1
仅仅因为 happens before 发生在 happens 之前,并不意味着所有变量都已更新到主内存。JVM 可能会决定它们不共享,因此共享变量应该是 volatile 的。 - tgkprog
2
@tgkprog:您错了。JMM和Exchanger文档明确指出,在交换之前可见的任何内存操作在交换后对其他线程可见。 - Clément MATHIEU
显示剩余8条评论

0

锁定机制直接解决了您在此执行的任务,通过关注锁的互斥性,因此我建议采用这种方法。


第一种解决方案相对于第二种有任何好处吗?使用一个Exchanger而不进行任何交换看起来很奇怪,所以也许第二种更加明确? - FBB

0

虽然我还没有使用过Exchanger,但它看起来是你想要实现的最简单的解决方案。比起更通用的Lock/Condition版本,代码更少。至于内存一致性:他们在这里承诺here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接