我正在尝试在Java中创建一个快速的素数生成器。通常认为,最快的方法是Eratosthenes分段筛法:https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes。可以进一步实现许多优化以使其更快。截至目前,我的实现在大约1.6秒内生成了
作为一个
为了回答这个问题,我想要区分'Eratosthenes筛法'的“分段”和“传统”筛法。传统筛法需要
这让我开始阅读有关多线程的文章——将工作分配给几个线程,每个线程处理较少量的工作,以更好地利用CPU。据我所知,传统筛法无法进行多线程操作,因为它是顺序的。每个线程都会依赖于前一个线程,使整个想法变得不可行。但是,分段筛法可能确实可以进行多线程处理(我认为)。
在直接进入我的问题之前,我认为介绍我的代码非常重要,因此我在此包含了我目前最快的分段筛法实现。我已经非常努力地工作了。花费了相当长的时间,慢慢地对其进行调整和优化。代码并不简单。我会说是相当复杂的。因此,我假设读者熟悉我正在介绍的概念,例如轮子分解、质数、分段等等。我已经包括了笔记,以使其更容易理解。
这意味着跳过这些质数的倍数,从而缩小搜索范围。然后使用“preCalc”方法计算需要获取的数字之间的差距。如果我们在搜索范围内的数字之间进行这些跳跃,就可以跳过基本质数的倍数。
在
使用了以下方法:
然后,在初始化跟踪素数枚举的变量
使用更高的限制将最终生成更大的片段,这可能会导致需要检查即使在片段内我们也不会超过间隙列表。或者微调轮质数基数可能对程序产生影响。转换为位筛可以大大提高段限制。
10^9
以下的50847534
个质数,但我希望让它更快,并至少突破1
秒的限制。为了增加获得良好回复的机会,我将包括算法和代码的演示。作为一个
TL;DR
,我仍然希望将多线程编程纳入代码中。为了回答这个问题,我想要区分'Eratosthenes筛法'的“分段”和“传统”筛法。传统筛法需要
O(n)
的空间,因此在输入范围(它的极限)上非常有限。而分段筛法只需要O(n^0.5)
的空间,并且可以处理更大范围的输入(主要加速是使用缓存友好的分段,考虑特定计算机的L1&L2
缓存大小)。最后,涉及我的问题的主要区别是传统筛法是顺序的,意味着只有在完成前面的步骤后才能继续进行。然而,分段筛法不是这样的。每个段都是独立的,并且针对素数筛选(小于n^0.5
的素数)单独进行“处理”。这意味着理论上,一旦我有了素数筛选,我就可以在多台计算机之间分配工作,每个计算机处理不同的段。彼此的工作是独立的。假设(错误地)每个段需要相同的时间t
来完成,且有k
个段,则一个计算机需要总时间T = k * t
,而k
台计算机,每个处理不同的段,则需要总时间T = t
来完成整个过程。(实际上,这是错误的,但为了简单起见,这是一个例子)。这让我开始阅读有关多线程的文章——将工作分配给几个线程,每个线程处理较少量的工作,以更好地利用CPU。据我所知,传统筛法无法进行多线程操作,因为它是顺序的。每个线程都会依赖于前一个线程,使整个想法变得不可行。但是,分段筛法可能确实可以进行多线程处理(我认为)。
在直接进入我的问题之前,我认为介绍我的代码非常重要,因此我在此包含了我目前最快的分段筛法实现。我已经非常努力地工作了。花费了相当长的时间,慢慢地对其进行调整和优化。代码并不简单。我会说是相当复杂的。因此,我假设读者熟悉我正在介绍的概念,例如轮子分解、质数、分段等等。我已经包括了笔记,以使其更容易理解。
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
public class primeGen {
public static long x = (long)Math.pow(10, 9); //limit
public static int sqrtx;
public static boolean [] sievingPrimes; //the sieving primes, <= sqrtx
public static int [] wheels = new int [] {2,3,5,7,11,13,17,19}; // base wheel primes
public static int [] gaps; //the gaps, according to the wheel. will enable skipping multiples of the wheel primes
public static int nextp; // the first prime > wheel primes
public static int l; // the amount of gaps in the wheel
public static void main(String[] args)
{
long startTime = System.currentTimeMillis();
preCalc(); // creating the sieving primes and calculating the list of gaps
int segSize = Math.max(sqrtx, 32768*8); //size of each segment
long u = nextp; // 'u' is the running index of the program. will continue from one segment to the next
int wh = 0; // the will be the gap index, indicating by how much we increment 'u' each time, skipping the multiples of the wheel primes
long pi = pisqrtx(); // the primes count. initialize with the number of primes <= sqrtx
for (long low = 0 ; low < x ; low += segSize) //the heart of the code. enumerating the primes through segmentation. enumeration will begin at p > sqrtx
{
long high = Math.min(x, low + segSize);
boolean [] segment = new boolean [(int) (high - low + 1)];
int g = -1;
for (int i = nextp ; i <= sqrtx ; i += gaps[g])
{
if (sievingPrimes[(i + 1) / 2])
{
long firstMultiple = (long) (low / i * i);
if (firstMultiple < low)
firstMultiple += i;
if (firstMultiple % 2 == 0) //start with the first odd multiple of the current prime in the segment
firstMultiple += i;
for (long j = firstMultiple ; j < high ; j += i * 2)
segment[(int) (j - low)] = true;
}
g++;
//if (g == l) //due to segment size, the full list of gaps is never used **within just one segment** , and therefore this check is redundant.
//should be used with bigger segment sizes or smaller lists of gaps
//g = 0;
}
while (u <= high)
{
if (!segment[(int) (u - low)])
pi++;
u += gaps[wh];
wh++;
if (wh == l)
wh = 0;
}
}
System.out.println(pi);
long endTime = System.currentTimeMillis();
System.out.println("Solution took "+(endTime - startTime) + " ms");
}
public static boolean [] simpleSieve (int l)
{
long sqrtl = (long)Math.sqrt(l);
boolean [] primes = new boolean [l/2+2];
Arrays.fill(primes, true);
int g = -1;
for (int i = nextp ; i <= sqrtl ; i += gaps[g])
{
if (primes[(i + 1) / 2])
for (int j = i * i ; j <= l ; j += i * 2)
primes[(j + 1) / 2]=false;
g++;
if (g == l)
g=0;
}
return primes;
}
public static long pisqrtx ()
{
int pi = wheels.length;
if (x < wheels[wheels.length-1])
{
if (x < 2)
return 0;
int k = 0;
while (wheels[k] <= x)
k++;
return k;
}
int g = -1;
for (int i = nextp ; i <= sqrtx ; i += gaps[g])
{
if(sievingPrimes[( i + 1 ) / 2])
pi++;
g++;
if (g == l)
g=0;
}
return pi;
}
public static void preCalc ()
{
sqrtx = (int) Math.sqrt(x);
int prod = 1;
for (long p : wheels)
prod *= p; // primorial
nextp = BigInteger.valueOf(wheels[wheels.length-1]).nextProbablePrime().intValue(); //the first prime that comes after the wheel
int lim = prod + nextp; // circumference of the wheel
boolean [] marks = new boolean [lim + 1];
Arrays.fill(marks, true);
for (int j = 2 * 2 ;j <= lim ; j += 2)
marks[j] = false;
for (int i = 1 ; i < wheels.length ; i++)
{
int p = wheels[i];
for (int j = p * p ; j <= lim ; j += 2 * p)
marks[j]=false; // removing all integers that are NOT comprime with the base wheel primes
}
ArrayList <Integer> gs = new ArrayList <Integer>(); //list of the gaps between the integers that are coprime with the base wheel primes
int d = nextp;
for (int p = d + 2 ; p < marks.length ; p += 2)
{
if (marks[p]) //d is prime. if p is also prime, then a gap is identified, and is noted.
{
gs.add(p - d);
d = p;
}
}
gaps = new int [gs.size()];
for (int i = 0 ; i < gs.size() ; i++)
gaps[i] = gs.get(i); // Arrays are faster than lists, so moving the list of gaps to an array
l = gaps.length;
sievingPrimes = simpleSieve(sqrtx); //initializing the sieving primes
}
}
目前,它在大约1.6秒内产生了10^9以下的50847534个质数。按我的标准来说,这非常令人印象深刻,但我希望让它更快,可能突破1秒的限制。即使如此,我认为它仍然可以更快。
整个程序基于轮筛法:https://en.wikipedia.org/wiki/Wheel_factorization。我注意到使用包括所有小于19的质数的轮筛可以获得最快的结果。
public static int [] wheels = new int [] {2,3,5,7,11,13,17,19}; // base wheel primes
这意味着跳过这些质数的倍数,从而缩小搜索范围。然后使用“preCalc”方法计算需要获取的数字之间的差距。如果我们在搜索范围内的数字之间进行这些跳跃,就可以跳过基本质数的倍数。
public static void preCalc ()
{
sqrtx = (int) Math.sqrt(x);
int prod = 1;
for (long p : wheels)
prod *= p; // primorial
nextp = BigInteger.valueOf(wheels[wheels.length-1]).nextProbablePrime().intValue(); //the first prime that comes after the wheel
int lim = prod + nextp; // circumference of the wheel
boolean [] marks = new boolean [lim + 1];
Arrays.fill(marks, true);
for (int j = 2 * 2 ;j <= lim ; j += 2)
marks[j] = false;
for (int i = 1 ; i < wheels.length ; i++)
{
int p = wheels[i];
for (int j = p * p ; j <= lim ; j += 2 * p)
marks[j]=false; // removing all integers that are NOT comprime with the base wheel primes
}
ArrayList <Integer> gs = new ArrayList <Integer>(); //list of the gaps between the integers that are coprime with the base wheel primes
int d = nextp;
for (int p = d + 2 ; p < marks.length ; p += 2)
{
if (marks[p]) //d is prime. if p is also prime, then a gap is identified, and is noted.
{
gs.add(p - d);
d = p;
}
}
gaps = new int [gs.size()];
for (int i = 0 ; i < gs.size() ; i++)
gaps[i] = gs.get(i); // Arrays are faster than lists, so moving the list of gaps to an array
l = gaps.length;
sievingPrimes = simpleSieve(sqrtx); //initializing the sieving primes
}
在
preCalc
方法的末尾,调用simpleSieve
方法,高效地筛选出之前提到的所有小于等于sqrtx
的素数。这是一个简单的埃拉托色尼筛法,而不是分段筛法,但仍基于先前计算的轮筛法。 public static boolean [] simpleSieve (int l)
{
long sqrtl = (long)Math.sqrt(l);
boolean [] primes = new boolean [l/2+2];
Arrays.fill(primes, true);
int g = -1;
for (int i = nextp ; i <= sqrtl ; i += gaps[g])
{
if (primes[(i + 1) / 2])
for (int j = i * i ; j <= l ; j += i * 2)
primes[(j + 1) / 2]=false;
g++;
if (g == l)
g=0;
}
return primes;
}
最后,我们来到算法的核心。我们从枚举所有小于等于<= sqrtx
的质数开始,使用以下调用:
long pi = pisqrtx();`
使用了以下方法:
public static long pisqrtx ()
{
int pi = wheels.length;
if (x < wheels[wheels.length-1])
{
if (x < 2)
return 0;
int k = 0;
while (wheels[k] <= x)
k++;
return k;
}
int g = -1;
for (int i = nextp ; i <= sqrtx ; i += gaps[g])
{
if(sievingPrimes[( i + 1 ) / 2])
pi++;
g++;
if (g == l)
g=0;
}
return pi;
}
然后,在初始化跟踪素数枚举的变量
pi
之后,我们执行所提到的分段,从第一个大于> sqrtx
的质数开始枚举: int segSize = Math.max(sqrtx, 32768*8); //size of each segment
long u = nextp; // 'u' is the running index of the program. will continue from one segment to the next
int wh = 0; // the will be the gap index, indicating by how much we increment 'u' each time, skipping the multiples of the wheel primes
long pi = pisqrtx(); // the primes count. initialize with the number of primes <= sqrtx
for (long low = 0 ; low < x ; low += segSize) //the heart of the code. enumerating the primes through segmentation. enumeration will begin at p > sqrtx
{
long high = Math.min(x, low + segSize);
boolean [] segment = new boolean [(int) (high - low + 1)];
int g = -1;
for (int i = nextp ; i <= sqrtx ; i += gaps[g])
{
if (sievingPrimes[(i + 1) / 2])
{
long firstMultiple = (long) (low / i * i);
if (firstMultiple < low)
firstMultiple += i;
if (firstMultiple % 2 == 0) //start with the first odd multiple of the current prime in the segment
firstMultiple += i;
for (long j = firstMultiple ; j < high ; j += i * 2)
segment[(int) (j - low)] = true;
}
g++;
//if (g == l) //due to segment size, the full list of gaps is never used **within just one segment** , and therefore this check is redundant.
//should be used with bigger segment sizes or smaller lists of gaps
//g = 0;
}
while (u <= high)
{
if (!segment[(int) (u - low)])
pi++;
u += gaps[wh];
wh++;
if (wh == l)
wh = 0;
}
}
我也将其作为一个注释包含在内,但也会解释一下。因为片段大小相对较小,我们不会在一个片段内遍历整个间隙列表,并检查它-这是多余的。(假设我们使用一个19-wheel
)。但在程序的更广泛范围概述中,我们将利用整个间隙数组,所以变量u
必须跟随它,而不是意外地超过它:
while (u <= high)
{
if (!segment[(int) (u - low)])
pi++;
u += gaps[wh];
wh++;
if (wh == l)
wh = 0;
}
使用更高的限制将最终生成更大的片段,这可能会导致需要检查即使在片段内我们也不会超过间隙列表。或者微调轮质数基数可能对程序产生影响。转换为位筛可以大大提高段限制。
- 作为重要的附注,我知道高效的分段是考虑到
L1&L2
缓存大小的。我使用32,768 * 8 = 262,144 = 2^18
的段大小获得最快的结果。我不确定我的计算机缓存大小是多少,但我认为它不可能那么大,因为我看到大多数缓存大小都小于等于32,768
。尽管如此,在我的计算机上这产生了最快的运行时间,所以这就是选择的段大小。 - 正如我之前提到的,我仍在努力大幅改进。根据我的介绍,我相信使用4个线程(对应4个核心),多线程可以将速度提高
4
倍。每个线程仍然使用分段筛法的思想,但处理不同的部分。将n
分成4
个相等的部分-线程,每个线程依次对其负责的n/4
个元素进行分段,使用上述程序。我的问题是如何做到这一点?阅读有关多线程和示例的资料,不幸的是,没有给我带来任何关于如何有效实现它的见解。对我来说,与其背后的逻辑相反,线程似乎是顺序运行而不是同时运行。这就是为什么我将其从代码中排除以使其更易读。我真的很希望在此特定代码中提供一个代码示例
,但一个好的解释和参考也许能够达到目的。
10^9
上需要 3.98 秒。但它也使用了 wheel。一般来说,C++ 在涉及计算时更快。至于多线程简单筛法,我很想看到它的实践,听起来很有趣。在简单筛法中,选择下一个“质数”的选择取决于前一个质数的所有倍数是否已被标记。因此,初始化新线程可能会选择一个实际上不是质数的“质数” - 它只是尚未被标记。 - MC From Scratch