C#: 实现Atkin筛法

8

我想知道这里是否有人有一个好的Atkin筛法实现,他们想分享一下。

我正在尝试实现它,但还不能完全理解它。以下是我目前的代码。

public class Atkin : IEnumerable<ulong>
{
    private readonly List<ulong> primes;
    private readonly ulong limit;

    public Atkin(ulong limit)
    {
        this.limit = limit;
        primes = new List<ulong>();
    }

    private void FindPrimes()
    {
        var isPrime = new bool[limit + 1];
        var sqrt = Math.Sqrt(limit);

        for (ulong x = 1; x <= sqrt; x++)
            for (ulong y = 1; y <= sqrt; y++)
            {
                var n = 4*x*x + y*y;
                if (n <= limit && (n % 12 == 1 || n % 12 == 5))
                    isPrime[n] ^= true;

                n = 3*x*x + y*y;
                if (n <= limit && n % 12 == 7)
                    isPrime[n] ^= true;

                n = 3*x*x - y*y;
                if (x > y && n <= limit && n % 12 == 11)
                    isPrime[n] ^= true;
            }

        for (ulong n = 5; n <= sqrt; n++)
            if (isPrime[n])
                for (ulong k = n*n; k <= limit; k *= k)
                    isPrime[k] = false;

        primes.Add(2);
        primes.Add(3);
        for (ulong n = 5; n <= limit; n++)
            if (isPrime[n])
                primes.Add(n);
    }


    public IEnumerator<ulong> GetEnumerator()
    {
        if (!primes.Any())
            FindPrimes();


        foreach (var p in primes)
            yield return p;
    }


    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

我基本上只是试图“翻译”维基百科上列出的伪代码,但它没有正常工作。所以要么我误解了什么,要么就是做错了什么。或者最有可能是两者都有...
我有一个前500个质数的列表用作测试,我的实现在第40(或41?)个数字处失败了。

值在索引[40]处不同
预期:179
实际上是:175

你能找到我的错误吗?你有一个可以分享的实现吗?还是两者都有?
我正在使用的确切测试看起来像这样:

public abstract class AtkinTests
{
    [Test]
    public void GetEnumerator_FirstFiveHundredNumbers_AreCorrect()
    {
        var sequence = new Atkin(2000000);
        var actual = sequence.Take(500).ToArray();
        var expected = First500;

        CollectionAssert.AreEqual(expected, actual);
    }

    private static readonly ulong[] First500 = new ulong[]
        {
            2, 3, 5, 7, 11, 13, 17, ...
        };
}

1
我已经实现了一个比你的快一点,在多核上更快的版本,请参见http://alicebobandmallory.com/articles/2010/01/14/prime-factorization-in-parallel。 - Jonas Elfström
6个回答

11

这段代码:

for (ulong k = n*n; k <= limit; k *= k)
  isPrime[k] = false;

看起来这个伪代码的翻译并不是很准确:

is_prime(k) ← false, k ∈ {n², 2n², 3n², ..., limit}

你的代码看起来会运行n * n,n ^ 4,n ^ 8等操作,即每次平方而不是每次增加n的平方。请尝试以下方法:

ulong nSquared = n * n;
for (ulong k = nSquared; k <= limit; k += nSquared)
  isPrime[k] = false;

太棒了!看起来就是这样了...我其实对那个具体的部分有点不确定,只是不知道怎么正确地写出来 :P - Svish

6
Aaron Mugatroyd的最后一个答案提供了Sieve of Atkin(SoA)的Python源代码翻译,但它可以在以下几个方面进行改进,因为它错过了一些重要的优化:
  1. 他的答案没有使用完整的模60原始Atkin和Bernstein版本的筛子,而是使用了维基百科文章中略微改进的伪代码的变体,因此使用了大约0.36倍的数字筛选范围组合切换/削减操作;下面的我的代码使用相当高效的非页面段伪代码,如我在评论Sieve of Atkin时的回答中所述,它使用数字范围的约0.26倍来减少工作量,使工作量减少了约2/7。
  2. 他的代码通过仅具有奇数表示来减小缓冲区大小,而我的代码进一步位打包以消除任何可被三和五整除的数字表示以及暗示为“仅奇数”的可被二整除的数字表示;这将内存要求进一步减少了近一半(到8/15),并帮助更好地利用CPU缓存,从而降低平均内存访问时间,进一步增加速度。
  3. 我的代码使用快速查找表(LUT)pop count技术计算素数的数量,需要几乎没有时间来计算,而他使用的位逐位技术大约需要一秒钟;但是,在此示例代码中,即使这段小时间也被从计时部分中删除。
  4. 最后,我的代码对位操作进行了优化,以获得最少的内部循环代码。例如,它不使用连续的右移一来生成奇数表示索引,并且实际上几乎不使用位移位通过将所有内部循环编写为常量模数(等于位位置)操作。同样,Aaron的翻译代码在操作方面相当低效,例如在素数平方自由削减中,它将素数的平方添加到索引中,然后检查奇数结果,而不仅仅是两倍的平方,因此不需要进行检查;然后,在内部循环中执行cull操作之前,将数字向右移动一位(除以二),就像在所有循环中一样,使得甚至检查都变得多余。这种低效的代码在使用这种“大筛选缓冲区数组”技术的大范围内执行时间不会产生太大影响,因为每个操作的大部分时间用于RAM内存访问(对于十亿范围而言约为37个CPU时钟周期或更多),但对于适合CPU缓存的较小范围,执行时间将比必要的慢得多;换句话说,它设置了每个操作的太高的最低限制。
代码如下:
//Sieve of Atkin based on full non page segmented modulo 60 implementation...

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

namespace NonPagedSoA {
  //implements the non-paged Sieve of Atkin (full modulo 60 version)...
  class SoA : IEnumerable<ulong> {
    private ushort[] buf = null;
    private long cnt = 0;
    private long opcnt = 0;
    private static byte[] modPRMS = { 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 49, 53, 59, 61 };
    private static ushort[] modLUT;
    private static byte[] cntLUT;
    //initialize the private LUT's...
    static SoA() {
      modLUT = new ushort[60];
      for (int i = 0, m = 0; i < modLUT.Length; ++i) {
        if ((i & 1) != 0 || (i + 7) % 3 == 0 || (i + 7) % 5 == 0) modLUT[i] = 0;
        else modLUT[i] = (ushort)(1 << (m++));
      }
      cntLUT = new byte[65536];
      for (int i = 0; i < cntLUT.Length; ++i) {
        var c = 0;
        for (int j = i; j > 0; j >>= 1) c += j & 1;
        cntLUT[i] = (byte)c;
      }
    }
    //initialization and all the work producing the prime bit array done in the constructor...
    public SoA(ulong range) {
      this.opcnt = 0;
      if (range < 7) {
        if (range > 1) {
          cnt = 1;
          if (range > 2) this.cnt += (long)(range - 1) / 2;
        }
        this.buf = new ushort[0];
      }
      else {
        this.cnt = 3;
        var nrng = range - 7; var lmtw = nrng / 60;
        //initialize sufficient wheels to non-prime
        this.buf = new ushort[lmtw + 1];

        //Put in candidate primes:
        //for the 4 * x ^ 2 + y ^ 2 quadratic solution toggles - all x odd y...
        ulong n = 6; // equivalent to 13 - 7 = 6...
        for (uint x = 1, y = 3; n <= nrng; n += (x << 3) + 4, ++x, y = 1) {
          var cb = n; if (x <= 1) n -= 8; //cancel the effect of skipping the first one...
          for (uint i = 0; i < 15 && cb <= range; cb += (y << 2) + 4, y += 2, ++i) {
            var cbd = cb / 60; var cm = modLUT[cb % 60];
            if (cm != 0)
              for (uint c = (uint)cbd, my = y + 15; c < buf.Length; c += my, my += 30) {
                buf[c] ^= cm; // ++this.opcnt;
              }
          }
        }
        //for the 3 * x ^ 2 + y ^ 2 quadratic solution toggles - x odd y even...
        n = 0; // equivalent to 7 - 7 = 0...
        for (uint x = 1, y = 2; n <= nrng; n += ((x + x + x) << 2) + 12, x += 2, y = 2) {
          var cb = n;
          for (var i = 0; i < 15 && cb <= range; cb += (y << 2) + 4, y += 2, ++i) {
            var cbd = cb / 60; var cm = modLUT[cb % 60];
            if (cm != 0)
              for (uint c = (uint)cbd, my = y + 15; c < buf.Length; c += my, my += 30) {
                buf[c] ^= cm; // ++this.opcnt;
              }
          }
        }
        //for the 3 * x ^ 2 - y ^ 2 quadratic solution toggles all x and opposite y = x - 1...
        n = 4; // equivalent to 11 - 7 = 4...
        for (uint x = 2, y = x - 1; n <= nrng; n += (ulong)(x << 2) + 4, y = x, ++x) {
          var cb = n; int i = 0;
          for ( ; y > 1 && i < 15 && cb <= nrng; cb += (ulong)(y << 2) - 4, y -= 2, ++i) {
            var cbd = cb / 60; var cm = modLUT[cb % 60];
            if (cm != 0) {
              uint c = (uint)cbd, my = y;
              for ( ; my >= 30 && c < buf.Length; c += my - 15, my -= 30) {
                buf[c] ^= cm; // ++this.opcnt;
              }
              if (my > 0 && c < buf.Length) { buf[c] ^= cm; /* ++this.opcnt; */ }
            }
          }
          if (y == 1 && i < 15) {
            var cbd = cb / 60; var cm = modLUT[cb % 60];
            if ((cm & 0x4822) != 0 && cbd < (ulong)buf.Length) { buf[cbd] ^= cm; /* ++this.opcnt; */ }
          }
        }

        //Eliminate squares of base primes, only for those on the wheel:
        for (uint i = 0, w = 0, pd = 0, pn = 0, msk = 1; w < this.buf.Length ; ++i) {
          uint p = pd + modPRMS[pn];
          ulong sqr = (ulong)p * (ulong)p; //to handle ranges above UInt32.MaxValue
          if (sqr > range) break;
          if ((this.buf[w] & msk) != 0) { //found base prime, square free it...
            ulong s = sqr - 7;
            for (int j = 0; s <= nrng && j < modPRMS.Length; s = sqr * modPRMS[j] - 7, ++j) {
              var cd = s / 60; var cm = (ushort)(modLUT[s % 60] ^ 0xFFFF);
              //may need ulong loop index for ranges larger than two billion
              //but buf length only good to about 2^31 * 60 = 120 million anyway,
              //even with large array setting and half that with 32-bit...
              for (ulong c = cd; c < (ulong)this.buf.Length; c += sqr) {
                this.buf[c] &= cm; // ++this.opcnt;
              }
            }
          }
          if (msk >= 0x8000) { msk = 1; pn = 0; ++w; pd += 60; }
          else { msk <<= 1; ++pn; }
        }

        //clear any overflow primes in the excess space in the last wheel/word:
        var ndx = nrng % 60; //clear any primes beyond the range
        for (; modLUT[ndx] == 0; --ndx) ;
        this.buf[lmtw] &= (ushort)((modLUT[ndx] << 1) - 1);
      }
    }

