使用并行方法查找质数

3

我创建了下面的方法来找到小于某个数的所有质数。有没有建议如何加快速度?

我这样调用它:

interval = (value + NOOFTHREADS - 1) / NOOFTHREADS;
int max = interval * NOOFTHREADS;

tickets = new List<int>(NOOFTHREADS);
for (int i = 1; i <= NOOFTHREADS; i++)
{
    tickets.Add(i * (max / NOOFTHREADS));
}

Enumerable.Range(1, NOOFTHREADS)
.AsParallel()
.ForAll(_ => findPrimes());

使用一些全局变量;

static List<int> vals = new List<int>();
static List<int> tickets;
static int interval = new int();

方法如下:

public static void findPrimes()
    {
        int myTicket;
        lock (tickets)
        {
            myTicket = (int)tickets.Last();
            tickets.RemoveAt(tickets.Count - 1);
        }
        var max = myTicket;
        int min = max - interval +1;
        int num;

        var maxSquareRoot = Math.Sqrt(max);
        var eliminated = new System.Collections.BitArray(max + 1);
        eliminated[0] = true;
        eliminated[1] = true;
        for (int i = 2; i < (max) / 2; i++)
        {
            if (!eliminated[i])
            {
                if (i < maxSquareRoot)
                {
                    num = ((min + i -1 )/i)*i;

                    if (num == i)
                        num = num + i;

                    for (int j =num; j <= max; j += i)
                        eliminated[j] = true;
                }
            }
        }
        for (int b = (int)min; b < max; b++)
        {
            if (!eliminated[b])
                lock(vals)
                    vals.Add(b);
        }
    }

如果您的编辑意味着您想要进一步加快速度,那么这应该是一个新的问题。然而,正常的步骤是1.从筛子中消除偶数,这是大约2的因素,也很简单;2. 还要消除3的倍数,这是大约1.5的进一步因素,仍然相对容易; 3. 消除更小的质数的倍数,随着回报逐渐减少而越来越困难-或者使用由其他人认真调整的库,例如Kim Walisch的primeSieve(C ++)或Daniel Bernstein's primegen(C)。 - Daniel Fischer
4个回答

5

埃氏筛法可以很容易地并行化,只需将其分成单独的块并分别筛选每个块即可。您已经开始了这种拆分,但还没有进一步获得良好的结果。让我们看看 findPrimes() 中存在哪些问题。

var max = myTicket;
int min = max - interval +1;
int num;

var maxSquareRoot = Math.Sqrt(max);
var eliminated = new System.Collections.BitArray(max + 1);

您需要为每个线程创建一个新的BitArray,覆盖从0到max的所有数字。对于筛选第一块的线程来说,这很好,但对于后续线程来说,您分配了比所需更多的内存。当上限很高且有许多线程时,这本身就是一个问题,您正在分配大约(NOOFTHREADS + 1) * limit / 2个位,其中只需要大约limit个位。对于较少的线程和/或较低的限制,您仍会降低局部性,并且将拥有更多的缓存未命中。
eliminated[0] = true;
eliminated[1] = true;
for (int i = 2; i < (max) / 2; i++)

i > maxSquareRoot时,您应该停止外部循环。然后,循环体不再执行任何有意义的操作,只执行一次读取和一两个检查。每次迭代花费的时间并不长,但如果max为1011,对于从√maxmax的所有i都要这样做,那么总时间就会增加。仅对最后一个块执行此操作可能比单线程单块筛法还要耗时。
{
    if (!eliminated[i])
eliminated[i] 只有在 i >= min (或 i < 2) 时才可能为真,这种情况只会在第一部分中出现,对于 i <= maxSquareRoot 的情况(除非限制非常低)。因此,在其他部分中,您还将消除4、6、8、9、10、12、14等的倍数。这是很多浪费的工作。
    {
        if (i < maxSquareRoot)

如果maxSquareRoot恰好是质数,则不应该消除其平方,比较应该为<=
        {
            num = ((min + i -1 )/i)*i;

            if (num == i)
                num = num + i;

            for (int j =num; j <= max; j += i)
                eliminated[j] = true;
        }
    }
}

现在,当筛选完成后,您可以遍历 BitArray 的块。

for (int b = (int)min; b < max; b++)
{
    if (!eliminated[b])
        lock(vals)
            vals.Add(b);
}

