简单无锁栈

3

最近我发现了这样一个与Java并发相关的面试任务:

编写一个简单的无锁(lock-free)栈,包含两个方法:push和pop。

我对此进行了思考:

import java.util.concurrent.atomic.AtomicInteger;


public class Stack {
    private AtomicInteger count = new AtomicInteger(-1);
    private Object[] data = new Object[1000];

    public void push(Object o) {
        int c = count.incrementAndGet();
        data[c] = o;
    }

    public Object pop() {
        Object top;
        int c;
        while (true) {
            c = count.get();
            if (c == -1) return null;
            top = data[c];
            if (count.compareAndSet(c, c-1))
                return top;
        }
    }
}

这个问题的意思是:这种方法是否与预期相似?或者“无锁栈”指的是不同的东西?请帮助一个Java面试新手。

1
这是一个错误的样例,可能会按照以下顺序处理序列: 1)push:c = count.incrementAndGet() 2)pop:c = count.get(); 3)pop:top = data[c] 4)data[c] = o 因此你会得到一个旧的或未初始化的值。 尽管这是无锁的,但是不正确/鲁棒。 - SkateScout
4个回答

10

你肯定已经朝着正确的方向迈进,考虑使用Java的原子整数和原子函数。这将是一个无锁栈,也就是说,没有显式的锁。

然而,当同时访问时,它仍然不正确,而且很容易证明:想象一下你的push()线程在获取计数并将新元素添加到栈(data[c]=o)之间被阻塞,与此同时,一个 pop()线程出现,获取更高的计数并弹出...什么?无论该位置在堆栈中存储的内容是什么,但不是对象o(因为它尚未插入)。

这就是无锁、基于数组的栈的问题所在,你理论上需要调整两个东西,即计数和特定单元格的内容,但你不能同时原子地执行这两个操作。我不知道有没有任何无锁、基于数组的栈算法。

不过,有链表支持的无锁栈算法,因为在这种情况下,你可以创建一个新节点,分配它的内容,并且只需要执行一个操作以原子方式更改顶部指针。

如果你对这个论点感兴趣,最好的文学作品是Shavit和Herlihy的《多处理器编程艺术》,其中描述了许多不同的数据结构,无论是无锁还是基于锁的。我现在找不到任何详细描述“通常”无锁栈算法的论文,虽然Maged Michael在他的SMR论文第8页4.2点提到它,而我自己则实现了一个C99实现。


1
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack {

public static void main(String... args) {
    LFStack<String> stack = new LFStack<String>();
    for (int i = 0; i < 10; i++) {
        Thread t = new Thread(new RandomStackUse(stack));
        t.setName("My stack thread " + i);
        t.start();
    }
}

private static class LFStack<E> {
    private volatile AtomicReference<Node<E>> head = new AtomicReference<Node<E>>();

    public E peek() {
        E payload = null;
        Node<E> node = head.get();
        if (node != null) { payload = node.payload; }
        return payload;
    }

    public E pop() {
        E payload;
        while (true) {
            Node<E> oldHeadNode = head.get();
            if (oldHeadNode == null) { return null; }
            payload = head.get().payload;
            if (head.compareAndSet(oldHeadNode, oldHeadNode.next.get())) { break; }
            //System.out.println("Retry");
        }
        return payload;
    }

    public void push(E e) {
        Node<E> oldHeadNode = new Node<E>(e);

        while (true) {
            Node<E> oldRootNode = head.get();
            if (oldRootNode != null) { oldHeadNode.next.set(oldRootNode); }
            if (head.compareAndSet(oldRootNode, oldHeadNode)) { break; }
            //System.out.println("Retry");
        }
    }
}


//to be used as LinkedList chain <Node> => <Node> => <Node> => null
private static class Node<E> {
    private E payload;
    private AtomicReference<Node<E>> next;

    public Node(E e) {
        payload = e;
        next = new AtomicReference<Node<E>>();
    }
}

public static class RandomStackUse implements Runnable {
    private LFStack<String> stack;
    private Random rand = new Random();

    public RandomStackUse(LFStack<String> stack) {this.stack = stack;}

    @Override
    public void run() {
        long counter = 0;
        while (true) {
            if (rand.nextInt() % 3 == 0) {
                stack.push(String.valueOf(counter++));
                //System.out.println(String.format("%s pushed %d", Thread.currentThread().getName(), counter));
            }
            if (rand.nextInt() % 3 == 1) {
                String value = stack.pop();
                //System.out.println(String.format("%s pop %s", Thread.currentThread().getName(), value));
            }
            if (rand.nextInt() % 3 == 2) {
                String value = stack.peek();
                //System.out.println(String.format("%s peek %s", Thread.currentThread().getName(), value));
            }
        }
    }
}
}

1
我认为这里有一个错误:Node oldHeadNode = head.get(); if (oldHeadNode == null) { return null; } payload = head.get().payload;最好只调用一次 head.get(),否则如果另一个线程在两次调用 head.get() 之间调用了 pop,则可能会得到 null。 - paulkernfeld

-1
public class MyConcurrentStack<T>
{
private AtomicReference<Node> head = new AtomicReference<Node>();
public MyConcurrentStack()
{

}

public void push(T t)
{
    Node<T> n = new Node<T>(t);
    Node<T> current;

    do
    {
        current = head.get();
        n.setNext(current);
    }while(!head.compareAndSet(current, n));
}

public T pop()
{
    Node<T> currentHead = null;
    Node<T> futureHead = null;
    do
    {
        currentHead = head.get();
        if(currentHead == null)
        {
            return null;
        }
        futureHead = currentHead.next;
    }while(!head.compareAndSet(currentHead, futureHead)); 

    return currentHead.data;
}

public T peek()
{
    Node<T> n = head.get();
    if(n==null)
    {
        return null;
    }
    else
    {
        return n.data;
    }
}

private static class Node<T>
{
       private final T data;
       private Node<T> next;

       private Node(T data)
       {
           this.data = data;
       }

       private void setNext(Node next)
       {
           this.next = next;
       }
}

public static void main(String[] args)
{
    MyConcurrentStack m = new MyConcurrentStack();
    m.push(12);
    m.push(13);
    m.push(15);

    System.out.println(m.pop());
    System.out.println(m.pop());
    System.out.println(m.pop());
    System.out.println(m.pop());
}
}

代码本身就很清晰易懂。如果有人需要解释,请告诉我。 堆栈按照以下图表形成:

  ...     ...      ...
 |   |-->|   | -->|   |
  ...     ...      ...

   ^
   |
current head

-2

您可以使用BlockingQueue,使用put()方法插入元素,使用drainTo(Collection c)方法获取元素。然后从c的末尾读取元素。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接