如何使用多线程C#实现埃拉托色尼筛法?

5

我正在尝试使用多线程实现埃拉托色尼筛法。这是我的实现:

using System;
using System.Collections.Generic;
using System.Threading;

namespace Sieve_Of_Eratosthenes 
{
    class Controller 
        {
        public static int upperLimit = 1000000000;
        public static bool[] primeArray = new bool[upperLimit];

        static void Main(string[] args) 
        {
        DateTime startTime = DateTime.Now;

        Initialize initial1 = new Initialize(0, 249999999);
        Initialize initial2 = new Initialize(250000000, 499999999);
        Initialize initial3 = new Initialize(500000000, 749999999);
        Initialize initial4 = new Initialize(750000000, 999999999);

        initial1.thread.Join();
        initial2.thread.Join();
        initial3.thread.Join();
        initial4.thread.Join();

        int sqrtLimit = (int)Math.Sqrt(upperLimit);

        Sieve sieve1 = new Sieve(249999999);
        Sieve sieve2 = new Sieve(499999999);
        Sieve sieve3 = new Sieve(749999999);
        Sieve sieve4 = new Sieve(999999999);

        for (int i = 3; i < sqrtLimit; i += 2) 
            {
            if (primeArray[i] == true) 
                {
                int squareI = i * i;

                    if (squareI <= 249999999) 
                    {
                sieve1.set(i);
                sieve2.set(i);
                sieve3.set(i);
                sieve4.set(i);
                sieve1.thread.Join();
                sieve2.thread.Join();
                sieve3.thread.Join();
                sieve4.thread.Join();
            } 
                    else if (squareI > 249999999 & squareI <= 499999999) 
                    {
                sieve2.set(i);
                sieve3.set(i);
                sieve4.set(i);
                sieve2.thread.Join();
                sieve3.thread.Join();
                sieve4.thread.Join();
            } 
                    else if (squareI > 499999999 & squareI <= 749999999) 
                    {
                sieve3.set(i);
                sieve4.set(i);
                sieve3.thread.Join();
                sieve4.thread.Join();
            } 
                    else if (squareI > 749999999 & squareI <= 999999999) 
                    {
                sieve4.set(i);
                sieve4.thread.Join();
            }
            }
        }    

        int count = 0;
        primeArray[2] = true;
        for (int i = 2; i < upperLimit; i++) 
            {
            if (primeArray[i]) 
                {
                count++;
            }
        }

        Console.WriteLine("Total: " + count);

        DateTime endTime = DateTime.Now;
        TimeSpan elapsedTime = endTime - startTime;
        Console.WriteLine("Elapsed time: " + elapsedTime.Seconds);
        }

        public class Initialize 
        {
            public Thread thread;
        private int lowerLimit;
        private int upperLimit;

        public Initialize(int lowerLimit, int upperLimit) 
            {
            this.lowerLimit = lowerLimit;
            this.upperLimit = upperLimit;
            thread = new Thread(this.InitializeArray);
            thread.Priority = ThreadPriority.Highest;
            thread.Start();
        }

        private void InitializeArray() 
            {
            for (int i = this.lowerLimit; i <= this.upperLimit; i++) 
                {
                if (i % 2 == 0) 
                    {
                    Controller.primeArray[i] = false;
            } 
                    else 
                    {
                Controller.primeArray[i] = true;
            }
            }
        }
        }

        public class Sieve 
            {
            public Thread thread;
            public int i;
            private int upperLimit;

            public Sieve(int upperLimit) 
                {
                this.upperLimit = upperLimit;
            }

        public void set(int i) 
            {
            this.i = i;
            thread = new Thread(this.primeGen);
            thread.Start();
        }

        public void primeGen() 
            {
            for (int j = this.i * this.i; j <= this.upperLimit; j += i) 
                {
                Controller.primeArray[j] = false;
            }
        }
        }
    }
}

这个需要30秒才能生成输出,有没有什么方法可以加速呢?
编辑: 这是TPL的实现:
public LinkedList<int> GetPrimeList(int limit) {
        LinkedList<int> primeList = new LinkedList<int>();
        bool[] primeArray = new bool[limit];

        Console.WriteLine("Initialization started...");

        Parallel.For(0, limit, i => {
            if (i % 2 == 0) {
                primeArray[i] = false;
            } else {
                primeArray[i] = true;
            }
        }
        );
        Console.WriteLine("Initialization finished...");

        /*for (int i = 0; i < limit; i++) {
            if (i % 2 == 0) {
                primeArray[i] = false;
            } else {
                primeArray[i] = true;
            }
        }*/

        int sqrtLimit = (int)Math.Sqrt(limit);
        Console.WriteLine("Operation started...");
        Parallel.For(3, sqrtLimit, i => {
            lock (this) {
                if (primeArray[i]) {
                    for (int j = i * i; j < limit; j += i) {
                        primeArray[j] = false;
                    }

                }
            }
        }
        );
        Console.WriteLine("Operation finished...");
        /*for (int i = 3; i < sqrtLimit; i += 2) {
            if (primeArray[i]) {
                for (int j = i * i; j < limit; j += i) {
                    primeArray[j] = false;
                }
            }
        }*/

        //primeList.AddLast(2);
        int count = 1;
        Console.WriteLine("Counting started...");
        Parallel.For(3, limit, i => {
            lock (this) {
                if (primeArray[i]) {
                    //primeList.AddLast(i);
                    count++;
                }
            }
        }
        );
        Console.WriteLine("Counting finished...");
        Console.WriteLine(count);

        /*for (int i = 3; i < limit; i++) {
            if (primeArray[i]) {
                primeList.AddLast(i);
            }
        }*/

        return primeList;
    }

感谢您。