    //uses a fast pop count Look Up Table to return the total number of primes...
    public long Count {
      get {
        long cnt = this.cnt;
        for (int i = 0; i < this.buf.Length; ++i) cnt += cntLUT[this.buf[i]];
        return cnt;
      }
    }

    //returns the number of toggle/cull operations used to sieve the prime bit array...
    public long Ops {
      get {
        return this.opcnt;
      }
    }

    //generate the enumeration of primes...
    public IEnumerator<ulong> GetEnumerator() {
      yield return 2; yield return 3; yield return 5;
      ulong pd = 0;
      for (uint i = 0, w = 0, pn = 0, msk = 1; w < this.buf.Length; ++i) {
        if ((this.buf[w] & msk) != 0) //found a prime bit...
          yield return pd + modPRMS[pn]; //add it to the list
        if (msk >= 0x8000) { msk = 1; pn = 0; ++w; pd += 60; }
        else { msk <<= 1; ++pn; }
      }
    }

    //required for the above enumeration...
    IEnumerator IEnumerable.GetEnumerator() {
      return this.GetEnumerator();
    }
  }

  class Program {
    static void Main(string[] args) {
      Console.WriteLine("This program calculates primes by a simple full version of the Sieve of Atkin.\r\n");

      const ulong n = 1000000000;

      var elpsd = -DateTime.Now.Ticks;

      var gen = new SoA(n);

      elpsd += DateTime.Now.Ticks;

      Console.WriteLine("{0} primes found to {1} using {2} operations in {3} milliseconds.", gen.Count, n, gen.Ops, elpsd / 10000);

      //Output prime list for testing...
      //Console.WriteLine();
      //foreach (var p in gen) {
      //  Console.Write(p + " ");
      //}
      //Console.WriteLine();

//Test options showing what one can do with the enumeration, although more slowly...
//      Console.WriteLine("\r\nThere are {0} primes with the last one {1} and the sum {2}.",gen.Count(),gen.Last(),gen.Sum(x => (long)x));

      Console.Write("\r\nPress any key to exit:");
      Console.ReadKey(true);
      Console.WriteLine();
    }
  }
}

这段代码的速度比Aaron的代码快了大约两倍(在i7-2700K(3.5 GHz)上以64位或32位模式运行,缓冲区大小约为16.5兆字节,并且在筛选范围为十亿时进行了约0.258十亿次的组合开关/质数平方自由的剔除操作(取消对“++this.opcnt”语句的注释可查看),相比之下,如果不计算时间,他的代码需要5.4 / 6.2秒(32位/ 64位),使用了近似0.359十亿次的组合开关/剔除操作,用于筛选最多达到十亿的数),但这并不意味着Atkin筛法比Eratosthenes筛法更快,因为如果将上述SoA实现中使用的技术应用到使用最大轮因子分解的SoE中,SoE的速度将与此相同。

分析:虽然完全优化过的SoE和SoA的操作数量在筛选范围为十亿时大致相同,但是对于这些未分页的实现,主要瓶颈是在筛选缓冲区大小超过CPU缓存大小(我的i7具有32 KiloBytes L1高速缓存、256 Kilobytes L2高速缓存和8 Megabytes L3高速缓存,分别需要1、4和20个时钟周期的时间访问)后的内存访问。此时,内存访问可能超过100个时钟周期。

当将算法适应于分页分段时,两种方法的内存访问速度都有约8倍的改进。然而,随着筛选范围变得非常大,SoE仍然比SoA更优越,因为由于Culling扫描中快速增加到许多倍页面缓冲区大小的概率极小的数质数自由部分的实现难度,所以难以实现该算法。另外,也更严重的是,对于每个x值的新起始点计算与每个页面缓冲区最低表示的y值相比,Paged SoA的效率损失很大,随着筛选范围的增长。

EDIT_ADD: 在Aaron Murgatroyd使用的仅限于奇数的SoE中,对于一亿个筛子范围,使用了大约10.26亿次筛选操作,所以比SoA多进行了四倍的操作,因此应该运行得慢四倍。但是,即使在这里实现的SoA有一个更复杂的内部循环,尤其是由于仅用于奇数的SoE筛除中具有较高比例的步幅比SoA短得多,因此尽管筛子缓冲区远远超过CPU缓存大小(更好地使用缓存关联性),但天真的仅限于奇数的SoE具有更好的平均内存访问时间。这就解释了为什么上面的SoA只比仅限于奇数的SoE快两倍,尽管理论上看起来只执行了四分之一的工作。

如果使用与上述SoA相同的常数取模内部循环的类似算法,并实现相同的2/3/5轮因数分解,则SoE将减少约0.405亿个操作,因此仅比SoA多进行50%的操作,并且理论上略慢于SoA,但可能由于筛除步幅仍然比平均值小一些而以与SoA几乎相同的速度运行,使用这种“天真的”大型内存缓冲区。将轮因数分解增加到2/3/5/7轮意味着对于10亿个筛子范围,SoE的筛选操作减少到约0.314,可能使该版本的SoE在此算法中以相同的速度运行。

可以通过预筛选2/3/5/7/11/13/17/19素数因子的筛子数组(复制模式)来进一步使用轮因数分解,几乎不会增加执行时间成本,从而将总筛选操作数量降至10亿个筛子范围下的大约0.251亿,并且即使对于这些大型内存缓冲区版本,SoE仍然比上述优化版本的SoA具有更少的代码复杂度,速度更快或者与之相当。

因此,可以看出,与天真的或仅限于奇数的或2/3/5轮因数分解版本相比,SoE的操作次数可以大大减少,使操作次数与SoA相同,同时由于较简单的内部循环和更高效的内存访问,每个操作的时间实际上可能更短。END_EDIT_ADD

EDIT_ADD2: 我在这里添加了一个使用类似于上面SoA伪代码中链接答案所示的常数模/bit位置技术的SoE代码。 尽管具有高形式分解和预处理,使得实际被削减的操作总数实际上少于两十亿个素数范围内的组合切换/削减操作,但该代码比上述SoA要简单得多。 以下是该代码:

EDIT_FINAL 下面是已校正的代码及相关注释 END_EDIT_FINAL

//Sieve of Eratosthenes based on maximum wheel factorization and pre-culling implementation...

using System;
using System.Collections;
using System.Collections.Generic;

namespace NonPagedSoE {
  //implements the non-paged Sieve of Eratosthenes (full modulo 210 version with preculling)...
  class SoE : IEnumerable<ulong> {
    private ushort[] buf = null;
    private long cnt = 0;
    private long opcnt = 0;
    private static byte[] basePRMS = { 2, 3, 5, 7, 11, 13, 17, 19 };
    private static byte[] modPRMS = { 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, //positions + 23
                                      97, 101, 103, 107, 109, 113, 121, 127, 131, 137, 139, 143, 149, 151, 157, 163,
                                      167, 169, 173, 179, 181 ,187 ,191 ,193, 197, 199, 209, 211, 221, 223, 227, 229 };
    private static byte[] gapsPRMS = { 6, 2, 6, 4, 2, 4, 6, 6, 2, 6, 4, 2, 6, 4, 6, 8,
                                       4, 2, 4, 2, 4, 8, 6, 4, 6, 2, 4, 6, 2, 6, 6, 4,
                                       2, 4, 6, 2, 6, 4, 2, 4, 2, 10, 2, 10, 2, 4, 2, 4 };
    private static ulong[] modLUT;
    private static byte[] cntLUT;
    //initialize the private LUT's...
    static SoE() {
      modLUT = new ulong[210];
      for (int i = 0, m = 0; i < modLUT.Length; ++i) {
        if ((i & 1) != 0 || (i + 23) % 3 == 0 || (i + 23) % 5 == 0 || (i + 23) % 7 == 0) modLUT[i] = 0;
        else modLUT[i] = 1UL << (m++);
      }
      cntLUT = new byte[65536];
      for (int i = 0; i < cntLUT.Length; ++i) {
        var c = 0;
        for (int j = i ^ 0xFFFF; j > 0; j >>= 1) c += j & 1; //reverse logic; 0 is prime; 1 is composite
        cntLUT[i] = (byte)c;
      }
    }
    //initialization and all the work producing the prime bit array done in the constructor...
    public SoE(ulong range) {
      this.opcnt = 0;
      if (range < 23) {
        if (range > 1) {
          for (int i = 0; i < modPRMS.Length; ++i) if (modPRMS[i] <= range) this.cnt++; else break;
        }
        this.buf = new ushort[0];
      }
      else {
        this.cnt = 8;
        var nrng = range - 23; var lmtw = nrng / 210; var lmtwt3 = lmtw * 3; 
        //initialize sufficient wheels to prime
        this.buf = new ushort[lmtwt3 + 3]; //initial state of all zero's is all potential prime.

        //initialize array to account for preculling the primes of 11, 13, 17, and 19;
        //(2, 3, 5, and 7 already eliminated by the bit packing to residues).
        for (int pn = modPRMS.Length - 4; pn < modPRMS.Length; ++pn) {
          uint p = modPRMS[pn] - 210u; ulong pt3 = p * 3;
          ulong s = p * p - 23;
          ulong xrng = Math.Min(9699709, nrng); // only do for the repeating master pattern size
          ulong nwrds = (ulong)Math.Min(138567, this.buf.Length);
          for (int j = 0; s <= xrng && j < modPRMS.Length; s += p * gapsPRMS[(pn + j++) % 48]) {
            var sm = modLUT[s % 210];
            var si = (sm < (1UL << 16)) ? 0UL : ((sm < (1UL << 32)) ? 1UL : 2UL);
            var cd = s / 210 * 3 + si; var cm = (ushort)(sm >> (int)(si << 4));
            for (ulong c = cd; c < nwrds; c += pt3) { //tight culling loop for size of master pattern
              this.buf[c] |= cm; // ++this.opcnt; //reverse logic; mark composites with ones.
            }
          }
        }
        //Now copy the master pattern so it repeats across the main buffer, allow for overflow...
        for (long i = 138567; i < this.buf.Length; i += 138567)
          if (i + 138567 <= this.buf.Length)
            Array.Copy(this.buf, 0, this.buf, i, 138567);
          else Array.Copy(this.buf, 0, this.buf, i, this.buf.Length - i);

        //Eliminate all composites which are factors of base primes, only for those on the wheel:
        for (uint i = 0, w = 0, wi = 0, pd = 0, pn = 0, msk = 1; w < this.buf.Length; ++i) {
          uint p = pd + modPRMS[pn];
          ulong sqr = (ulong)p * (ulong)p;
          if (sqr > range) break;
          if ((this.buf[w] & msk) == 0) { //found base prime, mark its composites...
            ulong s = sqr - 23; ulong pt3 = p * 3;
            for (int j = 0; s <= nrng && j < modPRMS.Length; s += p * gapsPRMS[(pn + j++) % 48]) {
              var sm = modLUT[s % 210];
              var si = (sm < (1UL << 16)) ? 0UL : ((sm < (1UL << 32)) ? 1UL : 2UL);
              var cd = s / 210 * 3 + si; var cm = (ushort)(sm >> (int)(si << 4));
              for (ulong c = cd; c < (ulong)this.buf.Length; c += pt3) { //tight culling loop
                this.buf[c] |= cm; // ++this.opcnt; //reverse logic; mark composites with ones.
              }
            }
          }
          ++pn;
          if (msk >= 0x8000) { msk = 1; ++w; ++wi; if (wi == 3) { wi = 0; pn = 0; pd += 210; } }
          else msk <<= 1;
        }

        //clear any overflow primes in the excess space in the last wheel/word:
        var ndx = nrng % 210; //clear any primes beyond the range
        for (; modLUT[ndx] == 0; --ndx) ;
        var cmsk = (~(modLUT[ndx] - 1)) << 1; //force all bits above to be composite ones.
        this.buf[lmtwt3++] |= (ushort)cmsk;
        this.buf[lmtwt3++] |= (ushort)(cmsk >> 16);
        this.buf[lmtwt3] |= (ushort)(cmsk >> 32);
      }
    }

