编辑:
我的回答是:是的,您绝对可以使用任务并行库(TPL)更快地找到一亿个质数。问题中给出的代码之所以慢,是因为它没有有效地利用内存或多处理,并且最终输出也不高效。
因此,除了仅进行多处理之外,还有许多事情可以做来加速埃拉托斯特尼筛法,具体如下:
您筛选所有数字,包括偶数和奇数,这会使用更多的内存(对于10亿的范围需要10亿字节)并且速度较慢,因为进行了不必要的处理。只需利用2是唯一的偶素数这一事实,使数组仅表示奇素数即可减少一半的内存需求,并将合成数筛选操作的数量减少超过两倍,因此该操作在您的计算机上可能需要约20秒来生成10亿以内的素数。
筛选如此巨大的内存数组的合成数之一部分原因是它远远超过了CPU缓存大小,因此许多内存访问以某种随机方式到达主内存,这意味着筛选给定的合成数表示可以需要超过100个CPU时钟周期,而如果它们全部在L1缓存中,则只需要一个周期,在L2缓存中仅需要约4个周期; 并非所有访问都需要最坏情况的时间,但这肯定会减慢处理速度。使用位压缩数组来表示素数候选者将使内存使用减少8倍,并使最坏情况下的访问不太常见。虽然访问单个位的计算开销会有所增加,但您会发现平均内存访问时间的时间节省将大于此成本。实现这一简单方法是使用BitArray而不是bool数组。编写自己的位访问使用shift和“and”操作将比使用BitArray类更有效。您会发现使用BitArray会稍微节省一些时间,并且使用自己的位操作可以使单线程性能减少约10到12秒。
您找到的素数计数输出效率不高,因为每个候选素数都需要一个数组访问和一个if条件。一旦您将筛选缓冲区作为位打包的单词数组,就可以使用计数查找表(LUT)更有效地完成此操作,该表消除了if条件,并且每个位打包的单词只需要两个数组访问。进行这样做,计数时间成为工作中可以忽略不计的部分,与筛选合成数的时间相比,进一步节省可达到对于10亿以内的素数计数大约8秒钟。
进一步减少处理的素数候选者可以是应用轮子分解的结果,该分解从处理中删除2、3和5的素数因子,并通过调整位打包方法还可以将给定大小的位缓冲区的有效范围增加约两倍。这可以将合成数筛选操作的数量减少多达三倍以上,尽管代价是进一步的计算复杂性。
为了进一步减少内存使用,使内存访问更加高效,并为多处理准备道路,可以将工作分成不大于L1或L2缓存大小的页面。这要求保留一个基本素数表,该表包含所有最大素数候选的平方根,并重新计算在给定页面段上跨越的每个基本素数的起始地址参数,但这仍然比使用巨大的筛选数组更有效。实施此页面分段的附加好处是,您不必预先指定上限筛选界限,而是只需要根据需要扩展基本素数以处理更高的页面即可。到目前为止的所有优化,您可能可以在约2.5秒钟内生成10亿以内的素数计数。
最后,可以使用TPL或线程对页面段进行多处理,其中每个核心的缓冲区
我已经实现了一个多线程的埃拉托色尼筛法作为
回复另一个线程的答案,以展示埃拉托色尼筛法没有比阿特金筛法更具优势。它使用任务并行库(TPL)中的任务和任务工厂,因此至少需要DotNet Framework 4. 我进一步调整了那段代码,使用了上面讨论过的所有优化方法,作为
对同一问题的备选答案。我在这里重新发布了那段调整过的代码,并添加了注释和易于阅读的格式,如下所示:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class UltimatePrimesSoE : IEnumerable<ulong> {
#region const and static readonly field's, private struct's and classes
static readonly uint NUMPRCSPCS = (uint)Environment.ProcessorCount + 1;
const int L1CACHEPOW = 14, L1CACHESZ = (1 << L1CACHEPOW), MXPGSZ = L1CACHESZ / 2;
const uint CHNKSZ = 17;
static readonly byte[] WHLPTRN = { 2,3,1,3,2,1,2,3,3,1,3,2,1,3,2,3,4,2,1,2,1,2,4,3,
2,3,1,2,3,1,3,3,2,1,2,3,1,3,2,1,2,1,5,1,5,1,2,1 }; const uint FSTCP = 11;
static readonly byte[] WHLPOS; static readonly byte[] WHLNDX;
static readonly byte[] WHLRNDUP;
static readonly uint WCRC = WHLPTRN.Aggregate(0u, (acc, n) => acc + n);
static readonly uint WHTS = (uint)WHLPTRN.Length; static readonly uint WPC = WHTS >> 4;
static readonly byte[] BWHLPRMS = { 2,3,5,7,11,13,17 }; const uint FSTBP = 19;
static readonly uint BWHLWRDS = BWHLPRMS.Aggregate(1u, (acc, p) => acc * p) / 2 / WCRC * WHTS / 16;
static readonly uint PGSZ = MXPGSZ / BWHLWRDS * BWHLWRDS; static readonly uint PGRNG = PGSZ * 16 / WHTS * WCRC;
static readonly uint BFSZ = CHNKSZ * PGSZ, BFRNG = CHNKSZ * PGRNG;
static readonly ushort[] MCPY;
struct Wst { public ushort msk; public byte mlt; public byte xtr; public ushort nxt; }
static readonly byte[] PRLUT; static readonly Wst[] WSLUT;
static readonly byte[] CLUT;
class Bpa {
byte[] sa = new byte[0]; uint lwi = 0, lpd = 0; object lck = new object();
public uint this[uint i] {
get {
if (i >= this.sa.Length) lock (this.lck) {
var lngth = this.sa.Length; while (i >= lngth) {
var bf = (ushort[])MCPY.Clone(); if (lngth == 0) {
for (uint bi = 0, wi = 0, w = 0, msk = 0x8000, v = 0; w < bf.Length;
bi += WHLPTRN[wi++], wi = (wi >= WHTS) ? 0 : wi) {
if (msk >= 0x8000) { msk = 1; v = bf[w++]; } else msk <<= 1;
if ((v & msk) == 0) {
var p = FSTBP + (bi + bi); var k = (p * p - FSTBP) >> 1;
if (k >= PGRNG) break; var pd = p / WCRC; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
for (uint wrd = kd * WPC + (uint)(kn >> 4), ndx = wi * WHTS + kn; wrd < bf.Length; ) {
var st = WSLUT[ndx]; bf[wrd] |= st.msk; wrd += st.mlt * pd + st.xtr; ndx = st.nxt;
}
}
}
}
else { this.lwi += PGRNG; cullbf(this.lwi, bf); }
var c = count(PGRNG, bf); var na = new byte[lngth + c]; sa.CopyTo(na, 0);
for (uint p = FSTBP + (this.lwi << 1), wi = 0, w = 0, msk = 0x8000, v = 0;
lngth < na.Length; p += (uint)(WHLPTRN[wi++] << 1), wi = (wi >= WHTS) ? 0 : wi) {
if (msk >= 0x8000) { msk = 1; v = bf[w++]; } else msk <<= 1; if ((v & msk) == 0) {
var pd = p / WCRC; na[lngth++] = (byte)(((pd - this.lpd) << 6) + wi); this.lpd = pd;
}
} this.sa = na;
}
} return this.sa[i];
}
}
}
static readonly Bpa baseprms = new Bpa();
struct PrcsSpc { public Task tsk; public ushort[] buf; }
#endregion
#region private static methods
static int count(uint bitlim, ushort[] buf) {
if (bitlim < BFRNG) {
var addr = (bitlim - 1) / WCRC; var bit = WHLNDX[bitlim - addr * WCRC] - 1; addr *= WPC;
for (var i = 0; i < 3; ++i) buf[addr++] |= (ushort)((unchecked((ulong)-2) << bit) >> (i << 4));
}
var acc = 0; for (uint i = 0, w = 0; i < bitlim; i += WCRC)
acc += CLUT[buf[w++]] + CLUT[buf[w++]] + CLUT[buf[w++]]; return acc;
}
static void cullbf(ulong lwi, ushort[] b) {
ulong nlwi = lwi;
for (var i = 0u; i < b.Length; nlwi += PGRNG, i += PGSZ) MCPY.CopyTo(b, i);
for (uint i = 0, pd = 0; ; ++i) {
pd += (uint)baseprms[i] >> 6;
var wi = baseprms[i] & 0x3Fu; var wp = (uint)WHLPOS[wi]; var p = pd * WCRC + PRLUT[wi];
var k = ((ulong)p * (ulong)p - FSTBP) >> 1;
if (k >= nlwi) break; if (k < lwi) {
k = (lwi - k) % (WCRC * p);
if (k != 0) {
var nwp = wp + (uint)((k + p - 1) / p); k = (WHLRNDUP[nwp] - wp) * p - k;
}
}
else k -= lwi; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
for (uint wrd = (uint)kd * WPC + (uint)(kn >> 4), ndx = wi * WHTS + kn; wrd < b.Length; ) {
var st = WSLUT[ndx]; b[wrd] |= st.msk; wrd += st.mlt * pd + st.xtr; ndx = st.nxt;
}
}
}
static Task cullbftsk(ulong lwi, ushort[] b, Action<ushort[]> f) {
return Task.Factory.StartNew(() => { cullbf(lwi, b); f(b); });
}
static void IterateTo(ulong top_number, Action<ulong, uint, ushort[]> actn) {
PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS]; for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s] = new PrcsSpc {
buf = new ushort[BFSZ],
tsk = Task.Factory.StartNew(() => { })
};
var topndx = (top_number - FSTBP) >> 1; for (ulong ndx = 0; ndx <= topndx; ) {
ps[0].tsk.Wait(); var buf = ps[0].buf; for (var s = 0u; s < NUMPRCSPCS - 1; ++s) ps[s] = ps[s + 1];
var lowi = ndx; var nxtndx = ndx + BFRNG; var lim = topndx < nxtndx ? (uint)(topndx - ndx + 1) : BFRNG;
ps[NUMPRCSPCS - 1] = new PrcsSpc { buf = buf, tsk = cullbftsk(ndx, buf, (b) => actn(lowi, lim, b)) };
ndx = nxtndx;
} for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s].tsk.Wait();
}
static void IterateUntil(Func<ulong, ushort[], bool> prdct) {
PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS];
for (var s = 0u; s < NUMPRCSPCS; ++s) {
var buf = new ushort[BFSZ];
ps[s] = new PrcsSpc { buf = buf, tsk = cullbftsk(s * BFRNG, buf, (bfr) => { }) };
}
for (var ndx = 0UL; ; ndx += BFRNG) {
ps[0].tsk.Wait(); var buf = ps[0].buf; var lowi = ndx; if (prdct(lowi, buf)) break;
for (var s = 0u; s < NUMPRCSPCS - 1; ++s) ps[s] = ps[s + 1];
ps[NUMPRCSPCS - 1] = new PrcsSpc {
buf = buf,
tsk = cullbftsk(ndx + NUMPRCSPCS * BFRNG, buf, (bfr) => { })
};
}
}
#endregion
#region initialization
static UltimatePrimesSoE() {
WHLPOS = new byte[WHLPTRN.Length + 1];
for (byte i = 0, acc = 0; i < WHLPTRN.Length; ++i) { acc += WHLPTRN[i]; WHLPOS[i + 1] = acc; }
WHLNDX = new byte[WCRC + 1]; for (byte i = 1; i < WHLPOS.Length; ++i) {
for (byte j = (byte)(WHLPOS[i - 1] + 1); j <= WHLPOS[i]; ++j) WHLNDX[j] = i;
}
WHLRNDUP = new byte[WCRC * 2]; for (byte i = 1; i < WHLRNDUP.Length; ++i) {
if (i > WCRC) WHLRNDUP[i] = (byte)(WCRC + WHLPOS[WHLNDX[i - WCRC]]); else WHLRNDUP[i] = WHLPOS[WHLNDX[i]];
}
Func<ushort, int> nmbts = (v) => { var acc = 0; while (v != 0) { acc += (int)v & 1; v >>= 1; } return acc; };
CLUT = new byte[1 << 16]; for (var i = 0; i < CLUT.Length; ++i) CLUT[i] = (byte)nmbts((ushort)(i ^ -1));
PRLUT = new byte[WHTS]; for (var i = 0; i < PRLUT.Length; ++i) {
var t = (uint)(WHLPOS[i] * 2) + FSTBP; if (t >= WCRC) t -= WCRC; if (t >= WCRC) t -= WCRC; PRLUT[i] = (byte)t;
}
WSLUT = new Wst[WHTS * WHTS]; for (var x = 0u; x < WHTS; ++x) {
var p = FSTBP + 2u * WHLPOS[x]; var pr = p % WCRC;
for (uint y = 0, pos = (p * p - FSTBP) / 2; y < WHTS; ++y) {
var m = WHLPTRN[(x + y) % WHTS];
pos %= WCRC; var posn = WHLNDX[pos]; pos += m * pr; var nposd = pos / WCRC; var nposn = WHLNDX[pos - nposd * WCRC];
WSLUT[x * WHTS + posn] = new Wst {
msk = (ushort)(1 << (int)(posn & 0xF)),
mlt = (byte)(m * WPC),
xtr = (byte)(WPC * nposd + (nposn >> 4) - (posn >> 4)),
nxt = (ushort)(WHTS * x + nposn)
};
}
}
MCPY = new ushort[PGSZ]; foreach (var lp in BWHLPRMS.SkipWhile(p => p < FSTCP)) {
var p = (uint)lp;
var k = (p * p - FSTBP) >> 1; var pd = p / WCRC; var kd = k / WCRC; var kn = WHLNDX[k - kd * WCRC];
for (uint w = kd * WPC + (uint)(kn >> 4), ndx = WHLNDX[(2 * WCRC + p - FSTBP) / 2] * WHTS + kn; w < MCPY.Length; ) {
var st = WSLUT[ndx]; MCPY[w] |= st.msk; w += st.mlt * pd + st.xtr; ndx = st.nxt;
}
}
}
#endregion
#region public class
class nmrtr : IEnumerator<ulong>, IEnumerator, IDisposable {
PrcsSpc[] ps = new PrcsSpc[NUMPRCSPCS]; ushort[] buf;
public nmrtr() {
for (var s = 0u; s < NUMPRCSPCS; ++s) ps[s] = new PrcsSpc { buf = new ushort[BFSZ] };
for (var s = 1u; s < NUMPRCSPCS; ++s) {
ps[s].tsk = cullbftsk((s - 1u) * BFRNG, ps[s].buf, (bfr) => { });
} buf = ps[0].buf;
}
ulong _curr, i = (ulong)-WHLPTRN[WHTS - 1]; int b = -BWHLPRMS.Length - 1; uint wi = WHTS - 1; ushort v, msk = 0;
public ulong Current { get { return this._curr; } } object IEnumerator.Current { get { return this._curr; } }
public bool MoveNext() {
if (b < 0) {
if (b == -1) b += buf.Length;
else { this._curr = (ulong)BWHLPRMS[BWHLPRMS.Length + (++b)]; return true; }
}
do {
i += WHLPTRN[wi++]; if (wi >= WHTS) wi = 0; if ((this.msk <<= 1) == 0) {
if (++b >= BFSZ) {
b = 0; for (var prc = 0; prc < NUMPRCSPCS - 1; ++prc) ps[prc] = ps[prc + 1];
ps[NUMPRCSPCS - 1u].buf = buf;
ps[NUMPRCSPCS - 1u].tsk = cullbftsk(i + (NUMPRCSPCS - 1u) * BFRNG, buf, (bfr) => { });
ps[0].tsk.Wait(); buf = ps[0].buf;
} v = buf[b]; this.msk = 1;
}
}
while ((v & msk) != 0u); _curr = FSTBP + i + i; return true;
}
public void Reset() { throw new Exception("Primes enumeration reset not implemented!!!"); }
public void Dispose() { }
}
#endregion
#region public instance method and associated sub private method
public IEnumerator<ulong> GetEnumerator() { return new nmrtr(); }
IEnumerator IEnumerable.GetEnumerator() { return new nmrtr(); }
#endregion
#region public static methods
public static long CountTo(ulong top_number) {
if (top_number < FSTBP) return BWHLPRMS.TakeWhile(p => p <= top_number).Count();
var cnt = (long)BWHLPRMS.Length;
IterateTo(top_number, (lowi, lim, b) => { Interlocked.Add(ref cnt, count(lim, b)); }); return cnt;
}
public static ulong SumTo(uint top_number) {
if (top_number < FSTBP) return (ulong)BWHLPRMS.TakeWhile(p => p <= top_number).Aggregate(0u, (acc, p) => acc += p);
var sum = (long)BWHLPRMS.Aggregate(0u, (acc, p) => acc += p);
Func<ulong, uint, ushort[], long> sumbf = (lowi, bitlim, buf) => {
var acc = 0L; for (uint i = 0, wi = 0, msk = 0x8000, w = 0, v = 0; i < bitlim;
i += WHLPTRN[wi++], wi = wi >= WHTS ? 0 : wi) {
if (msk >= 0x8000) { msk = 1; v = buf[w++]; } else msk <<= 1;
if ((v & msk) == 0) acc += (long)(FSTBP + ((lowi + i) << 1));
} return acc;
};
IterateTo(top_number, (pos, lim, b) => { Interlocked.Add(ref sum, sumbf(pos, lim, b)); }); return (ulong)sum;
}
public static ulong ElementAt(long index) {
if (index < BWHLPRMS.Length) return (ulong)BWHLPRMS.ElementAt((int)index);
long cnt = BWHLPRMS.Length; var ndx = 0UL; var cycl = 0u; var bit = 0u; IterateUntil((lwi, bfr) => {
var c = count(BFRNG, bfr); if ((cnt += c) < index) return false; ndx = lwi; cnt -= c; c = 0;
do { var w = cycl++ * WPC; c = CLUT[bfr[w++]] + CLUT[bfr[w++]] + CLUT[bfr[w]]; cnt += c; } while (cnt < index);
cnt -= c; var y = (--cycl) * WPC; ulong v = ((ulong)bfr[y + 2] << 32) + ((ulong)bfr[y + 1] << 16) + bfr[y];
do { if ((v & (1UL << ((int)bit++))) == 0) ++cnt; } while (cnt <= index); --bit; return true;
}); return FSTBP + ((ndx + cycl * WCRC + WHLPOS[bit]) << 1);
}
#endregion
}
上述代码将在四核(包括HT,共八个线程)i7-2700K(3.5 GHz)上枚举出十亿以内的质数,大约需要1.55秒,而您的E7500可能会慢上四倍,因为它的线程更少,时钟速度稍低。其中约四分之三的时间仅用于运行MoveNext()方法和Current属性,因此我提供了公共静态方法"CountTo"、"SumTo"和"ElementAt"来计算范围内的质数数量或总和以及第n个从零开始的质数,而不使用枚举。在我的计算机上,使用UltimatePrimesSoE.CountTo(1000000000)静态方法大约需要0.32秒即可产生50847534,因此在Intel E7500上不应该需要超过1.28秒。
编辑添加:有趣的是,这段代码在x86 32位模式下比在x64 64位模式下运行得快30%,这可能是因为避免了将uint32数字扩展为ulong的轻微额外开销。以上所有定时均为64位模式。
这个实现有将近300行(密集)代码,它并不简单,但这是为了做出所有描述的优化以使代码更高效的代价。它的代码行数比
Aaron Murgatroyd的另一个答案略多,虽然他的代码行数较少,但他的代码速度也慢了约4倍。事实上,几乎所有执行时间都花费在我代码的私有静态"cullbf"方法的最后一个“for循环”中,该循环仅有4个语句加上范围条件检查;所有其余代码都只是支持重复应用该循环。
这段代码比其他答案更快的原因,与你的代码相比,除了他只处理奇素数候选项的步骤(1)优化外,还有相同的原因。他使用多进程几乎没有效果,仅有30%的优势,而不是应该在真正的四核CPU上正确应用时可能达到的四倍因子,因为它是针对每个素数而不是所有小页面上的所有素数进行线程处理,并且他使用不安全的指针数组访问作为一种消除DotNet计算成本的方法,即每个循环的数组边界检查实际上会使代码变慢,而不是直接使用数组,包括边界检查,因为DotNet JIT编译器对指针访问产生的代码效率非常低。此外,他的代码枚举素数的方式与我的代码相同,这种枚举每枚举一个素数就需要10个CPU时钟周期的成本,在他的情况下稍微差一些,因为他使用内置的C#迭代器,这些迭代器比我的“自己编写”的IEnumerator接口效率略低。然而,为了获得最大的速度,我们应该完全避免枚举;然而,即使他提供的“Count”实例方法也使用“foreach”循环,这意味着枚举。
总之,这个答案代码在您的E7500 CPU上比您的问题代码快大约25倍(在具有更多核心/线程的CPU上快得多),使用的内存要少得多,并且不仅限于约32位数范围内的较小质数范围,但代价是增加了代码复杂性。