10
你有什么想法吗?你已经尝试过什么了吗?发布一堆代码并要求我们“修复”它很少会得到好的答案。我认为你会发现,你在问题上投入的工作量与人们在回答上投入的工作量成正比。 - Cody Gray
代码没有问题,所以我不需要任何“修复”,我只是想知道是否可能加快速度。从我的代码中可以清楚地看出我尝试了什么,不是吗? - Tapas Bose
5
你使用过分析工具吗?如果有,分析工具找到了哪些热点问题?你尝试过单线程运行吗?单线程版本是更快还是更慢?你尝试改变线程数量了吗?如果使用更多线程更快,那么缩放因子是多少?你有多少处理器?在此问题上是否有意义创建超过一个线程来工作?为什么?你尝试过任务并行库吗?使用它的结果如何? - Eric Lippert
我也尝试了单线程,执行时间为45秒。TPL花费了56秒。我的处理器是E7500 Core2Duo @ 2.93GHz。 - Tapas Bose
3个回答

20

编辑:

我的回答是:是的,您绝对可以使用任务并行库(TPL)更快地找到一亿个质数。问题中给出的代码之所以慢,是因为它没有有效地利用内存或多处理,并且最终输出也不高效。

因此,除了仅进行多处理之外,还有许多事情可以做来加速埃拉托斯特尼筛法,具体如下:

