确定跨步数组是否重叠的算法?

8
在我所工作的库中,我们有数据集(可能是其他数据集的子集),这些数据集以三维矩形分步数组的形式分布在内存中。也就是说,一个数组A可以被表示为A(i,j,k),其中每个索引的范围从零到某个上限,并且内存中每个元素的位置如下所示:
A(i,j,k) = A0 + i * A_stride_i + j * A_stride_j + k * A_stride_k

其中A0是基本指针,A_stride_i等是维度步长。

由于这些数据集可能是其他数据集的子集,而不是各自占用独立的malloc'ed内存块,因此它们可能重叠(其中重叠意味着A(i,j,k) < B(m,n,p)既不总是真也不总是假),如果它们重叠,则它们可能相互交错或相互碰撞(其中碰撞意味着某些六元组索引的A(i,j,k) == B(m,n,p))。

问题就在这里。对于两个数据集的一些操作(例如复制),仅当数组彼此不发生碰撞时才有效,但如果它们以交替的非碰撞方式重叠,则有效。我想为两个数据集添加一个函数,判断它们是否发生碰撞。

是否存在一种现有算法可以以合理高效且简单的方式执行此操作?

检查数据集是否重叠相当容易,因此关键问题是:给定这种形式的两个重叠数据集,有什么有效的算法可以确定它们是否交错或碰撞?

示例:

作为一个简单的例子,假设我们有从0到F(十六进制)的内存位置:

0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F

为了简单起见,我在这里只考虑二维数组。假设我们有一个大小为2,3的数组(即,0 <= i < 20 <= j < 3),其中stride_i = 1stride_j = 4,基地址为2。它将占用以下位置(其i,j对表示已占用位置):

0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F
      *  *        *  *        *  *

同样地,如果我们有另一个大小和步幅相同的数组,从基地址为4开始,它将如下所示:
0  1  2  3  4  5  6  7  8  9  A  B  C  D  E  F
            o  o        o  o        o  o

在我描述问题时使用的术语中,这些数组“重叠”,但它们不会发生冲突。
限制和假设:
我们可以假设步幅为正数,并且如果需要,它们按递增顺序排列。虽然实际库中没有这两个条件,但重新排列数组定义以达到这一点相当简单。
我们可以假设数组不自我交错。这也没有被库强制执行,但会是一个病态情况,并且可以单独进行警告。即(假设步幅按递增顺序排列,i从零到max_i等):
stride_j >= max_i * stride_i stride_k >= max_j * stride_j
当然,对于不需要这些假设的方法,我们会给予加分,因为将数组定义重新排列为规范顺序是一项有点费力的工作,最好避免。
这两个数组不能假定具有相等的大小或步幅。
我认为在构建过程中跟踪事物没有价值——在测试时不存在构建时不存在的信息。此外,“构建”可能仅仅是“考虑具有此基指针、这些步幅和这些大小的较大数组的子集”。
最坏情况:
svick的答案提醒我应该添加一些关于我预计出现的一些典型“更糟”的情况的内容。其中最糟糕的情况之一是当我们有一个数组,表示一些非常大的复数值,存储在连续的(实数,虚数)对中,然后我们有两个子数组分别包含实部和虚部——因此,您在数组中有几百万个元素,在这些数组之间交替出现。由于这不是一个不太可能的情况,所以应该能够用除了极差性能之外的其他东西进行测试。

步长(strides)能够有任何值吗?还是像下面这样的规则适用呢?A_stride_j >= max(k) * A_stride_k 这样可以确保数组没有交错。 - svick
这个问题很难理解,需要澄清。一个数据集是否可以具有任意的(i,j,k)值?还是它们是“连续的”?重叠难道不已经意味着碰撞了吗?请给一些例子... - Aryabhatta
在构建过程中,您是否被允许跟踪重叠部分? - The Alchemist
@svick,@Moron,@the-alchemist:感谢您们的评论;我认为我已经在编辑问题时解决了所有问题。 - Brooks Moses
2个回答