    //uses a fast pop count Look Up Table to return the total number of primes...
    public long Count {
      get {
        long cnt = this.cnt;
        for (int i = 0; i < this.buf.Length; ++i) cnt += cntLUT[this.buf[i]];
        return cnt;
      }
    }

    //returns the number of cull operations used to sieve the prime bit array...
    public long Ops {
      get {
        return this.opcnt;
      }
    }

    //generate the enumeration of primes...
    public IEnumerator<ulong> GetEnumerator() {
      yield return 2; yield return 3; yield return 5; yield return 7;
      yield return 11; yield return 13; yield return 17; yield return 19;
      ulong pd = 0;
      for (uint i = 0, w = 0, wi = 0, pn = 0, msk = 1; w < this.buf.Length; ++i) {
        if ((this.buf[w] & msk) == 0) //found a prime bit...
          yield return pd + modPRMS[pn];
        ++pn;
        if (msk >= 0x8000) { msk = 1; ++w; ++wi; if (wi == 3) { wi = 0; pn = 0; pd += 210; } }
        else msk <<= 1;
      }
    }

    //required for the above enumeration...
    IEnumerator IEnumerable.GetEnumerator() {
      return this.GetEnumerator();
    }
  }

  class Program {
    static void Main(string[] args) {
      Console.WriteLine("This program calculates primes by a simple maximually wheel factorized version of the Sieve of Eratosthenes.\r\n");

      const ulong n = 1000000000;

      var elpsd = -DateTime.Now.Ticks;

      var gen = new SoE(n);

      elpsd += DateTime.Now.Ticks;

      Console.WriteLine("{0} primes found to {1} using {2} operations in {3} milliseconds.", gen.Count, n, gen.Ops, elpsd / 10000);

//      Console.WriteLine();
//      foreach (var p in gen) {
//        Console.Write(p + " ");
//      }
//      Console.WriteLine();

      //      Console.WriteLine("\r\nThere are {0} primes with the last one {1} and the sum {2}.",gen.Count(),gen.Last(),gen.Sum(x => (long)x));

      Console.Write("\r\nPress any key to exit:");
      Console.ReadKey(true);
      Console.WriteLine();
    }
  }
}

这段代码实际上比上面的结构体数组运行速度快几个百分点,因为操作略少,对于十亿级别的大数组来说,主要瓶颈是内存访问时间,约为40到100个CPU时钟周期,具体取决于CPU和内存规格。这意味着代码优化(除了减少总操作数之外)是无效的,因为大部分时间都花在等待内存访问上。总之,使用巨大的内存缓冲区不是筛选大范围的最有效方法,使用页面分割可以将SoE的因子提高多达8倍,同时使用相同的最大轮因子分解(也为多处理铺平道路)。

在实现页面分割和多处理方面,与SoE相比,当范围远高于40亿时,SoA确实存在缺陷,因为由于素数平方自由处理和计算更多的页面起始地址相关的页面处理开销因素,SoA的渐近复杂度降低所带来的任何收益都会迅速消耗殆尽;或者,通过以巨大的内存消耗为代价,在RAM内存中存储标记,并进一步降低访问这些标记存储结构的效率来克服这一问题。

简而言之,与完全轮因子分解的SoE相比,SoA并不是一个实用的筛法,因为随着渐近复杂度的提高,它的性能越来越接近于完全优化的SoE,但由于实际实现的细节,如相对内存访问时间和页面分割复杂性以及一般更为复杂和难写,它开始失去效率。在我看来,它更像是一个有趣的智力概念和思维锻炼,而不是实用的筛法。

总有一天,我会将这些技术适应于多线程页面分割的埃拉托色尼筛法,使其在C#中的速度与Atkin和Bernstein的“primegen” SoA实现相当,并在超过40亿的大范围内甚至单线程时也能获得额外的速度提升,而在我的i7上进行多线程处理时可获得多达4倍的速度提升(包括超线程的八个核心)。


感谢您进行了彻底的分析工作并撰写出了高质量的文章;同时也要赞扬您在辛勤工作之后认识到,由于相同的实现工作下,埃拉托色尼筛法在实践中始终比阿特金筛法更快。非分段筛法有点误导人,因为分段筛法要快得多。为了让埃拉托色尼筛法真正发挥速度优势,在循环体中展开一个完整的轮(以获得常数索引乘数和位位置)。这意味着每个8个模30余数类别都需要一个循环体。 - DarthGizka
1
@DarthGizka,谢谢,你可能会对我的C#多线程文章感兴趣:https://dev59.com/ClPTa4cB1Zd3GeqPjGxM#18885065,在那里我将分段SoE推向了更远的地方。我正在开发一个多线程版本,使用展开余数(实际上是48 mod 210)和预筛选,速度比该文章快两到四倍(取决于CPU)。不幸的是,我还没有找到时间完成这篇文章,但很快就会了。它确实进一步表明,实际上,当使用最大轮因子分解时,特别是在分段时,SoE总是比SoA更快。 - GordonBGood
谢谢,这看起来确实非常有趣!我还没有开始高阶序列的工作,因为我想直接(在运行时)生成IL,而不是像C++和FoxPro那样使用源代码生成。"入乡随俗……"就是这样。 IL角度来自我正在进行的一些研究,旨在将一个FoxPro程序移植到C#,其中涉及解释从大量数据库记录中提取的公式。在C#中,这意味着要么使用表达式树,要么使用IL,但只有IL才是真正的竞争者,因为移植应该增加吞吐量而不是减少它…;-) - DarthGizka
1
@DarthGizka,我已将最终代码从SoA更改为SoE。如注所述,对于这些十亿级别的大筛选范围,巨大的筛选缓冲区数组(甚至是位压缩和轮因子减少)比CPU缓存要大得多,因此该算法受到主内存访问时间的瓶颈限制,这可能超过100个CPU时钟周期,而基本的筛选操作循环时间可以远低于10个(某些CPU甚至只需约5.5个)如果缓冲区大小适合CPU缓存。因此,使用分段版本进行十亿级别的筛选应该在现代CPU上不到一秒钟。 - GordonBGood
1
@DarthGizka,请注意,我不是为每个模数残留值单独设置循环,而是针对每个基本质数计算每个残留值的起始地址和位偏移量(在我的内部2/3/5/7轮因式分解中循环48次),并在外部循环内运行紧凑的筛选循环,这又位于基本质数确定的循环内。这会花费一些时间来计算每个残留值的起始地址,但由于这种大型数组算法每个基本质数每个残留值只需要进行一次计算,所以这个时间是可以忽略不计的——对于十亿范围,需要进行162,864次计算,大约需要251,000,000次操作。 - GordonBGood
1
@DarthGizka,你说得对,对于非常大的筛选范围,唯一实用的实现方法是使用页面分割,分段SoE更容易实现,并且在处理大范围时运行速度更快。除了由于主内存访问瓶颈而导致的缓慢之外,这些实现的筛选范围限制约为60亿(SoA版本)和75亿(SoE版本),如果启用gcAllowVeryLargeObjects运行时选项,则这些限制大约会增加一倍。 - GordonBGood

3

这里有另一种实现方式。它使用BitArray来节省内存。使用Parallel.For需要.NET Framework 4。

static List<int> FindPrimesBySieveOfAtkins(int max)
{
//  var isPrime = new BitArray((int)max+1, false); 
//  Can't use BitArray because of threading issues.
    var isPrime = new bool[max + 1];
    var sqrt = (int)Math.Sqrt(max);

    Parallel.For(1, sqrt, x =>
    {
        var xx = x * x;
        for (int y = 1; y <= sqrt; y++)
        {
            var yy = y * y;
            var n = 4 * xx + yy;
            if (n <= max && (n % 12 == 1 || n % 12 == 5))
                isPrime[n] ^= true;

            n = 3 * xx + yy;
            if (n <= max && n % 12 == 7)
                isPrime[n] ^= true;

            n = 3 * xx - yy;
            if (x > y && n <= max && n % 12 == 11)
                isPrime[n] ^= true;
        }
    });

    var primes = new List<int>() { 2, 3 };
    for (int n = 5; n <= sqrt; n++)
    {
        if (isPrime[n])
        {
            primes.Add(n);
            int nn = n * n;
            for (int k = nn; k <= max; k += nn)
                isPrime[k] = false;
        }
    }

    for (int n = sqrt + 1; n <= max; n++)
        if (isPrime[n])
            primes.Add(n);

    return primes;
}

乍一看,这看起来非常酷,而且确实相当快,但似乎它不能正常工作。尝试使用FindPrimesBySieveOfAtkins(1000000)。Count,您将获得大约78500的不同值。我想这可能是由于并行性造成的,正如您所看到的那样。 - Tom Chantler
1
你说得完全正确。我对BitArray的非线程安全特性有所担忧,但我认为isPrime[n] ^= true;是一个原子操作,无论翻转位的顺序如何,都可以使用。但事实并非如此。现在已将其更改为布尔数组,似乎能够解决问题,但当然会带来更大的内存开销。 - Jonas Elfström
如果使用http://msdn.microsoft.com/en-us/library/system.threading.interlocked.aspx类或其他类,是否可以使用位数组? - Svish
isPrime[n] ^= true; 不是线程安全的。检查这个的简单方法(但你也可以用bool XOR做到这一点): int i = 0; Parallel.For(0, 10000, (x) => { i += 1; }); Console.WriteLine(i); - oddbear

1
这里是更快的Atkin筛法实现,我从这个Python脚本中窃取了算法(我对算法不做任何归属声明):

http://programmingpraxis.com/2010/02/19/sieve-of-atkin-improved/

using System;
using System.Collections;
using System.Collections.Generic;

namespace PrimeGenerator
{
    // The block element type for the bit array, 
    // use any unsigned value. WARNING: UInt64 is 
    // slower even on x64 architectures.
    using BitArrayType = System.UInt32;