每当您找到一个质数时,您就会锁定列表vals并将该质数添加到其中。如果有两个或更多的线程在筛选过程中几乎同时完成,则它们会相互干扰,锁定和等待会进一步减慢进程速度。
为了减少空间使用,每个线程应该创建一个质数列表,直到maxSquareRoot,并使用该列表来消除其块中的合成数,以便BitArray只需要max-min+1位。每个线程创建自己的列表会重复一些工作,但由于上限很小,这不是太多的额外工作。我不知道如何处理并发读取访问,如果这不会增加同步开销,您也可以为所有线程仅使用一个列表,但我怀疑这不会带来任何好处。
代码大致如下:
List<int> sievePrimes = simpleSieve(maxSquareRoot);
// simpleSieve is a standard SoE returning a list of primes not exceeding its argument
var sieve = new System.Collections.BitArray(max - min + 1);
int minSquareRoot = (int)Math.Sqrt(min);
foreach(int p in sievePrimes)
{
    int num = p > minSquareRoot ? p*p : ((min + p - 1)/p)*p;
    num -= min;
    for(; num <= max-min; num += p)
    {
        sieve[num] =true;
    }
}

现在,为了避免线程在添加素数到列表时互相干扰,每个线程应该创建自己的素数列表,并在一步中将其附加到列表上(我不确定这是否比使用自己的锁添加每个素数更快,但如果不是这样的话,我会感到惊讶)。
List<int> primes = new List<int>();
for(int offset = 0; offset <= max-min; ++offset)
{
    if (!sieve[offset])
    {
        primes.Add(min + offset);
    }
}
lock(vals) vals.AddRange(primes);

(而vals应该创建为预期质数数量的初始容量,以避免每个块重新分配空间)

我在SO上得到的最佳答案。花了我一段时间才理解所有内容,但你的解释非常好。谢谢。如果我有一个疑问,那就是你的方法似乎将1作为质数返回。这是故意的吗?在大多数情况下,我认为1被忽略了。 - windowsgm
不好意思,那是我的疏忽。我习惯于从5或7开始筛选,所以对于1我不需要特别注意。 - Daniel Fischer

0

实现那个教科书上的可读平方根算法有什么原因吗? 看一下质数算法


4
这应该是一条评论,而不是真正回答你的问题。 - Ярослав Рахматуллин
埃拉托色尼筛法不容易并行化。 - spender
你的链接似乎与我正在做的事情不相差太远。虽然我不懂C语言。我该如何将那个C语言示例应用到我的代码中呢? - windowsgm
spender说得很有道理。peoro建议的算法是从2顺序执行到一个设定的限制。没有简单的方法可以在不同位置同时启动多个线程。就我所知,该算法具有O(n)复杂度,因此似乎是一个好东西。你不需要c语言来实现它。我的回答很愚蠢,我无法真正帮助你解决问题,抱歉。 - Ярослав Рахматуллин

0

你是说(根据你提供的第二个链接),我的结果可能不够准确吗? - windowsgm
这取决于你如何测试串行循环和并行循环的性能。如果你使用.NET的Stopwatch类进行测试,结果可能不完全准确。为了获得最佳精度,你需要通过分析器运行你的代码。在所有情况下,并行循环并不总是适用,有时串行循环会更快! - Matthew Layton
作为一个想法,您可以使用计时器,并多次迭代您的代码,将所有结果相加,然后除以迭代次数,这将给您一个大致平均执行时间。首次运行不理想的原因是,第一次运行,您的代码会被 JIT 编译,因此需要补偿 JIT 开销。 - Matthew Layton

0

我认为你所有的锁可能是一个问题,你应该尽量避免使用锁,它们非常昂贵。我不知道你算法的细节,但我认为你应该设法去除锁。门票可以作为输入吗?它们可以有自己的输出队列,在所有操作完成后再进行合并。


我在想锁可能是问题所在。不过我想不到绕过它们的方法。当所有线程都开始相同的方法时,如何为每个线程的方法输入不同的数据? - windowsgm
据我所知,您将线程数作为输入,然后将其丢弃。相反,您应该在票据列表上调用.AsParallel()。我认为固定数量的线程不适合 PLINQ。 - Onkelborg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接