2
我认为下面的C#程序应该可以工作。它使用分支定界法,并适用于任意维数的数组。
    using System;
    using System.Collections.Generic;

    namespace SO_strides
    {
        sealed class Dimension
        {
            public int Min { get; private set; }
            public int Max { get; private set; }
            public int Stride { get; private set; }

            private Dimension() { }

            public Dimension(int max, int stride)
            {
                Min = 0;
                Max = max;
                Stride = stride;
            }

            public Dimension[] Halve()
            {
                if (Max == Min)
                    throw new InvalidOperationException();

                int split = Min + (Max - Min) / 2;

                return new Dimension[]
                {
                    new Dimension { Min = Min, Max = split, Stride = Stride },
                    new Dimension { Min = split + 1, Max = Max, Stride = Stride }
                };
            }
        }

        sealed class ArrayPart
        {
            public int BaseAddr { get; private set; }
            public Dimension[] Dimensions { get; private set; }
            public int FirstNonconstantIndex { get; private set; }

            int? min;
            public int Min
            {
                get
                {
                    if (min == null)
                    {
                        int result = BaseAddr;
                        foreach (Dimension dimension in Dimensions)
                            result += dimension.Min * dimension.Stride;
                        min = result;
                    }
                    return min.Value;
                }
            }

            int? max;
            public int Max
            {
                get
                {
                    if (max == null)
                    {
                        int result = BaseAddr;
                        foreach (Dimension dimension in Dimensions)
                            result += dimension.Max * dimension.Stride;
                        max = result;
                    }
                    return max.Value;
                }
            }

            public int Size
            {
                get
                {
                    return Max - Min + 1;
                }
            }

            public ArrayPart(int baseAddr, Dimension[] dimensions)
                : this(baseAddr, dimensions, 0)
            {
                Array.Sort(dimensions, (d1, d2) => d2.Stride - d1.Stride);
            }

            private ArrayPart(int baseAddr, Dimension[] dimensions, int fni)
            {
                BaseAddr = baseAddr;
                Dimensions = dimensions;
                FirstNonconstantIndex = fni;
            }

            public bool CanHalve()
            {
                while (FirstNonconstantIndex < Dimensions.Length
                    && Dimensions[FirstNonconstantIndex].Min == Dimensions[FirstNonconstantIndex].Max)
                    FirstNonconstantIndex++;

                return FirstNonconstantIndex < Dimensions.Length;
            }

            public ArrayPart[] Halve()
            {
                Dimension[][] result = new Dimension[2][];

                Dimension[] halves = Dimensions[FirstNonconstantIndex].Halve();

                for (int i = 0; i < 2; i++)
                {
                    result[i] = (Dimension[])Dimensions.Clone();
                    result[i][FirstNonconstantIndex] = halves[i];
                }

                return new ArrayPart[]
                {
                    new ArrayPart(BaseAddr, result[0], FirstNonconstantIndex),
                    new ArrayPart(BaseAddr, result[1], FirstNonconstantIndex)
                };
            }
        }

        sealed class CandidateSet
        {
            public ArrayPart First { get; private set; }
            public ArrayPart Second { get; private set; }

            public CandidateSet(ArrayPart first, ArrayPart second)
            {
                First = first;
                Second = second;
            }

            public bool Empty
            {
                get
                {
                    return First.Min > Second.Max || Second.Min > First.Max;
                }
            }

            public CandidateSet[] Halve()
            {
                int firstSize = First.Size;
                int secondSize = Second.Size;

                CandidateSet[] result;

                if (firstSize > secondSize && First.CanHalve())
                {
                    ArrayPart[] halves = First.Halve();
                    result = new CandidateSet[]
                    {
                        new CandidateSet(halves[0], Second),
                        new CandidateSet(halves[1], Second)
                    };
                }
                else if (Second.CanHalve())
                {
                    ArrayPart[] halves = Second.Halve();
                    result = new CandidateSet[]
                    {
                        new CandidateSet(First, halves[0]),
                        new CandidateSet(First, halves[1])
                    };
                }
                else
                    throw new InvalidOperationException();

                return result;
            }

            public static bool HasSolution(ArrayPart first, ArrayPart second)
            {
                Stack<CandidateSet> stack = new Stack<CandidateSet>();
                stack.Push(new CandidateSet(first, second));

                bool found = false;

                while (!found && stack.Count > 0)
                {
                    CandidateSet candidate = stack.Pop();
                    if (candidate.First.Size == 1 && candidate.Second.Size == 1)
                        found = true;
                    else
                    {
                        foreach (CandidateSet half in candidate.Halve())
                            if (!half.Empty)
                                stack.Push(half);
                    }
                }

                return found;
            }
        }

        static class Program
        {
            static void Main()
            {
                Console.WriteLine(
                    CandidateSet.HasSolution(
                        new ArrayPart(2, new Dimension[] { new Dimension(1, 1), new Dimension(2, 4) }),
                        new ArrayPart(4, new Dimension[] { new Dimension(1, 1), new Dimension(2, 4) })
                        )
                    );
            }
        }
    }

1
谢谢!看起来这是一个相当聪明的一般解决方案。它在我们关心的某些情况下性能不佳(请参见我添加到问题中的注释),但我认为我们可以通过一些特殊用例测试来处理这些情况——例如,检查基础值是否模所有步幅的最小公共倍数不同;检查较大的步幅是否相同,如果是,则比较“行”。 - Brooks Moses

1
numpy的实现可能是这个问题上最好的参考之一。

https://github.com/numpy/numpy/blob/60c8b6a9f467f7809d4078aec6e4fe7cbc8191b8/numpy/core/src/common/mem_overlap.c

Solving memory overlap integer programs and bounded Diophantine equations with positive coefficients.

Asking whether two strided arrays a and b overlap is equivalent to asking whether there is a solution to the following problem::

  sum(stride_a[i] * x_a[i] for i in range(ndim_a))
  -
  sum(stride_b[i] * x_b[i] for i in range(ndim_b))
  ==
  base_b - base_a

  0 <= x_a[i] < shape_a[i]
  0 <= x_b[i] < shape_b[i]

for some integer x_a, x_b. Itemsize needs to be considered as an additional dimension with stride 1 and size itemsize.

Negative strides can be changed to positive (and vice versa) by changing variables x[i] -> shape[i] - 1 - x[I], and zero strides can be dropped, so that the problem can be recast into a bounded Diophantine equation with positive coefficients::

 sum(a[i] * x[i] for i in range(n)) == b

 a[i] > 0

 0 <= x[i] <= ub[i]

This problem is NP-hard --- runtime of algorithms grows exponentially with increasing ndim.


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接