    // This should never be any bigger than 256 bits - leave as is.
    using BitsPerBlockType = System.Byte;

    // The prime data type, this can be any unsigned value, the limit
    // of this type determines the limit of Prime value that can be
    // found. WARNING: UInt64 is slower even on x64 architectures.
    using PrimeType = System.Int32;

    /// <summary>
    /// Calculates prime number using the Sieve of Eratosthenes method.
    /// </summary>
    /// <example>
    /// <code>
    ///     var lpPrimes = new Eratosthenes(1e7);
    ///     foreach (UInt32 luiPrime in lpPrimes)
    ///         Console.WriteLine(luiPrime);
    /// </example>
    public class Atkin : IEnumerable<PrimeType>
    {
        #region Constants

        /// <summary>
        /// Constant for number of bits per block, calculated based on size of BitArrayType.
        /// </summary>
        const BitsPerBlockType cbBitsPerBlock = sizeof(BitArrayType) * 8;

        #endregion

        #region Protected Locals

        /// <summary>
        /// The limit for the maximum prime value to find.
        /// </summary>
        protected readonly PrimeType mpLimit;

        /// <summary>
        /// The number of primes calculated or null if not calculated yet.
        /// </summary>
        protected PrimeType? mpCount = null;

        /// <summary>
        /// The current bit array where a set bit means
        /// the odd value at that location has been determined
        /// to not be prime.
        /// </summary>
        protected BitArrayType[] mbaOddPrime;