您筛选所有数字,包括偶数和奇数,这会使用更多的内存(对于10亿的范围需要10亿字节)并且速度较慢,因为进行了不必要的处理。只需利用2是唯一的偶素数这一事实,使数组仅表示奇素数即可减少一半的内存需求,并将合成数筛选操作的数量减少超过两倍,因此该操作在您的计算机上可能需要约20秒来生成10亿以内的素数。
筛选如此巨大的内存数组的合成数之一部分原因是它远远超过了CPU缓存大小,因此许多内存访问以某种随机方式到达主内存,这意味着筛选给定的合成数表示可以需要超过100个CPU时钟周期,而如果它们全部在L1缓存中,则只需要一个周期,在L2缓存中仅需要约4个周期; 并非所有访问都需要最坏情况的时间,但这肯定会减慢处理速度。使用位压缩数组来表示素数候选者将使内存使用减少8倍,并使最坏情况下的访问不太常见。虽然访问单个位的计算开销会有所增加,但您会发现平均内存访问时间的时间节省将大于此成本。实现这一简单方法是使用BitArray而不是bool数组。编写自己的位访问使用shift和“and”操作将比使用BitArray类更有效。您会发现使用BitArray会稍微节省一些时间,并且使用自己的位操作可以使单线程性能减少约10到12秒。
您找到的素数计数输出效率不高,因为每个候选素数都需要一个数组访问和一个if条件。一旦您将筛选缓冲区作为位打包的单词数组,就可以使用计数查找表(LUT)更有效地完成此操作,该表消除了if条件,并且每个位打包的单词只需要两个数组访问。进行这样做,计数时间成为工作中可以忽略不计的部分,与筛选合成数的时间相比,进一步节省可达到对于10亿以内的素数计数大约8秒钟。
进一步减少处理的素数候选者可以是应用轮子分解的结果,该分解从处理中删除2、3和5的素数因子,并通过调整位打包方法还可以将给定大小的位缓冲区的有效范围增加约两倍。这可以将合成数筛选操作的数量减少多达三倍以上,尽管代价是进一步的计算复杂性。
为了进一步减少内存使用,使内存访问更加高效,并为多处理准备道路,可以将工作分成不大于L1或L2缓存大小的页面。这要求保留一个基本素数表,该表包含所有最大素数候选的平方根,并重新计算在给定页面段上跨越的每个基本素数的起始地址参数,但这仍然比使用巨大的筛选数组更有效。实施此页面分段的附加好处是,您不必预先指定上限筛选界限,而是只需要根据需要扩展基本素数以处理更高的页面即可。到目前为止的所有优化,您可能可以在约2.5秒钟内生成10亿以内的素数计数。
最后,可以使用TPL或线程对页面段进行多处理,其中每个核心的缓冲区
我已经实现了一个多线程的埃拉托色尼筛法作为回复另一个线程的答案,以展示埃拉托色尼筛法没有比阿特金筛法更具优势。它使用任务并行库(TPL)中的任务和任务工厂,因此至少需要DotNet Framework 4. 我进一步调整了那段代码,使用了上面讨论过的所有优化方法,作为对同一问题的备选答案。我在这里重新发布了那段调整过的代码,并添加了注释和易于阅读的格式,如下所示:
  using System;
  using System.Collections;
  using System.Collections.Generic;
  using System.Linq;
  using System.Threading;
  using System.Threading.Tasks;

  class UltimatePrimesSoE : IEnumerable<ulong> {
    #region const and static readonly field's, private struct's and classes

    //one can get single threaded performance by setting NUMPRCSPCS = 1
    static readonly uint NUMPRCSPCS = (uint)Environment.ProcessorCount + 1;
    //the L1CACHEPOW can not be less than 14 and is usually the two raised to the power of the L1 or L2 cache
    const int L1CACHEPOW = 14, L1CACHESZ = (1 << L1CACHEPOW), MXPGSZ = L1CACHESZ / 2; //for buffer ushort[]
    const uint CHNKSZ = 17; //this times BWHLWRDS (below) times two should not be bigger than the L2 cache in bytes
    //the 2,3,57 factorial wheel increment pattern, (sum) 48 elements long, starting at prime 19 position
    static readonly byte[] WHLPTRN = { 2,3,1,3,2,1,2,3,3,1,3,2,1,3,2,3,4,2,1,2,1,2,4,3,
                                       2,3,1,2,3,1,3,3,2,1,2,3,1,3,2,1,2,1,5,1,5,1,2,1 }; const uint FSTCP = 11;
    static readonly byte[] WHLPOS; static readonly byte[] WHLNDX; //look up wheel position from index and vice versa
    static readonly byte[] WHLRNDUP; //to look up wheel rounded up index positon values, allow for overflow in size
    static readonly uint WCRC = WHLPTRN.Aggregate(0u, (acc, n) => acc + n); //small wheel circumference for odd numbers
    static readonly uint WHTS = (uint)WHLPTRN.Length; static readonly uint WPC = WHTS >> 4; //number of wheel candidates
    static readonly byte[] BWHLPRMS = { 2,3,5,7,11,13,17 }; const uint FSTBP = 19; //big wheel primes, following prime
    //the big wheel circumference expressed in number of 16 bit words as in a minimum bit buffer size
    static readonly uint BWHLWRDS = BWHLPRMS.Aggregate(1u, (acc, p) => acc * p) / 2 / WCRC * WHTS / 16;
    //page size and range as developed from the above
    static readonly uint PGSZ = MXPGSZ / BWHLWRDS * BWHLWRDS; static readonly uint PGRNG = PGSZ * 16 / WHTS * WCRC;
    //buffer size (multiple chunks) as produced from the above
    static readonly uint BFSZ = CHNKSZ * PGSZ, BFRNG = CHNKSZ * PGRNG; //number of uints even number of caches in chunk
    static readonly ushort[] MCPY; //a Master Copy page used to hold the lower base primes preculled version of the page
    struct Wst { public ushort msk; public byte mlt; public byte xtr; public ushort nxt; }
    static readonly byte[] PRLUT; /*Wheel Index Look Up Table */ static readonly Wst[] WSLUT; //Wheel State Look Up Table
    static readonly byte[] CLUT; // a Counting Look Up Table for very fast counting of primes

    class Bpa { //very efficient auto-resizing thread-safe read-only indexer class to hold the base primes array
      byte[] sa = new byte[0]; uint lwi = 0, lpd = 0; object lck = new object();
      public uint this[uint i] {
        get {
          if (i >= this.sa.Length) lock (this.lck) {
              var lngth = this.sa.Length; while (i >= lngth) {
                var bf = (ushort[])MCPY.Clone(); if (lngth == 0) {
                  for (uint bi = 0, wi = 0, w = 0, msk = 0x8000, v = 0; w < bf.Length;
                      bi += WHLPTRN[wi++], wi = (wi >= WHTS) ? 0 : wi) {
                    if (msk >= 0x8000) { msk = 1; v = bf[w++]; } else msk <<= 1;
                    if ((v & msk) == 0) {
                      var p = FSTBP + (bi + bi); var k = (p * p - FSTBP) >> 1;
                      if (k >= PGRNG) break; var pd = p / WCRC; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
                      for (uint wrd = kd * WPC + (uint)(kn >> 4), ndx = wi * WHTS + kn; wrd < bf.Length; ) {
                        var st = WSLUT[ndx]; bf[wrd] |= st.msk; wrd += st.mlt * pd + st.xtr; ndx = st.nxt;
                      }
                    }
                  }
                }
                else { this.lwi += PGRNG; cullbf(this.lwi, bf); }
                var c = count(PGRNG, bf); var na = new byte[lngth + c]; sa.CopyTo(na, 0);
                for (uint p = FSTBP + (this.lwi << 1), wi = 0, w = 0, msk = 0x8000, v = 0;
                    lngth < na.Length; p += (uint)(WHLPTRN[wi++] << 1), wi = (wi >= WHTS) ? 0 : wi) {
                  if (msk >= 0x8000) { msk = 1; v = bf[w++]; } else msk <<= 1; if ((v & msk) == 0) {
                    var pd = p / WCRC; na[lngth++] = (byte)(((pd - this.lpd) << 6) + wi); this.lpd = pd;
                  }
                } this.sa = na;
              }
            } return this.sa[i];
        }
      }
    }
    static readonly Bpa baseprms = new Bpa(); //the base primes array using the above class

    struct PrcsSpc { public Task tsk; public ushort[] buf; } //used for multi-threading buffer array processing

    #endregion

    #region private static methods

    static int count(uint bitlim, ushort[] buf) { //very fast counting method using the CLUT look up table
      if (bitlim < BFRNG) {
        var addr = (bitlim - 1) / WCRC; var bit = WHLNDX[bitlim - addr * WCRC] - 1; addr *= WPC;
        for (var i = 0; i < 3; ++i) buf[addr++] |= (ushort)((unchecked((ulong)-2) << bit) >> (i << 4));
      }
      var acc = 0; for (uint i = 0, w = 0; i < bitlim; i += WCRC)
        acc += CLUT[buf[w++]] + CLUT[buf[w++]] + CLUT[buf[w++]]; return acc;
    }

    static void cullbf(ulong lwi, ushort[] b) { //fast buffer segment culling method using a Wheel State Look Up Table
      ulong nlwi = lwi;
      for (var i = 0u; i < b.Length; nlwi += PGRNG, i += PGSZ) MCPY.CopyTo(b, i); //copy preculled lower base primes.
      for (uint i = 0, pd = 0; ; ++i) {
        pd += (uint)baseprms[i] >> 6;
        var wi = baseprms[i] & 0x3Fu; var wp = (uint)WHLPOS[wi]; var p = pd * WCRC + PRLUT[wi];
        var k = ((ulong)p * (ulong)p - FSTBP) >> 1;
        if (k >= nlwi) break; if (k < lwi) {
          k = (lwi - k) % (WCRC * p);
          if (k != 0) {
            var nwp = wp + (uint)((k + p - 1) / p); k = (WHLRNDUP[nwp] - wp) * p - k;
          }
        }
        else k -= lwi; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
        for (uint wrd = (uint)kd * WPC + (uint)(kn >> 4), ndx = wi * WHTS + kn; wrd < b.Length; ) {
          var st = WSLUT[ndx]; b[wrd] |= st.msk; wrd += st.mlt * pd + st.xtr; ndx = st.nxt;
        }
      }
    }

    static Task cullbftsk(ulong lwi, ushort[] b, Action<ushort[]> f) { // forms a task of the cull buffer operaion
      return Task.Factory.StartNew(() => { cullbf(lwi, b); f(b); });
    }

    //iterates the action over each page up to the page including the top_number,
    //making an adjustment to the top limit for the last page.
    //this method works for non-dependent actions that can be executed in any order.
    static void IterateTo(ulong top_number, Action<ulong, uint, ushort[]> actn) {
      PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS]; for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s] = new PrcsSpc {
        buf = new ushort[BFSZ],
        tsk = Task.Factory.StartNew(() => { })
      };
      var topndx = (top_number - FSTBP) >> 1; for (ulong ndx = 0; ndx <= topndx; ) {
        ps[0].tsk.Wait(); var buf = ps[0].buf; for (var s = 0u; s < NUMPRCSPCS - 1; ++s) ps[s] = ps[s + 1];
        var lowi = ndx; var nxtndx = ndx + BFRNG; var lim = topndx < nxtndx ? (uint)(topndx - ndx + 1) : BFRNG;
        ps[NUMPRCSPCS - 1] = new PrcsSpc { buf = buf, tsk = cullbftsk(ndx, buf, (b) => actn(lowi, lim, b)) };
        ndx = nxtndx;
      } for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s].tsk.Wait();
    }

    //iterates the predicate over each page up to the page where the predicate paramenter returns true,
    //this method works for dependent operations that need to be executed in increasing order.
    //it is somewhat slower than the above as the predicate function is executed outside the task.
    static void IterateUntil(Func<ulong, ushort[], bool> prdct) {
      PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS];
      for (var s = 0u; s < NUMPRCSPCS; ++s) {
        var buf = new ushort[BFSZ];
        ps[s] = new PrcsSpc { buf = buf, tsk = cullbftsk(s * BFRNG, buf, (bfr) => { }) };
      }
      for (var ndx = 0UL; ; ndx += BFRNG) {
        ps[0].tsk.Wait(); var buf = ps[0].buf; var lowi = ndx; if (prdct(lowi, buf)) break;
        for (var s = 0u; s < NUMPRCSPCS - 1; ++s) ps[s] = ps[s + 1];
        ps[NUMPRCSPCS - 1] = new PrcsSpc {
          buf = buf,
          tsk = cullbftsk(ndx + NUMPRCSPCS * BFRNG, buf, (bfr) => { })
        };
      }
    }

    #endregion

    #region initialization

    /// <summary>
    /// the static constructor is used to initialize the static readonly arrays.
    /// </summary>
    static UltimatePrimesSoE() {
      WHLPOS = new byte[WHLPTRN.Length + 1]; //to look up wheel position index from wheel index
      for (byte i = 0, acc = 0; i < WHLPTRN.Length; ++i) { acc += WHLPTRN[i]; WHLPOS[i + 1] = acc; }
      WHLNDX = new byte[WCRC + 1]; for (byte i = 1; i < WHLPOS.Length; ++i) {
        for (byte j = (byte)(WHLPOS[i - 1] + 1); j <= WHLPOS[i]; ++j) WHLNDX[j] = i;
      }
      WHLRNDUP = new byte[WCRC * 2]; for (byte i = 1; i < WHLRNDUP.Length; ++i) {
        if (i > WCRC) WHLRNDUP[i] = (byte)(WCRC + WHLPOS[WHLNDX[i - WCRC]]); else WHLRNDUP[i] = WHLPOS[WHLNDX[i]];
      }
      Func<ushort, int> nmbts = (v) => { var acc = 0; while (v != 0) { acc += (int)v & 1; v >>= 1; } return acc; };
      CLUT = new byte[1 << 16]; for (var i = 0; i < CLUT.Length; ++i) CLUT[i] = (byte)nmbts((ushort)(i ^ -1));
      PRLUT = new byte[WHTS]; for (var i = 0; i < PRLUT.Length; ++i) {
        var t = (uint)(WHLPOS[i] * 2) + FSTBP; if (t >= WCRC) t -= WCRC; if (t >= WCRC) t -= WCRC; PRLUT[i] = (byte)t;
      }
      WSLUT = new Wst[WHTS * WHTS]; for (var x = 0u; x < WHTS; ++x) {
        var p = FSTBP + 2u * WHLPOS[x]; var pr = p % WCRC;
        for (uint y = 0, pos = (p * p - FSTBP) / 2; y < WHTS; ++y) {
          var m = WHLPTRN[(x + y) % WHTS];
          pos %= WCRC; var posn = WHLNDX[pos]; pos += m * pr; var nposd = pos / WCRC; var nposn = WHLNDX[pos - nposd * WCRC];
          WSLUT[x * WHTS + posn] = new Wst {
            msk = (ushort)(1 << (int)(posn & 0xF)),
            mlt = (byte)(m * WPC),
            xtr = (byte)(WPC * nposd + (nposn >> 4) - (posn >> 4)),
            nxt = (ushort)(WHTS * x + nposn)
          };
        }
      }
      MCPY = new ushort[PGSZ]; foreach (var lp in BWHLPRMS.SkipWhile(p => p < FSTCP)) {
        var p = (uint)lp;
        var k = (p * p - FSTBP) >> 1; var pd = p / WCRC; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
        for (uint w = kd * WPC + (uint)(kn >> 4), ndx = WHLNDX[(2 * WCRC + p - FSTBP) / 2] * WHTS + kn; w < MCPY.Length; ) {
          var st = WSLUT[ndx]; MCPY[w] |= st.msk; w += st.mlt * pd + st.xtr; ndx = st.nxt;
        }
      }
    }

    #endregion

    #region public class

    // this class implements the enumeration (IEnumerator).
    //    it works by farming out tasks culling pages, which it then processes in order by
    //    enumerating the found primes as recognized by the remaining non-composite bits
    //    in the cull page buffers.
    class nmrtr : IEnumerator<ulong>, IEnumerator, IDisposable {
      PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS]; ushort[] buf;
      public nmrtr() {
        for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s] = new PrcsSpc { buf = new ushort[BFSZ] };
        for (var s = 1u; s < NUMPRCSPCS; ++s) {
          ps[s].tsk = cullbftsk((s - 1u) * BFRNG, ps[s].buf, (bfr) => { });
        } buf = ps[0].buf;
      }
      ulong _curr, i = (ulong)-WHLPTRN[WHTS - 1]; int b = -BWHLPRMS.Length - 1; uint wi = WHTS - 1; ushort v, msk = 0;
      public ulong Current { get { return this._curr; } } object IEnumerator.Current { get { return this._curr; } }
      public bool MoveNext() {
        if (b < 0) {
          if (b == -1) b += buf.Length; //no yield!!! so automatically comes around again
          else { this._curr = (ulong)BWHLPRMS[BWHLPRMS.Length + (++b)]; return true; }
        }
        do {
          i += WHLPTRN[wi++]; if (wi >= WHTS) wi = 0; if ((this.msk <<= 1) == 0) {
            if (++b >= BFSZ) {
              b = 0; for (var prc = 0; prc < NUMPRCSPCS - 1; ++prc) ps[prc] = ps[prc + 1];
              ps[NUMPRCSPCS - 1u].buf = buf;
              ps[NUMPRCSPCS - 1u].tsk = cullbftsk(i + (NUMPRCSPCS - 1u) * BFRNG, buf, (bfr) => { });
              ps[0].tsk.Wait(); buf = ps[0].buf;
            } v = buf[b]; this.msk = 1;
          }
        }
        while ((v & msk) != 0u); _curr = FSTBP + i + i; return true;
      }
      public void Reset() { throw new Exception("Primes enumeration reset not implemented!!!"); }
      public void Dispose() { }
    }

    #endregion

    #region public instance method and associated sub private method

    /// <summary>
    /// Gets the enumerator for the primes.
    /// </summary>
    /// <returns>The enumerator of the primes.</returns>
    public IEnumerator<ulong> GetEnumerator() { return new nmrtr(); }

    /// <summary>
    /// Gets the enumerator for the primes.
    /// </summary>
    /// <returns>The enumerator of the primes.</returns>
    IEnumerator IEnumerable.GetEnumerator() { return new nmrtr(); }

    #endregion

    #region public static methods

    /// <summary>
    /// Gets the count of primes up the number, inclusively.
    /// </summary>
    /// <param name="top_number">The ulong top number to check for prime.</param>
    /// <returns>The long number of primes found.</returns>
    public static long CountTo(ulong top_number) {
      if (top_number < FSTBP) return BWHLPRMS.TakeWhile(p => p <= top_number).Count();
      var cnt = (long)BWHLPRMS.Length;
      IterateTo(top_number, (lowi, lim, b) => { Interlocked.Add(ref cnt, count(lim, b)); }); return cnt;
    }

    /// <summary>
    /// Gets the sum of the primes up the number, inclusively.
    /// </summary>
    /// <param name="top_number">The uint top number to check for prime.</param>
    /// <returns>The ulong sum of all the primes found.</returns>
    public static ulong SumTo(uint top_number) {
      if (top_number < FSTBP) return (ulong)BWHLPRMS.TakeWhile(p => p <= top_number).Aggregate(0u, (acc, p) => acc += p);
      var sum = (long)BWHLPRMS.Aggregate(0u, (acc, p) => acc += p);
      Func<ulong, uint, ushort[], long> sumbf = (lowi, bitlim, buf) => {
        var acc = 0L; for (uint i = 0, wi = 0, msk = 0x8000, w = 0, v = 0; i < bitlim;
            i += WHLPTRN[wi++], wi = wi >= WHTS ? 0 : wi) {
          if (msk >= 0x8000) { msk = 1; v = buf[w++]; } else msk <<= 1;
          if ((v & msk) == 0) acc += (long)(FSTBP + ((lowi + i) << 1));
        } return acc;
      };
      IterateTo(top_number, (pos, lim, b) => { Interlocked.Add(ref sum, sumbf(pos, lim, b)); }); return (ulong)sum;
    }

    /// <summary>
    /// Gets the prime number at the zero based index number given.
    /// </summary>
    /// <param name="index">The long zero-based index number for the prime.</param>
    /// <returns>The ulong prime found at the given index.</returns>
    public static ulong ElementAt(long index) {
      if (index < BWHLPRMS.Length) return (ulong)BWHLPRMS.ElementAt((int)index);
      long cnt = BWHLPRMS.Length; var ndx = 0UL; var cycl = 0u; var bit = 0u; IterateUntil((lwi, bfr) => {
        var c = count(BFRNG, bfr); if ((cnt += c) < index) return false; ndx = lwi; cnt -= c; c = 0;
        do { var w = cycl++ * WPC; c = CLUT[bfr[w++]] + CLUT[bfr[w++]] + CLUT[bfr[w]]; cnt += c; } while (cnt < index);
        cnt -= c; var y = (--cycl) * WPC; ulong v = ((ulong)bfr[y + 2] << 32) + ((ulong)bfr[y + 1] << 16) + bfr[y];
        do { if ((v & (1UL << ((int)bit++))) == 0) ++cnt; } while (cnt <= index); --bit; return true;
      }); return FSTBP + ((ndx + cycl * WCRC + WHLPOS[bit]) << 1);
    }

    #endregion
  }

