Java中哪个线程安全队列更高效?

3
有一个简单的任务:多个线程调用 MyClass.add() 函数,而另一个线程试图服务它们。
我的问题是:哪种解决方案更好或更有效?

第一种方法:使用 CopyOnWriteArrayList

@Singleton
public class myClass {

    List<myType> list = new CopyOnWriteArrayList<myType>();
    boolean isRunning = false;

    //this is called from many threads
    public void add(myType x){
        list.add(x);
    }


    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;

        while (!list.isEmpty()) {
            myType curr = list.remove(0);

            //do something with curr...
        }
        isRunning = false;
    }
}



第二种使用简单锁的方法:

@Singleton
public class myClass {

    List<myType> list = new ArrayList<myType>();
    boolean isRunning = false;
    private final Lock _mutex = new ReentrantLock(true);

    //this is called from many threads
    public void add(myType x){
        _mutex.lock();
        list.add(x);
        _mutex.unlock();
    }


    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;

        while (!list.isEmpty()) {
            _mutex.lock();
            myType curr = list.remove(0);
            _mutex.unlock();

            //do something with curr...
        }
        isRunning = false;
    }
}

第三种方法:使用ConcurrentLinkedQueue
@Singleton
public class myClass {

    ConcurrentLinkedQueue<myType> list = new ConcurrentLinkedQueue<myType>();
    boolean isRunning = false;

    //this is called from many threads
    public void add(myType x){
        list.add(x);
    }


    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;

        while (!list.isEmpty()) {
            //list cannot be empty at this point: other threads can't remove any items 
            myType curr = list.poll();

            //do something with curr...
        }
        isRunning = false;
    }
}



这是原来的错误解决方案。我不知道为什么它有时会产生(>100个线程)ConcurrentModificationException(尽管使用了迭代器和“同步”):

@Singleton
public class myClass {

    List<myType> list = Collections.synchronizedList(new ArrayList<myType>());
    boolean isRunning = false;

    //this is called from many threads
    public void add(myType x){
        synchronized(list) {
            list.add(x);
        }
    }


    //this is called from 1 thread
    public void start(){
        if (isRunning) return;
        isRunning = true;

        for (ListIterator<myType> iter = list.listIterator(); iter.hasNext();){
            myType curr = iter.next();

            //do something with curr...

            synchronized(list) {
                iter.remove(); //sometime it gives ConcurrentModificationException!
            }
        }
        isRunning = false;
    }
}

看起来需要一个快速的性能包装器,它分叉X个线程,使Y个队列事务。去做吧! - Gray
1
你原来错误的解决方案没有防止其他线程在synchronized块之前修改循环内的列表。 - SLaks
1
这就是为什么Collections.synchronized*()是无用的众多原因之一。 - SLaks
@Adam Arold:为了保证线程安全、内存友好和快速。 :) - steve
@AlexeiKaigorodov:最坏情况是每秒钟有100个项目到达队列,并且需要4秒钟来服务其中一个。但大多数时候队列中没有活动。谢谢! - steve
显示剩余5条评论
1个回答

2
一般规则是:最适合你问题的那个。
锁变体会大大减慢所有线程的速度,因为如果它们进入锁定部分,即使没有必要(如果有5个元素,则5个线程可以同时轮询它们,只有第6个需要等待),所有线程都会被放置在等待状态。但是,如果您有一个永远不能共享的单一资源,例如网络连接或文件,则此解决方案非常好。
CopyOnWriteArrayList是最佳解决方案,如果您的线程很少写但经常读取。与ConcurrentLinkedQueue相比,写入成本更高,但读取速度更快得多。但是,由于您的代码主要是写入,因此这不是一个好的解决方案。
ConcurrentLinkedQueue是最佳解决方案,如果读取和写入的数量大致相等,因此称为队列。因此,它应该最适合您的情况。
此外,您的代码存在严重错误:
while (!list.isEmpty()) {
  myType curr = list.poll();

这个列表只是保证每个调用都是原子性的,但是你的代码并不会因为你使用它而自动变得线程安全。在这个例子中,列表在isEmpty()poll()之间可能已经被修改过了,所以在isEmpty()调用时它可能有1个元素,但是一旦你poll它就没有任何元素了。ConcurrentLinkedQueue通过返回null来优雅地处理这种情况,但你的代码却没有。因此正确的形式应该是:

myType curr;
while ((curr = list.poll()) != null) {

由于调用 poll 方法是原子性的(也因此是线程安全的),它要么返回一个元素,要么不返回。由于线程问题,在调用前后发生了什么是不确定的,但你可以确信这个单一的调用(它在后台执行了更多操作)将始终完美地运行。

对于你的 remove(0) 调用也是如此,如果另一个线程已经删除了最后一个元素,则该调用可能会抛出 IndexOutOfBoundsException 异常。


感谢您查看我的代码。 我选择了ConcurrentLinkedQueue. :)关于错误:在这种情况下,只有一个线程可以进入函数start()(布尔值isRunning保护它),因此在IsEmpty()之后和poll()之前列表的大小变小是不可能的(正如我在代码中所注释的)。 但总的来说,您是正确的。 - steve
你的 isRunning 没有任何保护作用,原因也是一样的。;) 如果你不将其定义为 volatile,每个线程仍然会基于自己的本地副本工作。在这里可以使用 AtomicBoolean。 - TwoThe
哦,我忘了说这个类是一个单例bean。所以isRunning是受保护的。 - steve

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接