        #endregion

        #region Initialisation

        /// <summary>
        /// Create Sieve of Atkin generator.
        /// </summary>
        /// <param name="limit">The limit for the maximum prime value to find.</param>
        public Atkin(PrimeType limit)
        {
            // Check limit range
            if (limit > PrimeType.MaxValue - (PrimeType)Math.Sqrt(PrimeType.MaxValue))
                throw new ArgumentOutOfRangeException();

            mpLimit = limit;

            FindPrimes();
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Finds the prime number within range.
        /// </summary>
        private unsafe void FindPrimes()
        {
            // Allocate bit array.
            mbaOddPrime = new BitArrayType[(((mpLimit >> 1) + 1) / cbBitsPerBlock) + 1];

            PrimeType lpYLimit, lpN, lpXX3, lpXX4, lpDXX, lpDN, lpDXX4, lpXX, lpX, lpYY, lpMinY, lpS, lpK;

            fixed (BitArrayType* lpbOddPrime = &mbaOddPrime[0])
            {
                // n = 3x^2 + y^2 section
                lpXX3 = 3;
                for (lpDXX = 0; lpDXX < 12 * SQRT((mpLimit - 1) / 3); lpDXX += 24)
                {
                    lpXX3 += lpDXX;
                    lpYLimit = (12 * SQRT(mpLimit - lpXX3)) - 36;
                    lpN = lpXX3 + 16;

                    for (lpDN = -12; lpDN < lpYLimit + 1; lpDN += 72)
                    {
                        lpN += lpDN;
                        lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                            (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                    }

                    lpN = lpXX3 + 4;
                    for (lpDN = 12; lpDN < lpYLimit + 1; lpDN += 72)
                    {
                        lpN += lpDN;
                        lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                            (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                    }
                }

                //    # n = 4x^2 + y^2 section
                lpXX4 = 0;
                for (lpDXX4 = 4; lpDXX4 < 8 * SQRT((mpLimit - 1) / 4) + 4; lpDXX4 += 8)
                {
                    lpXX4 += lpDXX4;
                    lpN = lpXX4 + 1;

                    if ((lpXX4 % 3) != 0)
                    {
                        for (lpDN = 0; lpDN < (4 * SQRT(mpLimit - lpXX4)) - 3; lpDN += 8)
                        {
                            lpN += lpDN;
                            lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                                (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                        }
                    }
                    else
                    {
                        lpYLimit = (12 * SQRT(mpLimit - lpXX4)) - 36;
                        lpN = lpXX4 + 25;

                        for (lpDN = -24; lpDN < lpYLimit + 1; lpDN += 72)
                        {
                            lpN += lpDN;
                            lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                                (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                        }

                        lpN = lpXX4 + 1;
                        for (lpDN = 24; lpDN < lpYLimit + 1; lpDN += 72)
                        {
                            lpN += lpDN;
                            lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                                (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                        }
                    }
                }

                //    # n = 3x^2 - y^2 section
                lpXX = 1;
                for (lpX = 3; lpX < SQRT(mpLimit / 2) + 1; lpX += 2)
                {
                    lpXX += 4 * lpX - 4;
                    lpN = 3 * lpXX;

                    if (lpN > mpLimit)
                    {
                        lpMinY = ((SQRT(lpN - mpLimit) >> 2) << 2);
                        lpYY = lpMinY * lpMinY;
                        lpN -= lpYY;
                        lpS = 4 * lpMinY + 4;
                    }
                    else
                        lpS = 4;

                    for (lpDN = lpS; lpDN < 4 * lpX; lpDN += 8)
                    {
                        lpN -= lpDN;
                        if (lpN <= mpLimit && lpN % 12 == 11)
                            lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] ^= 
                                (BitArrayType)((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock));
                    }
                }

                // xx = 0
                lpXX = 0;
                for (lpX = 2; lpX < SQRT(mpLimit / 2) + 1; lpX += 2)
                {
                    lpXX += 4*lpX - 4;
                    lpN = 3*lpXX;

                    if (lpN > mpLimit)
                    {
                        lpMinY = ((SQRT(lpN - mpLimit) >> 2) << 2) - 1;
                        lpYY = lpMinY * lpMinY;
                        lpN -= lpYY;
                        lpS = 4*lpMinY + 4;
                    }
                    else
                    {
                        lpN -= 1;
                        lpS = 0;
                    }

                    for (lpDN = lpS; lpDN < 4 * lpX; lpDN += 8)
                    {
                        lpN -= lpDN;
                        if (lpN <= mpLimit && lpN % 12 == 11)
                            lpbOddPrime[(lpN>>1) / cbBitsPerBlock] ^= 
                                (BitArrayType)((BitArrayType)1 << (int)((lpN>>1) % cbBitsPerBlock));
                    }
                }

                // # eliminate squares
                for (lpN = 5; lpN < SQRT(mpLimit) + 1; lpN += 2)
                    if ((lpbOddPrime[(lpN >> 1) / cbBitsPerBlock] & ((BitArrayType)1 << (int)((lpN >> 1) % cbBitsPerBlock))) != 0)
                        for (lpK = lpN * lpN; lpK < mpLimit; lpK += lpN * lpN)
                            if ((lpK & 1) == 1)
                                lpbOddPrime[(lpK >> 1) / cbBitsPerBlock] &=
                                    (BitArrayType)~((BitArrayType)1 << (int)((lpK >> 1) % cbBitsPerBlock));
            }
        }

        /// <summary>
        /// Calculates the truncated square root for a number.
        /// </summary>
        /// <param name="value">The value to get the square root for.</param>
        /// <returns>The truncated sqrt of the value.</returns>
        private unsafe PrimeType SQRT(PrimeType value)
        {
            return (PrimeType)Math.Sqrt(value);
        }

        /// <summary>
        /// Gets a bit value by index.
        /// </summary>
        /// <param name="bits">The blocks containing the bits.</param>
        /// <param name="index">The index of the bit.</param>
        /// <returns>True if bit is set, false if cleared.</returns>
        private bool GetBitSafe(BitArrayType[] bits, PrimeType index)
        {
            if ((index & 1) == 1)
                return (bits[(index >> 1) / cbBitsPerBlock] & ((BitArrayType)1 << (int)((index >> 1) % cbBitsPerBlock))) != 0;
            else
                return false;
        }

        #endregion

        #region Public Properties

        /// <summary>
        /// Get the limit for the maximum prime value to find.
        /// </summary>
        public PrimeType Limit
        {
            get
            {
                return mpLimit;
            }
        }

        /// <summary>
        /// Returns the number of primes found in the range.
        /// </summary>
        public PrimeType Count
        {
            get
            {
                if (!mpCount.HasValue)
                {
                    PrimeType lpCount = 0;
                    foreach (PrimeType liPrime in this) lpCount++;
                    mpCount = lpCount;
                }

                return mpCount.Value;
            }
        }

        /// <summary>
        /// Determines if a value in range is prime or not.
        /// </summary>
        /// <param name="test">The value to test for primality.</param>
        /// <returns>True if the value is prime, false otherwise.</returns>
        public bool this[PrimeType test]
        {
            get
            {
                if (test > mpLimit) throw new ArgumentOutOfRangeException();
                if (test <= 1) return false;
                if (test == 2) return true;
                if ((test & 1) == 0) return false;
                return !GetBitSafe(mbaOddPrime, test >> 1);
            }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns>The enumerator of the primes.</returns>
        public IEnumerator<PrimeType> GetEnumerator()
        {
            //    return [2,3] + filter(primes.__getitem__, xrange(5,limit,2))

            // Two & Three always prime.
            yield return 2;
            yield return 3;

            // Start at first block, third MSB (5).
            int liBlock = 0;
            byte lbBit = 2;
            BitArrayType lbaCurrent = mbaOddPrime[0] >> lbBit;

            // For each value in range stepping in incrments of two for odd values.
            for (PrimeType lpN = 5; lpN <= mpLimit; lpN += 2)
            {
                // If current bit not set then value is prime.
                if ((lbaCurrent & 1) == 1)
                    yield return lpN;

                // Move to NSB.
                lbaCurrent >>= 1;

                // Increment bit value. 
                lbBit++;

                // If block is finished.
                if (lbBit == cbBitsPerBlock) 
                {
                    lbBit = 0;
                    lbaCurrent = mbaOddPrime[++liBlock];

                    //// Move to first bit of next block skipping full blocks.
                    while (lbaCurrent == 0)
                    {
                        lpN += ((PrimeType)cbBitsPerBlock) << 1;
                        if (lpN <= mpLimit)
                            lbaCurrent = mbaOddPrime[++liBlock];
                        else
                            break;
                    }
                }
            }
        }

        #endregion

        #region IEnumerable<PrimeType> Implementation

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns></returns>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion
    }
}

它的速度接近于我的最优化埃拉托色尼筛法版本,但仍然慢了约20%,可以在这里找到:

https://dev59.com/ClPTa4cB1Zd3GeqPjGxM#9700790


将大数组分块处理,就像您为SoE的多线程版本所做的那样,可能会使其比您实现的SoE等效版本运行得更快,因为它将减少内存访问的缓存抖动。然而,如果使用轮筛法消除了相当高的因子,则对您的SoE应用轮筛法,那么在我们愿意等待的任何数字范围内(即少于几天),SoE将再次超过SoA,因为SoE的复合剔除数将少于此SoA的切换数。 - GordonBGood
Berstein和Atkin的SoA参考实现与等效的SoE实现相比,仅使用2、3、5车轮分解来进行SoE实现,因为这相当于SoA的本地车轮分解,但是对于SoE,像2、3、5、7、11、13这样更大的因子也是可能的,而SoA不响应进一步的车轮分解。通过这种方式,SoE消除合成数的数量可以减少到SoA切换次数的约三分之二,从而很可能使SoE略微领先于更进一步优化的SoA,尽管存在额外的复杂性。 - GordonBGood
应该问的问题是:“在两者都被最大程度优化时,为什么要使用Atkin筛而不是Eratosthenes筛?”答案可能是“根本没有理由”,正如我在这个回答中所阐述的那样。这并不是你的SoA代码比这里慢20%的原因,更有可能是该算法仍未完全消除其中一个二次情况的模数需求,也不是你的SoE实现被最大程度地优化,正如我在多线程回答中所阐述的那样。 - GordonBGood

0
这是我的代码,它使用了一个名为CompartmentalisedParallel的类,可以让你执行并行循环,但控制线程数量,以便将索引分组。然而,由于线程问题,你需要每次修改BitArray时要么锁定它,要么为每个线程创建一个单独的BitArray,然后在最后将它们进行异或运算,第一种选项因为锁的数量而非常慢,第二种选项对我来说似乎更快!
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace PrimeGenerator
{
    public class Atkin : Primes
    {
        protected BitArray mbaPrimes;
        protected bool mbThreaded = true;

        public Atkin(int limit)
            : this(limit, true)
        {
        }

        public Atkin(int limit, bool threaded)
            : base(limit)
        {
            mbThreaded = threaded;
            if (mbaPrimes == null) FindPrimes();
        }

        public bool Threaded
        {
            get
            {
                return mbThreaded;
            }
        }

        public override IEnumerator<int> GetEnumerator()
        {
            yield return 2;
            yield return 3;
            for (int lsN = 5; lsN <= msLimit; lsN += 2)
                if (mbaPrimes[lsN]) yield return lsN;
        }

        private void FindPrimes()
        {
            mbaPrimes = new BitArray(msLimit + 1, false);

            int lsSQRT = (int)Math.Sqrt(msLimit);

            int[] lsSquares = new int[lsSQRT + 1];
            for (int lsN = 0; lsN <= lsSQRT; lsN++)
                lsSquares[lsN] = lsN * lsN;

            if (Threaded)
            {
                CompartmentalisedParallel.For<BitArray>(
                    1, lsSQRT + 1, new ParallelOptions(),
                    (start, finish) => { return new BitArray(msLimit + 1, false); },
                    (lsX, lsState, lbaLocal) =>
                    {
                        int lsX2 = lsSquares[lsX];

                        for (int lsY = 1; lsY <= lsSQRT; lsY++)
                        {
                            int lsY2 = lsSquares[lsY];

                            int lsN = 4 * lsX2 + lsY2;
                            if (lsN <= msLimit && (lsN % 12 == 1 || lsN % 12 == 5))
                                lbaLocal[lsN] ^= true;

                            lsN -= lsX2;
                            if (lsN <= msLimit && lsN % 12 == 7)
                                lbaLocal[lsN] ^= true;

                            if (lsX > lsY)
                            {
                                lsN -= lsY2 * 2;
                                if (lsN <= msLimit && lsN % 12 == 11)
                                    lbaLocal[lsN] ^= true;
                            }
                        }

                        return lbaLocal;
                    },
                    (lbaResult, start, finish) =>
                    {
                        lock (mbaPrimes) 
                            mbaPrimes.Xor(lbaResult);
                    },
                    -1
                );
            }
            else
            {
                for (int lsX = 1; lsX <= lsSQRT; lsX++)
                {
                    int lsX2 = lsSquares[lsX];

                    for (int lsY = 1; lsY <= lsSQRT; lsY++)
                    {
                        int lsY2 = lsSquares[lsY];

                        int lsN = 4 * lsX2 + lsY2;
                        if (lsN <= msLimit && (lsN % 12 == 1 || lsN % 12 == 5))
                            mbaPrimes[lsN] ^= true;

                        lsN -= lsX2;
                        if (lsN <= msLimit && lsN % 12 == 7)
                            mbaPrimes[lsN] ^= true;

                        if (lsX > lsY)
                        {
                            lsN -= lsY2 * 2;
                            if (lsN <= msLimit && lsN % 12 == 11)
                                mbaPrimes[lsN] ^= true;
                        }
                    }
                }
            }

            for (int lsN = 5; lsN < lsSQRT; lsN += 2)
                if (mbaPrimes[lsN])
                {
                    var lsS = lsSquares[lsN];
                    for (int lsK = lsS; lsK <= msLimit; lsK += lsS)
                        mbaPrimes[lsK] = false;
                }
        }
    }
}

以及 CompartmentalisedParallel 类:

using System;
using System.Threading.Tasks;

namespace PrimeGenerator
{
    public static class CompartmentalisedParallel
    {
        #region Int

        private static int[] CalculateCompartments(int startInclusive, int endExclusive, ref int threads)
        {
            if (threads == 0) threads = 1;
            if (threads == -1) threads = Environment.ProcessorCount;
            if (threads > endExclusive - startInclusive) threads = endExclusive - startInclusive;

            int[] liThreadIndexes = new int[threads + 1];
            liThreadIndexes[threads] = endExclusive - 1;
            int liIndexesPerThread = (endExclusive - startInclusive) / threads;
            for (int liCount = 0; liCount < threads; liCount++)
                liThreadIndexes[liCount] = liCount * liIndexesPerThread;

            return liThreadIndexes;
        }

        public static void For<TLocal>(
            int startInclusive, int endExclusive,
            ParallelOptions parallelOptions,
            Func<int, int, TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal, int, int> localFinally,
            int threads)
        {
            int[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread, lsState) =>
                    {
                        TLocal llLocal = localInit(liThreadIndexes[liThread], liThreadIndexes[liThread + 1]);

                        for (int liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter, lsState, llLocal);

                        localFinally(llLocal, liThreadIndexes[liThread], liThreadIndexes[liThread + 1]);
                    }
                );
            else
            {
                TLocal llLocal = localInit(startInclusive, endExclusive);
                for (int liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter, null, llLocal);
                localFinally(llLocal, startInclusive, endExclusive);
            }
        }

        public static void For(
            int startInclusive, int endExclusive,
            ParallelOptions parallelOptions,
            Action<int, ParallelLoopState> body,
            int threads)
        {
            int[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread, lsState) =>
                    {
                        for (int liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter, lsState);
                    }
                );
            else
                for (int liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter, null);
        }

        public static void For(
            int startInclusive, int endExclusive,
            ParallelOptions parallelOptions,
            Action<int> body,
            int threads)
        {
            int[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread) =>
                    {
                        for (int liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter);
                    }
                );
            else
                for (int liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter);
        }

        public static void For(
            int startInclusive, int endExclusive,
            Action<int, ParallelLoopState> body,
            int threads)
        {
            For(startInclusive, endExclusive, new ParallelOptions(), body, threads);
        }

        public static void For(
            int startInclusive, int endExclusive,
            Action<int> body,
            int threads)
        {
            For(startInclusive, endExclusive, new ParallelOptions(), body, threads);
        }

        public static void For<TLocal>(
            int startInclusive, int endExclusive,
            Func<int, int, TLocal> localInit,
            Func<int, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal, int, int> localFinally,
            int threads)
        {
            For<TLocal>(startInclusive, endExclusive, new ParallelOptions(), localInit, body, localFinally, threads);
        }

        #endregion

        #region Long

        private static long[] CalculateCompartments(long startInclusive, long endExclusive, ref long threads)
        {
            if (threads == 0) threads = 1;
            if (threads == -1) threads = Environment.ProcessorCount;
            if (threads > endExclusive - startInclusive) threads = endExclusive - startInclusive;

            long[] liThreadIndexes = new long[threads + 1];
            liThreadIndexes[threads] = endExclusive - 1;
            long liIndexesPerThread = (endExclusive - startInclusive) / threads;
            for (long liCount = 0; liCount < threads; liCount++)
                liThreadIndexes[liCount] = liCount * liIndexesPerThread;

            return liThreadIndexes;
        }

        public static void For<TLocal>(
            long startInclusive, long endExclusive,
            ParallelOptions parallelOptions,
            Func<long, long, TLocal> localInit,
            Func<long, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal, long, long> localFinally,
            long threads)
        {
            long[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread, lsState) =>
                    {
                        TLocal llLocal = localInit(liThreadIndexes[liThread], liThreadIndexes[liThread + 1]);

                        for (long liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter, lsState, llLocal);

                        localFinally(llLocal, liThreadIndexes[liThread], liThreadIndexes[liThread + 1]);
                    }
                );
            else
            {
                TLocal llLocal = localInit(startInclusive, endExclusive);
                for (long liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter, null, llLocal);
                localFinally(llLocal, startInclusive, endExclusive);
            }
        }

        public static void For(
            long startInclusive, long endExclusive,
            ParallelOptions parallelOptions,
            Action<long, ParallelLoopState> body,
            long threads)
        {
            long[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread, lsState) =>
                    {
                        for (long liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter, lsState);
                    }
                );
            else
                for (long liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter, null);
        }

        public static void For(
            long startInclusive, long endExclusive,
            ParallelOptions parallelOptions,
            Action<long> body,
            long threads)
        {
            long[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, parallelOptions,
                    (liThread) =>
                    {
                        for (long liCounter = liThreadIndexes[liThread]; liCounter < liThreadIndexes[liThread + 1]; liCounter++)
                            body(liCounter);
                    }
                );
            else
                for (long liCounter = startInclusive; liCounter < endExclusive; liCounter++)
                    body(liCounter);
        }

        public static void For(
            long startInclusive, long endExclusive,
            Action<long, ParallelLoopState> body,
            long threads)
        {
            For(startInclusive, endExclusive, new ParallelOptions(), body, threads);
        }

        public static void For(
            long startInclusive, long endExclusive,
            Action<long> body,
            long threads)
        {
            For(startInclusive, endExclusive, new ParallelOptions(), body, threads);
        }

        public static void For<TLocal>(
            long startInclusive, long endExclusive,
            Func<long, long, TLocal> localInit,
            Func<long, ParallelLoopState, TLocal, TLocal> body,
            Action<TLocal, long, long> localFinally,
            long threads)
        {
            For<TLocal>(startInclusive, endExclusive, new ParallelOptions(), localInit, body, localFinally, threads);
        }

        #endregion
    }
}

质数基类:

using System.Collections;
using System.Collections.Generic;

namespace PrimeGenerator
{
    public abstract class Primes : IEnumerable<int>
    {
        protected readonly int msLimit;