上述代码将在四核(包括HT,共八个线程)i7-2700K(3.5 GHz)上枚举出十亿以内的质数,大约需要1.55秒,而您的E7500可能会慢上四倍,因为它的线程更少,时钟速度稍低。其中约四分之三的时间仅用于运行MoveNext()方法和Current属性,因此我提供了公共静态方法"CountTo"、"SumTo"和"ElementAt"来计算范围内的质数数量或总和以及第n个从零开始的质数,而不使用枚举。在我的计算机上,使用UltimatePrimesSoE.CountTo(1000000000)静态方法大约需要0.32秒即可产生50847534,因此在Intel E7500上不应该需要超过1.28秒。
编辑添加:有趣的是,这段代码在x86 32位模式下比在x64 64位模式下运行得快30%,这可能是因为避免了将uint32数字扩展为ulong的轻微额外开销。以上所有定时均为64位模式。
这个实现有将近300行(密集)代码,它并不简单,但这是为了做出所有描述的优化以使代码更高效的代价。它的代码行数比Aaron Murgatroyd的另一个答案略多,虽然他的代码行数较少,但他的代码速度也慢了约4倍。事实上,几乎所有执行时间都花费在我代码的私有静态"cullbf"方法的最后一个“for循环”中,该循环仅有4个语句加上范围条件检查;所有其余代码都只是支持重复应用该循环。
这段代码比其他答案更快的原因,与你的代码相比,除了他只处理奇素数候选项的步骤(1)优化外,还有相同的原因。他使用多进程几乎没有效果,仅有30%的优势,而不是应该在真正的四核CPU上正确应用时可能达到的四倍因子,因为它是针对每个素数而不是所有小页面上的所有素数进行线程处理,并且他使用不安全的指针数组访问作为一种消除DotNet计算成本的方法,即每个循环的数组边界检查实际上会使代码变慢,而不是直接使用数组,包括边界检查,因为DotNet JIT编译器对指针访问产生的代码效率非常低。此外,他的代码枚举素数的方式与我的代码相同,这种枚举每枚举一个素数就需要10个CPU时钟周期的成本,在他的情况下稍微差一些,因为他使用内置的C#迭代器,这些迭代器比我的“自己编写”的IEnumerator接口效率略低。然而,为了获得最大的速度,我们应该完全避免枚举;然而,即使他提供的“Count”实例方法也使用“foreach”循环,这意味着枚举。
总之,这个答案代码在您的E7500 CPU上比您的问题代码快大约25倍(在具有更多核心/线程的CPU上快得多),使用的内存要少得多,并且不仅限于约32位数范围内的较小质数范围,但代价是增加了代码复杂性。

