我不完全确定这是否正确,但我想让大家评论一下。首先,让我介绍一种无锁不交集算法,它将成为我提出的算法的重要组成部分。
无锁不交集算法
我假设您选择的CPU架构上存在两个指针大小的比较和交换操作。至少在x86和x64架构上可用。
该算法基本上与单线程情况下维基百科页面上描述的算法相同,但对于安全的无锁操作进行了一些修改。首先,我们需要将rank和parent元素都设置为指针大小,并在内存中对齐为2*sizeof(pointer),以便进行原子CAS操作。
Find()不需要更改;最坏的情况是在同时写入的情况下,路径压缩优化未能充分发挥作用。
Union()必须更改:
function Union(x, y)
redo:
x = Find(x)
y = Find(y)
if x == y
return
xSnap = AtomicRead(x) -- read both rank and pointer atomically
ySnap = AtomicRead(y) -- this operation may be done using a CAS
if (xSnap.parent != x || ySnap.parent != y)
goto redo
-- Ensure x has lower rank (meaning y will be the new root)
if (xSnap.rank > ySnap.rank)
swap(xSnap, ySnap)
swap(x, y)
-- if same rank, use pointer value as a fallback sort
else if (xSnap.rank == ySnap.rank && x > y)
swap(xSnap, ySnap)
swap(x, y)
yNew = ySnap
yNew.rank = max(yNew.rank, xSnap.rank + 1)
xNew = xSnap
xNew.parent = y
if (!CAS(y, ySnap, yNew))
goto redo
if (!CAS(x, xSnap, xNew))
goto redo
return
这应该是安全的,因为它永远不会形成循环,并且始终会得到正确的并集。我们可以通过观察以下事实来确认:
- 首先,在终止之前,两个根之一将始终以指向另一个的父节点结束。因此,只要没有循环,合并就成功了。
- 其次,排名始终增加。比较x和y的顺序后,我们知道在快照时x的排名低于y。为了形成循环,另一个线程需要先增加x的排名,然后合并x和y。但是,在写入x的父指针的CAS中,我们检查了排名是否发生了变化;因此,y的排名必须保持大于x。
在同时突变的情况下,y的排名可能会增加,然后由于冲突而返回重做。但是,这意味着y要么不再是根(在这种情况下,排名无关紧要),要么y的排名已经被另一个进程增加(在这种情况下,第二次尝试将不起作用,y将具有正确的排名)。
因此,不应该有形成循环的机会,这种无锁不相交集算法应该是安全的。
现在进入应用程序问题...
假设
我假设脊线段只能在其端点相交。如果不是这种情况,您将需要以某种方式更改第1阶段。
我还假设单个整数像素位置的共存足以连接脊线段。如果不是这样,您将需要更改第1阶段中的数组,以保存多个候选脊线段+不相交集对,并过滤出实际连接的那些。
此算法中使用的不相交集结构将在其结构中携带对线段的引用。在合并的情况下,我们任意选择两个记录的线段之一来表示集合。
第1阶段:本地线路识别
我们首先将地图分成扇区,每个扇区将作为单独的作业进行处理。可以在不同的线程中处理多个作业,但每个作业将仅由一个线程处理。如果脊线段跨越扇区,则将其分成两个部分,每个部分都属于一个扇区。
对于每个扇区,建立将像素位置映射到不相交集结构的数组。大部分此数组稍后将被丢弃,因此其内存要求不应太重。
我们首先处理扇区中的每个线段。我们首先选择一个不相交集合,表示线段所在的整条直线。我们首先在像素位置数组中查找每个端点,以查看是否已分配了不相交集合结构。如果其中一个端点已经在此数组中,则使用分配的不相交集合。如果两个端点都在数组中,则对不相交集合执行合并,并使用新的根作为我们的集合。否则,我们创建一个新的不相交集合,并将当前线段的引用与不相交集合结构相关联。然后,我们将我们的每个端点的新不相交集合的根写回到像素位置数组中。
这个过程对于扇区中的每个线段都要重复进行;最终,我们将通过不相交集合完全识别扇区内的所有线段。
请注意,由于不相交集合尚未在线程之间共享,因此还没有必要使用compare-and-swap操作;只需使用正常的单线程union-merge算法即可。由于在算法完成之前不释放任何不相交集合结构,因此可以从每个线程的bump分配器中进行分配,使内存分配(几乎)无锁且O(1)。
一旦一个扇区被完全处理,像素位置数组中的所有数据都将被丢弃;但是,与扇区边缘像素对应的数据将被复制到一个新数组中,并保留到下一阶段。
由于遍历整个图像是O(x*y),而不相交合并实际上是O(1),因此此操作为O(x*y),需要工作内存O(m+2*x*y/k+k^2)=O(x*y/k+k^2),其中t是扇区数,k是扇区的宽度,m是扇区中部分线段的数量(取决于线路跨越边界的频率,m可能会有很大变化,但永远不会超过线段数)。传递到下一个操作的内存为O(m+2*x*y/k)=O(x*y/k)
第2阶段:跨扇区合并
一旦所有扇区都已处理完毕,我们就转向合并跨越扇区的线路。对于每个扇区之间的边界,我们对跨越边界的线路执行无锁合并操作(即,在边界的每侧相邻像素已分配给线路集的地方)。
此操作的运行时间为O(x+y),消耗O(1)内存(但必须保留第1阶段的内存)。完成后,可以丢弃边缘数组。
第3阶段:收集线路
现在,我们在所有分配的不相交集合结构对象上执行多线程映射操作。我们首先跳过任何不是根的对象(即,当obj.parent != obj时)。然后,从代表线段开始,我们从那里向外移动,并收集和记录关于该线路的任何所需信息。我们可以确保每次只有一个线程查看任何给定的线路,因为相交的线路将最终进入同一不相交集合结构中。
这需要O(m)运行时间,并且内存使用量取决于您需要收集关于这些线段的哪些信息。
概要
总的来说,该算法的运行时间应为O(x*y),内存使用量为O(x*y/k + k^2)。调整k可以在第一阶段过程中在瞬时内存使用和传递到第二阶段的邻接数组和不相交集合结构之间进行权衡。
请注意,我实际上没有在真实世界中测试过这个算法的性能;也有可能我忽视了无锁不相交集合并算法中的并发问题。欢迎评论 :)