        public Primes(int limit)
        {
            msLimit = limit;
        }

        public int Limit
        {
            get
            {
                return msLimit;
            }
        }

        public int Count
        {
            get
            {
                int liCount = 0;
                foreach (int liPrime in this)
                    liCount++;
                return liCount;
            }
        }

        public abstract IEnumerator<int> GetEnumerator();

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }
}

按照以下步骤使用:

    var lpPrimes = new Atkin(count, true);
    Console.WriteLine(lpPrimes.Count);
    Console.WriteLine(s.ElapsedMilliseconds);

然而,我发现在所有情况下,使用埃拉托色尼筛法比阿特金筛法更快,即使在四核CPU以多线程模式运行时也是如此:

using System;
using System.Collections;
using System.Collections.Generic;

namespace PrimeGenerator
{
    public class Eratosthenes : Primes
    {
        protected BitArray mbaOddEliminated;

        public Eratosthenes(int limit)
            : base(limit)
        {
            if (mbaOddEliminated == null) FindPrimes();
        }

        public override IEnumerator<int> GetEnumerator()
        {
            yield return 2;
            for (int lsN = 3; lsN <= msLimit; lsN+=2)
                if (!mbaOddEliminated[lsN>>1]) yield return lsN;
        }

        private void FindPrimes()
        {
            mbaOddEliminated = new BitArray((msLimit>>1) + 1);
            int lsSQRT = (int)Math.Sqrt(msLimit);
            for (int lsN = 3; lsN < lsSQRT + 1; lsN += 2)
                if (!mbaOddEliminated[lsN>>1])
                    for (int lsM = lsN*lsN; lsM <= msLimit; lsM += lsN<<1)
                        mbaOddEliminated[lsM>>1] = true;
        }
    }
}

如果您能让Atkin运行得更快,请告诉我!

你可以通过以下几种方式使Atkin筛(SoA)运行更快:1)避免所有需要(昂贵的)模操作,通过识别每个二次序列“4x^2+y^2”、“3x^2+y^2”和“3*x^2-y^2”都遵循模15模式,以便只为每个模式生成适当的模数,从而加快速度超过2倍;2)你可以对筛选数组进行分段,以避免并发问题,因为每个线程都有一个(最好是基于位的)数组。当然,SoE也可以被分段,并应用轮因子分解以获得额外的收益。 - GordonBGood
最终,只有在将SoE限制为与SoA基于相同的2、3和5因子消除的轮式分解时,SoA才能更快地运行;对于我们可能想要等待的任何质数范围,最大程度优化的SoE仍然比最大程度优化的SoA更快。当使用本地编译语言(如C++)编写时,这一点更加明显,因为SoE的简单操作更有利于编译器优化,以至于每个合成筛选可能只需要三个CPU时钟周期。我认为SoA不可能那么高效。 - GordonBGood
我已经编写了一个C#版本,可以在i7-2700K处理器(3.5 GHz)上大约7.5秒内枚举所有203,280,221个素数(四十亿加)。另一个答案在这里,它使用了分段、多线程和轮子筛法。超过2/3的时间用于枚举找到的素数,因此如果两者都被优化,算法(SoE/SoA)并不重要。那个答案只使用了2,3,5轮子筛法,因此优化后的SoA应该会稍微快一些,但是如果我使用2,3,5,7,11,13筛法,那么SoE将再次更快。 - GordonBGood
看起来你在翻译Python代码时的后续回答改善了我在第一条评论中提出的优化问题,但你仍然没有将分段和多线程应用于SoA... - GordonBGood

