为索引式并发跳表实现交换方法

4
我正在基于Java的ConcurrentSkipListMap实现一个并发跳表映射,与其不同之处在于我希望该列表允许重复,并且我还希望该列表是可索引的(这样查找列表的第N个元素只需O(lg(n))时间,而不是像标准跳表一样需要O(n)时间)。这些修改没有出现问题。
此外,跳表的键是可变的。例如,如果列表元素是整数{0,4,7},则可以将中间元素的键更改为[0,7]中的任何值,而无需提示更改列表结构; 如果键更改为(-inf,-1]或[8,+inf),则删除该元素并重新添加以维护列表顺序。我不是将其实现为O(lg(n))插入的删除,而是将其实现为删除,然后进行线性遍历,然后进行O(1)插入(预期运行时间为O(1) - 99%的时间节点将与相邻节点交换)。
在启动后插入全新节点很少发生,并且从不删除节点(而不立即重新添加它); 几乎所有操作都是elementAt(i),用于检索第i个索引处的元素,或在修改键后交换节点的操作。
我遇到的问题是如何实现键修改类。从概念上讲,我想做一些像这样的事情:
public class Node implements Runnable {
    private int key;
    private Node prev, next;
    private BlockingQueue<Integer> queue;

    public void update(int i) {
        queue.offer(i);
    }

    public void run() {
        while(true) {
            int temp = queue.take();
            temp += key;
            if(prev.getKey() > temp) {
                // remove node, update key to temp, perform backward linear traversal, and insert
            } else if(next.getKey() < temp) {
                // remove node, update key to temp, perform forward linear traveral, and insert
            } else {
                key = temp; // node doesn't change position
            }
        }
    }
}

调用run中的insert子方法使用CAS来处理两个节点尝试在同一位置同时插入的问题(类似于ConcurrentSkipListMap处理冲突插入的方式)- 从概念上讲,这与第一个节点锁定插入点相邻的节点是相同的,只是当没有冲突时,开销会减少。

通过这种方式,我可以确保列表始终按顺序排列(如果键更新有点延迟也没关系,因为我可以确定更新最终会发生;但是,如果列表变得无序,情况可能会变得混乱)。问题在于,以这种方式实现列表将生成大量线程,每个Node一个线程(列表中有数千个节点)- 在任何给定时间,它们中的大多数将被阻塞,但我担心数千个阻塞线程仍会导致开销过高。

另一种选择是将update方法同步化,并从Node中删除Runnable接口,这样,两个线程不再将更新排队到Node中,而是轮流执行Node#update方法。问题在于,这可能会创建瓶颈;如果八个不同的线程都决定同时更新同一个节点,则队列实现会很好地扩展,但同步实现将阻塞八个线程之中的七个(然后阻塞六个线程,然后五个等等)。
所以我的问题是,我应该如何实现类似于队列实现但使用较少数量的线程,或者如何实现类似于同步实现但没有潜在的瓶颈问题。
1个回答

0

我认为我可以通过使用ThreadPoolExecutor来解决这个问题,类似于:

public class Node {
    private int key;
    private Node prev, next;
    private ConcurrentLinkedQueue<Integer> queue;
    private AtomicBoolean lock = new AtomicBoolean(false);
    private ThreadPoolExecutor executor;
    private UpdateNode updater = new UpdateNode();

    public void update(int i) {
        queue.offer(i);
        if(lock.compareAndSet(false, true)) {
            executor.execute(updater);
        }
    }

    private class UpdateNode implements Runnable {
        public void run() {
            do {
                try {
                    int temp = key;
                    while(!queue.isEmpty()) {
                        temp += queue.poll();
                    }
                    if(prev.getKey() > temp) {
                        // remove node, update key to temp, perform backward linear traversal, and insert
                    } else if(next.getKey() < temp) {
                        // remove node, update key to temp, perform forward linear traveral, and insert
                    } else {
                        key = temp; // node doesn't change position
                    }
                } finally {
                    lock.set(false);
                }
            } while (!queue.isEmpty() && lock.compareAndSet(false, true));
        }
    }
}

这样,我就可以享受队列方法的优势,而不必让一千个线程被阻塞 - 每次需要更新节点时,我执行一个UpdateNode(除非已经在该Node上执行了UpdateNode,因此使用AtomicBoolean作为锁),并依靠ThreadPoolExecutor使创建数千个可运行对象变得廉价。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接