有没有一种简单的方法将使用流的代码转换为使用迭代器的代码?或者有没有一种简单的方法使我的第一次尝试更具内存效率?
@Will Ness使用流给出了一个改进的答案,并解释了为什么你的代码需要这么多内存和时间,例如在早期添加流和左倾线性结构。但是,没有人完全回答了你的问题的第二部分(或者可能是主要部分),即能否使用迭代器实现真正的增量埃拉托色尼筛选法。
首先,我们应该归功于这个右倾算法,你的第一个代码是一个粗糙的(左倾)示例(因为它过早地将所有素数复合流添加到合并操作中),这归功于Richard Bird,就像
Melissa E. O'Neill关于增量埃拉托色尼筛选法的定义性论文的附录中所述。
其次,不,无法在此算法中用Iterator替换Stream,因为它依赖于在不重新启动流的情况下移动流,并且尽管可以访问迭代器的头部(当前位置),但使用下一个值(跳过头部)生成其余的迭代作为流需要建立一个全新的迭代器,这将极大地消耗内存和时间。然而,我们可以使用Iterator按顺序输出素数序列的结果,以最小化内存使用并使使用高阶函数的迭代器变得容易,正如您将在我的代码中看到的那样。
现在,Will Ness已经向您介绍了推迟将素数复合流添加到计算中直到需要它们的原则,当将它们存储在像Priority Queue或HashMap这样的结构中时,这种方法非常有效,甚至在O'Neill的论文中也被忽略了,但对于Richard Bird算法来说,这并不是必要的,因为未来的流值只有在需要时才会被访问,因此不会被存储,如果Streams被正确地惰性构建(如惰性和左倾),事实上,这个算法甚至不需要完整的Stream的记忆和开销,每个复合数筛选序列仅向前移动,而没有参考任何过去的质数,除了需要一个单独的基本质数源,可以通过同一算法的递归调用来提供。
为了方便参考,让我们将Richard Bird算法的Haskell代码列出如下:
primes = 2:([3..] ‘minus‘ composites)
where
composites = union [multiples p | p <− primes]
multiples n = map (n*) [n..]
(x:xs) ‘minus‘ (y:ys)
| x < y = x:(xs ‘minus‘ (y:ys))
| x == y = xs ‘minus‘ ys
| x > y = (x:xs) ‘minus‘ ys
union = foldr merge []
where
merge (x:xs) ys = x:merge’ xs ys
merge’ (x:xs) (y:ys)
| x < y = x:merge’ xs (y:ys)
| x == y = x:merge’ xs ys
| x > y = y:merge’ (x:xs) ys
在下面的代码中,我简化了“minus”函数(称为“minusStrtAt”),因为我们不需要构建一个全新的流,而是可以将组合减法操作与原始生成(在我的情况下仅为奇数)序列相结合。我还简化了“union”函数(将其重命名为“mrgMltpls”)
流操作作为非记忆化通用的Co Inductive Stream(CIS)实现,作为一个通用类,其中类的第一个字段是流当前位置的值,第二个字段是thunk(返回通过另一个函数嵌入闭包参数的流下一个值的零参数函数)。
def primes(): Iterator[Long] = {
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
上述代码在
ideone上大约用1.3秒钟生成第100,000个质数(1299709),额外开销约为0.36秒,并且在计算前600,000个质数时具有约为1.43的经验计算复杂度。除程序代码使用的内存外,内存使用可以忽略不计。
虽然可以使用Scala Streams来实现上述代码,但是这会带来性能和内存使用开销(一个常数因子),而该算法并不需要。使用Streams可以直接使用它们,无需额外生成Iterator代码,但由于仅用于序列的最终输出,因此成本不高。
要实现Will Ness建议的一些基本树折叠,只需要添加一个“pairs”函数并将其连接到“mrgMltpls”函数即可。
def primes(): Iterator[Long] = {
class CIS[A](val v: A, val cont: () => CIS[A])
def mltpls(p: Long): CIS[Long] = {
var px2 = p * 2
def nxtmltpl(cmpst: Long): CIS[Long] =
new CIS(cmpst, () => nxtmltpl(cmpst + px2))
nxtmltpl(p * p)
}
def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
else new CIS(b.v, () => merge(a.cont(), b.cont()))
def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
val tl = mltplss.cont()
new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
}
def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
else minusStrtAt(n + 2, cmpsts.cont())
def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
def oddPrms(): CIS[Long] = new CIS(3, () => minusStrtAt(5L, cmpsts()))
Iterator.iterate(new CIS(2L, () => oddPrms()))
{(cis: CIS[Long]) => cis.cont()}
.map {(cis: CIS[Long]) => cis.v}
}
上述代码在
ideone中大约0.75秒内生成了第100,000个质数(1299709),并且有约0.37秒的开销,到第1,000,000个质数(15485863)的经验计算复杂度约为1.09 (5.13秒)。除程序代码使用的内存外,内存使用可以忽略不计。
请注意,上述代码完全是功能性的,没有使用可变状态,但是Bird算法(甚至是树折叠版本)对于更大的范围不如使用优先队列或HashMap快,因为处理树合并的操作数量具有比Priority Queue的log n开销更高的计算复杂度,或者HashMap的线性(分摊)性能(虽然存在大量的常数因子开销来处理哈希,所以这种优势直到使用一些真正大的范围才能看到)。
这些代码之所以使用如此少的内存,是因为CIS流没有永久引用流的起始点,因此在使用流时对其进行垃圾回收,只留下最少量的基本质数复合序列占位符。正如Will Ness所解释的那样,仅使用546个基本质数复合数字流即可生成前100万个素数(最大值为15485863),每个占位符仅占用几十字节(Long数字占用8个字节,64位函数引用占用8个字节,指向闭包参数的指针和函数和类开销各占用另外几个字节),每个流占用的总空间可能只有40字节左右,生成100万个素数的序列所需的总空间不会超过20千字节。