0
这是改进的埃拉托斯特尼筛法,使用自定义FixBitArrays和不安全代码以获得更快的结果。相比我的先前的埃拉托斯特尼算法,速度提高了约225%,而且该类是独立的(这不是多线程的 - 埃拉托斯特尼不兼容多线程)。在AMD Phenom II X4 965处理器上,我可以在9,261毫秒内计算出1,000,000,000限制下的质数:
using System;
using System.Collections;
using System.Collections.Generic;

namespace PrimeGenerator
{
    // The block element type for the bit array, 
    // use any unsigned value. WARNING: UInt64 is 
    // slower even on x64 architectures.
    using BitArrayType = System.UInt32;

    // This should never be any bigger than 256 bits - leave as is.
    using BitsPerBlockType = System.Byte;

    // The prime data type, this can be any unsigned value, the limit
    // of this type determines the limit of Prime value that can be
    // found. WARNING: UInt64 is slower even on x64 architectures.
    using PrimeType = System.UInt32;

    /// <summary>
    /// Calculates prime number using the Sieve of Eratosthenes method.
    /// </summary>
    /// <example>
    /// <code>
    ///     var lpPrimes = new Eratosthenes(1e7);
    ///     foreach (UInt32 luiPrime in lpPrimes)
    ///         Console.WriteLine(luiPrime);
    /// </example>
    public class Eratosthenes : IEnumerable<PrimeType>
    {
        #region Constants