一篇非常好的写作。谢谢!我尝试编译代码(我发现我的系统上有csc 4.0.30319...),似乎缺少一些通用类型细节和使用语句,所以需要一些调整,对我来说没有起作用,但是没关系,这很可能是一个本地问题,因为这是我第一次尝试C#。 - mgaert
@mgaert,我已经添加了缺失的using。顺便说一句,我发现了一种方法,可以使它运行速度大约快两到四倍,因此只比primesieve慢1.5到2倍,并且比SoA primegen更快,但我还没有抽出时间来写它。新方法基于伯恩斯坦SoE参考实现,用于与SoA进行比较,但与他的“eratspeed”实现中应用的车轮分解量的限制相比未被削弱。它通过模集而不是按模数顺序筛选合数,从而减少了内部筛选循环的复杂性。 - GordonBGood
@RagingCain,它已经被分解成相当小的方法并带有注释;但是,我可以添加一些关于整个算法如何工作的注释。最难理解的部分可能是它使用位打包轮数组来存储标记的合数表示,然后“cullbf”使用Wheel State LUT数组WSLUT通过位打包缓冲区推进每个素数的筛选,使用预生成的查找表(LUT)WHLPOS、PRLUT和WHLRNDUP来计算要筛选的缓冲区中的第一个位。缓冲区用MCPY的右部分初始化;预先筛除11/13/17。 - GordonBGood
@RagingCain,根据之前的评论,通过将筛选缓冲区分成48个位打包缓冲区,每个缓冲区仅包含一个轮模数,可以将此时间缩短2到3倍(再次取决于CPU)。这既更简单,也可能更复杂:内部循环除了处理位打包外非常简单,但支持该内部循环的代码在通过质数推进模数缓冲区时避免完全重新计算每个模数的起始点(一项计算代价高昂的操作)而使用预先计算的LUT时有些复杂。 - GordonBGood
@RagingCain,你能否发布你的网站,我可能可以直接通过那个网站联系你,或许提交一些文章来解释可以从这里链接的不同算法?此外,如果我看到你当前的代码,有助于我了解你目前的专业知识,以便能够帮助你扩展知识。 - GordonBGood
显示剩余2条评论