        /// <summary>
        /// Constant for number of bits per block, calculated based on size of BitArrayType.
        /// </summary>
        const BitsPerBlockType cbBitsPerBlock = sizeof(BitArrayType) * 8;

        #endregion

        #region Protected Locals

        /// <summary>
        /// The limit for the maximum prime value to find.
        /// </summary>
        protected readonly PrimeType mpLimit;

        /// <summary>
        /// The current bit array where a set bit means
        /// the odd value at that location has been determined
        /// to not be prime.
        /// </summary>
        protected BitArrayType[] mbaOddNotPrime;

        #endregion

        #region Initialisation

        /// <summary>
        /// Create Sieve of Eratosthenes generator.
        /// </summary>
        /// <param name="limit">The limit for the maximum prime value to find.</param>
        public Eratosthenes(PrimeType limit)
        {
            // Check limit range
            if (limit > PrimeType.MaxValue - (PrimeType)Math.Sqrt(PrimeType.MaxValue))
                throw new ArgumentOutOfRangeException();

            mpLimit = limit;

            FindPrimes();
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Finds the prime number within range.
        /// </summary>
        private unsafe void FindPrimes()
        {
            // Allocate bit array.
            mbaOddNotPrime = new BitArrayType[(((mpLimit >> 1) + 1) / cbBitsPerBlock) + 1];

            // Cache Sqrt of limit.
            PrimeType lpSQRT = (PrimeType)Math.Sqrt(mpLimit);

            // Fix the bit array for pointer access
            fixed (BitArrayType* lpbOddNotPrime = &mbaOddNotPrime[0])
                // Scan primes up to lpSQRT
                for (PrimeType lpN = 3; lpN <= lpSQRT; lpN += 2)
                    // If the current bit value for index lpN is cleared (prime)
                    if (
                            (
                                lpbOddNotPrime[(lpN >> 1) / cbBitsPerBlock] & 
                                ((BitArrayType)1 << (BitsPerBlockType)((lpN >> 1) % cbBitsPerBlock))
                            ) == 0
                        )
                        // Leave it cleared (prime) and mark all multiples of lpN*2 from lpN*lpN as not prime
                        for (PrimeType lpM = lpN * lpN; lpM <= mpLimit; lpM += lpN << 1)
                            // Set as not prime
                            lpbOddNotPrime[(lpM >> 1) / cbBitsPerBlock] |= 
                                (BitArrayType)((BitArrayType)1 << (BitsPerBlockType)((lpM >> 1) % cbBitsPerBlock));
        }

        /// <summary>
        /// Gets a bit value by index.
        /// </summary>
        /// <param name="bits">The blocks containing the bits.</param>
        /// <param name="index">The index of the bit.</param>
        /// <returns>True if bit is set, false if cleared.</returns>
        private bool GetBitSafe(BitArrayType[] bits, PrimeType index)
        {
            return (bits[index / cbBitsPerBlock] & ((BitArrayType)1 << (BitsPerBlockType)(index % cbBitsPerBlock))) != 0;
        }

        #endregion

        #region Public Properties

        /// <summary>
        /// Get the limit for the maximum prime value to find.
        /// </summary>
        public PrimeType Limit
        {
            get
            {
                return mpLimit;
            }
        }

        /// <summary>
        /// Returns the number of primes found in the range.
        /// </summary>
        public PrimeType Count
        {
            get
            {
                PrimeType lptCount = 0;
                foreach (PrimeType liPrime in this)
                    lptCount++;
                return lptCount;
            }
        }

        /// <summary>
        /// Determines if a value in range is prime or not.
        /// </summary>
        /// <param name="test">The value to test for primality.</param>
        /// <returns>True if the value is prime, false otherwise.</returns>
        public bool this[PrimeType test]
        {
            get
            {
                if (test > mpLimit) throw new ArgumentOutOfRangeException();
                if (test <= 1) return false;
                if (test == 2) return true;
                if ((test & 1) == 0) return false;
                return !GetBitSafe(mbaOddNotPrime, test >> 1);
            }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns>The enumerator of the primes.</returns>
        public IEnumerator<PrimeType> GetEnumerator()
        {
            // Two always prime.
            yield return 2;

            // Start at first block, second MSB.
            int liBlock = 0;
            byte lbBit = 1;
            BitArrayType lbaCurrent = mbaOddNotPrime[0] >> 1;

            // For each value in range stepping in incrments of two for odd values.
            for (PrimeType lpN = 3; lpN <= mpLimit; lpN += 2)
            {
                // If current bit not set then value is prime.
                if ((lbaCurrent & 1) == 0)
                    yield return lpN;

                // Move to NSB.
                lbaCurrent >>= 1;

                // Increment bit value.
                lbBit++;

                // If block is finished.
                if (lbBit == cbBitsPerBlock) 
                {
                    // Move to first bit of next block.
                    lbBit = 0;
                    liBlock++;
                    lbaCurrent = mbaOddNotPrime[liBlock];
                }
            }
        }

        #endregion

        #region IEnumerable<PrimeType> Implementation

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns>The enumerator for the prime numbers.</returns>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion
    }
}

在 10 亿个数中找到的质数:50,847,534,用时 9,261 毫秒


非常快,但“Eratosthenes不兼容多线程”是不正确的;如果采用正确的算法方法,它就是兼容的:将大数组分段为子部分以消除每个段,每个段应该是处理器缓存的大小,以获得更好的内存访问效率,然后使用与处理器数量相等的线程来处理每个连续的段页面,其中一个额外的段用于前景计数器/枚举器进行处理。您的AMD X4 CPU运行时间应该除以4,除了计算/枚举质数的时间,因此1亿需要约2.5秒。 - GordonBGood

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接