4

我使用多线程实现 (.NET 4.0 及以上版本必须):

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace PrimeGenerator
{
    // The block element type for the bit array, 
    // use any unsigned value. WARNING: UInt64 is 
    // slower even on x64 architectures.
    using BitArrayType = System.UInt32;

    // This should never be any bigger than 256 bits - leave as is.
    using BitsPerBlockType = System.Byte;

    // The prime data type, this can be any unsigned value, the limit
    // of this type determines the limit of Prime value that can be
    // found. WARNING: UInt64 is slower even on x64 architectures.
    using PrimeType = System.UInt32;

    /// <summary>
    /// Calculates prime number using the Sieve of Eratosthenes method.
    /// </summary>
    /// <example>
    /// <code>
    ///     var lpPrimes = new Eratosthenes(1e7);
    ///     foreach (UInt32 luiPrime in lpPrimes)
    ///         Console.WriteLine(luiPrime);
    /// </example>
    public class Eratosthenes : IEnumerable<PrimeType>
    {
        #region Constants

        /// <summary>
        /// Constant for number of bits per block, calculated based on size of BitArrayType.
        /// </summary>
        const BitsPerBlockType cbBitsPerBlock = sizeof(BitArrayType) * 8;

        #endregion

        #region Protected Locals

        /// <summary>
        /// The limit for the maximum prime value to find.
        /// </summary>
        protected readonly PrimeType mpLimit;

        /// <summary>
        /// True if the class is multi-threaded
        /// </summary>
        protected readonly bool mbThreaded;

        /// <summary>
        /// The current bit array where a set bit means
        /// the odd value at that location has been determined
        /// to not be prime.
        /// </summary>
        protected BitArrayType[] mbaOddNotPrime;

        #endregion

        #region Initialisation

        /// <summary>
        /// Create Sieve of Eratosthenes generator.
        /// </summary>
        /// <param name="limit">The limit for the maximum prime value to find.</param>
        /// <param name="threaded">True if threaded, false otherwise.</param>
        public Eratosthenes(PrimeType limit, bool threaded)
        {
            // Check limit range
            if (limit > PrimeType.MaxValue - (PrimeType)Math.Sqrt(PrimeType.MaxValue))
                throw new ArgumentOutOfRangeException();

            mbThreaded = threaded;
            mpLimit = limit;

            FindPrimes();
        }

        /// <summary>
        /// Create Sieve of Eratosthenes generator in multi-threaded mode.
        /// </summary>
        /// <param name="limit">The limit for the maximum prime value to find.</param>
        public Eratosthenes(PrimeType limit)
            : this(limit, true)
        {
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Calculates compartment indexes for a multi-threaded operation.
        /// </summary>
        /// <param name="startInclusive">The inclusive starting index.</param>
        /// <param name="endExclusive">The exclusive ending index.</param>
        /// <param name="threads">The number of threads.</param>
        /// <returns>An array of thread elements plus 1 containing the starting and exclusive ending indexes to process for each thread.</returns>
        private PrimeType[] CalculateCompartments(PrimeType startInclusive, PrimeType endExclusive, ref int threads)
        {
            if (threads == 0) threads = 1;
            if (threads == -1) threads = Environment.ProcessorCount;
            if (threads > endExclusive - startInclusive) threads = (int)(endExclusive - startInclusive);

            PrimeType[] liThreadIndexes = new PrimeType[threads + 1];
            liThreadIndexes[threads] = endExclusive;
            PrimeType liIndexesPerThread = (endExclusive - startInclusive) / (PrimeType)threads;
            for (PrimeType liCount = 0; liCount < threads; liCount++)
                liThreadIndexes[liCount] = liCount * liIndexesPerThread;

            return liThreadIndexes;
        }

        /// <summary>
        /// Executes a simple for loop in parallel but only creates
        /// a set amount of threads breaking the work up evenly per thread,
        /// calling the body only once per thread, this is different
        /// to the .NET 4.0 For method which calls the body for each index.
        /// </summary>
        /// <typeparam name="ParamType">The type of parameter to pass to the body.</typeparam>
        /// <param name="startInclusive">The starting index.</param>
        /// <param name="endExclusive">The exclusive ending index.</param>
        /// <param name="parameter">The parameter to pass to the body.</param>
        /// <param name="body">The body to execute per thread.</param>
        /// <param name="threads">The number of threads to execute.</param>
        private void For<ParamType>(
            PrimeType startInclusive, PrimeType endExclusive, ParamType parameter,
            Action<PrimeType, PrimeType, ParamType> body,
            int threads)
        {
            PrimeType[] liThreadIndexes = CalculateCompartments(startInclusive, endExclusive, ref threads);

            if (threads > 1)
                Parallel.For(
                    0, threads, new System.Threading.Tasks.ParallelOptions(),
                    (liThread) => { body(liThreadIndexes[liThread], liThreadIndexes[liThread + 1], parameter); }
                );
            else
                body(startInclusive, endExclusive, parameter);
        }

        /// <summary>
        /// Finds the prime number within range.
        /// </summary>
        private unsafe void FindPrimes()
        {
            // Allocate bit array.
            mbaOddNotPrime = new BitArrayType[(((mpLimit >> 1) + 1) / cbBitsPerBlock) + 1];

            // Cache Sqrt of limit.
            PrimeType lpSQRT = (PrimeType)Math.Sqrt(mpLimit);

            int liThreads = Environment.ProcessorCount;
            if (!Threaded) liThreads = 0;

            // Fix the bit array for pointer access
            fixed (BitArrayType* lpbOddNotPrime = &mbaOddNotPrime[0])
            {
                IntPtr lipBits = (IntPtr)lpbOddNotPrime;

                // Scan primes up to lpSQRT
                for (PrimeType lpN = 3; lpN <= lpSQRT; lpN += 2)
                {
                    // If the current bit value for index lpN is cleared (prime)
                    if (
                            (
                                lpbOddNotPrime[(lpN >> 1) / cbBitsPerBlock] &
                                ((BitArrayType)1 << (BitsPerBlockType)((lpN >> 1) % cbBitsPerBlock))
                            ) == 0
                        )
                    {
                        // If multi-threaded
                        if (liThreads > 1)
                        {
                            // Leave it cleared (prime) and mark all multiples of lpN*2 from lpN*lpN as not prime
                            For<PrimeType>(
                                0, ((mpLimit - (lpN * lpN)) / (lpN << 1)) + 1, lpN,
                                (start, end, n) =>
                                {
                                    BitArrayType* lpbBits = (BitArrayType*)lipBits;
                                    PrimeType lpM = n * n + (start * (n << 1));
                                    for (PrimeType lpCount = start; lpCount < end; lpCount++)
                                    {
                                        // Set as not prime
                                        lpbBits[(lpM >> 1) / cbBitsPerBlock] |=
                                            (BitArrayType)((BitArrayType)1 << (BitsPerBlockType)((lpM >> 1) % cbBitsPerBlock));

                                        lpM += n << 1;
                                    }
                                },
                                liThreads);
                        }
                        else
                        {
                            // Leave it cleared (prime) and mark all multiples of lpN*2 from lpN*lpN as not prime
                            for (PrimeType lpM = lpN * lpN; lpM <= mpLimit; lpM += lpN<<1)
                                // Set as not prime
                                lpbOddNotPrime[(lpM >> 1) / cbBitsPerBlock] |=
                                    (BitArrayType)((BitArrayType)1 << (BitsPerBlockType)((lpM >> 1) % cbBitsPerBlock));
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Gets a bit value by index.
        /// </summary>
        /// <param name="bits">The blocks containing the bits.</param>
        /// <param name="index">The index of the bit.</param>
        /// <returns>True if bit is set, false if cleared.</returns>
        private bool GetBitSafe(BitArrayType[] bits, PrimeType index)
        {
            return (bits[index / cbBitsPerBlock] & ((BitArrayType)1 << (BitsPerBlockType)(index % cbBitsPerBlock))) != 0;
        }

        #endregion

        #region Public Properties

        /// <summary>
        /// Gets whether this class is multi-threaded or not.
        /// </summary>
        public bool Threaded
        {
            get
            {
                return mbThreaded;
            }
        }

        /// <summary>
        /// Get the limit for the maximum prime value to find.
        /// </summary>
        public PrimeType Limit
        {
            get
            {
                return mpLimit;
            }
        }

        /// <summary>
        /// Returns the number of primes found in the range.
        /// </summary>
        public PrimeType Count
        {
            get
            {
                PrimeType lptCount = 0;
                foreach (PrimeType liPrime in this)
                    lptCount++;
                return lptCount;
            }
        }

        /// <summary>
        /// Determines if a value in range is prime or not.
        /// </summary>
        /// <param name="test">The value to test for primality.</param>
        /// <returns>True if the value is prime, false otherwise.</returns>
        public bool this[PrimeType test]
        {
            get
            {
                if (test > mpLimit) throw new ArgumentOutOfRangeException();
                if (test <= 1) return false;
                if (test == 2) return true;
                if ((test & 1) == 0) return false;
                return !GetBitSafe(mbaOddNotPrime, test >> 1);
            }
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns>The enumerator of the primes.</returns>
        public IEnumerator<PrimeType> GetEnumerator()
        {
            // Two always prime.
            yield return 2;

            // Start at first block, second MSB.
            int liBlock = 0;
            byte lbBit = 1;
            BitArrayType lbaCurrent = mbaOddNotPrime[0] >> 1;

            // For each value in range stepping in incrments of two for odd values.
            for (PrimeType lpN = 3; lpN <= mpLimit; lpN += 2)
            {
                // If current bit not set then value is prime.
                if ((lbaCurrent & 1) == 0)
                    yield return lpN;

                // Move to NSB.
                lbaCurrent >>= 1;

                // Increment bit value.
                lbBit++;

                // If block is finished.
                if (lbBit == cbBitsPerBlock) 
                {
                    // Move to first bit of next block.
                    lbBit = 0;
                    liBlock++;
                    lbaCurrent = mbaOddNotPrime[liBlock];
                }
            }
        }

        #endregion

        #region IEnumerable<PrimeType> Implementation

        /// <summary>
        /// Gets the enumerator for the primes.
        /// </summary>
        /// <returns></returns>
        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        #endregion
    }
}

多线程通过对最内层循环进行线程化来工作,这样就不会出现数据锁定问题,因为多个线程使用数组的子集,并且在完成每个任务时不重叠。

看起来非常快,可以在 AMD Phenom II X4 965 处理器上生成所有小于 1,000,000,000 的质数,用时仅为 5.8 秒。像阿特金斯算法等特殊实现更快,但这对于埃拉托色尼筛法来说已经很快了。


很好,但你错过了两个优化:1)在多线程中,你的内部循环数组范围不断增加,直到它显著超过L1高速缓存大小,从而增加了内存访问时间的低效性;2)你只消除偶数修剪,而完整轮筛法可以消除更多的修剪,代价是增加了复杂度。使用这两个进一步的优化,我在比你机器差不多的机器上,在1.975秒内找到了10亿以内的质数,并在8.738秒内找到了4294967295以内的质数:https://dev59.com/TknSa4cB1Zd3GeqPRM54#18139477。 - GordonBGood
上述链接仅使用2、3、5的轮式分解,但稍微增加复杂度可以使用2、3、5、7、11、13的轮式分解来获得进一步的收益;然而,这没有太大意义,因为大约三分之二的时间都花在枚举前台线程中找到的素数上,尽管它可能通过增加复合数数组的包装来减少运行时间约百分之几。 上述代码也可以从您所使用的不安全代码中受益,目前并未使用。 - GordonBGood
还不错,但有两点评论:1)在我的i7(八个线程)CPU上,使用多线程比不使用多线程只提高了约30%的速度,因此在所有范围内使用线程会有额外的开销,这相当低效,正如我在第一条评论中提到的那样。2)与直接使用数组相比,您使用不安全指针实际上会稍微减慢程序的速度;这是因为DotNet JIT编译器虽然消除了数组边界检查,但并没有很好地优化指针。 - GordonBGood
@cont'd: 此外,您的内部循环有些过于复杂,可以简化为"mbaOddNotPrime[(lpM) / cbBitsPerBlock] |= (BitArrayType)1 << (int)lpM;" ,同时适当调整lpM的初始条件、限制(在使用时)和lpM增量,均除以2 (>> 1)。这是因为已经有一个左移值的自动掩码(% cbBitsPerBlock),所以另一个是多余的,并消除了每个循环的持续多次">> 1"操作。然而,当数组大小超过CPU缓存大小时,仍然会因巨大的内存访问开销而变慢。 - GordonBGood

1
前段时间,我尝试实现Atkin筛法的并行计算。但是失败了。我没有进行更深入的研究,但似乎Eratosthenes筛和Atkin筛都很难在多个CPU上扩展,因为我看到的实现使用了共享的整数列表。共享状态是一个沉重的负担,当你尝试在多个CPU上扩展时。

@TapasBose,你很快就接受了一个回答,基本上说这个问题的代码已经无法更好地解决了?问题代码和Jonas的工作都存在一个问题,那就是它们考虑将细粒度并行应用于问题而不是较粗粒度。解决这个问题的正确方法是,在将工作分成页面段之后,为每个段落分配线程/任务来整理每个段落页面的工作,仅共享用于整理和最终结果聚合的基本素数源;通过这种方式,几乎不需要同步。 - GordonBGood
@GordonBGood,是的先生,我确实已经接受了答案,而且很久以前就接受了;也许您对接受的原因是正确的。我已经很长时间没有接触C#了,由于职业原因,我现在正在使用Java工作。最近,我使用Lamdba和并行计算在Java8中编写了一段SoE代码。我看过您在其他SO线程中的工作,那些都非常棒。 - Tapas Bose
我应该将这个作为评论而不是答案发布。对不起。 - Jonas Elfström
@TapasBose,虽然这个问题是关于 C# 的,但是这个 算法 适用于包括 Java 在内的任何语言,尤其是现在 Java8 支持任务使得多线程更容易。我已经将此 C# 代码翻译成了 Java(和 Scala),那些语言中的代码运行速度与 C# 中的速度大致相同。 - GordonBGood

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接