埃拉托斯特尼筛法在F#中

37

我对在纯函数式 F# 中实现埃拉托斯特尼筛法很感兴趣。我对实际的筛法实现很感兴趣,而不是那种并不真正是筛法的天真函数式实现,因此不要像这样:

let rec PseudoSieve list =
    match list with
    | hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
    | [] -> []

上面的第二个链接简要描述了一个算法,它需要使用multimap,据我所知F#中没有这个功能。给出的Haskell实现使用了支持insertWith方法的map,但我没有看到F# functional map中有类似的方法。
是否有人知道如何将给定的Haskell映射代码转换为F#,或者知道其他实现方法或筛选算法,既高效又适合函数式实现或F#?

1
@Rafe - 传统方式需要修改数组,这样就不再是纯函数了,对吗? - IVlad
1
啊,但你可以让它看起来很纯净!比如说你想要更新数组a以生成数组b,并确保这是以纯函数的方式完成的,你需要做的是(伪代码):“a[i] := x; b = a; // 永远不要再使用a!”即使你有一个不纯的实现,你也可以给它一个纯语义。例如,在Mercury中,数组更新函数就是这样做的,而Mercury模式系统保证您的程序永远不会再次使用a。 - Rafe
1
对不起,但您是错误的:这正是Mercury和Haskell中纯粹管理状态的方式(Mercury使用独特性,而Haskell使用单子,但在底层发生的事情完全相同)。实际上,这也是如何以纯方式管理IO的。如果您的纯度承诺得到保证,那么具有纯接口的非纯实现就没有任何问题。 - Rafe
1
@IVlad - 承诺是有保障的,因为没有侵犯引用透明性。也就是说,以这种方式实现筛选函数的任何调用者都无法确定底层实现是否是impure。当然,我提议的实现确实取决于肮脏的副作用,但这些副作用对调用方是不可见的。真正的问题是,看看Mercury或Haskell中数组的实现吧! - Rafe
1
更进一步解释,“永远不要再使用 'a'”的限制正是Haskell的State和IO monads所保证的,或者Mercury的unique modes所保证的。在Haskell中,你实际上从未真正接触过数组本身,因为它隐藏在monad内部,而monad永远不会将其提供给你!在Mercury中,对数组的任何更新都会产生一个新的“unique”数组,并使旧数组“死亡”(永远不会再次访问)。当然,经过破坏性更新后,新数组与旧数组完全相同。 - Rafe
显示剩余7条评论
16个回答

18
阅读那篇文章后,我想到了一个不需要多重映射的方法。它通过将冲突的映射键一次又一次地向前移动其素数值来处理冲突的映射键,直到达到不在映射中的键为止。下面的primes是一个具有下一个迭代器值为键和质数为值的映射。
let primes = 
    let rec nextPrime n p primes =
        if primes |> Map.containsKey n then
            nextPrime (n + p) p primes
        else
            primes.Add(n, p)

    let rec prime n primes =
        seq {
            if primes |> Map.containsKey n then
                let p = primes.Item n
                yield! prime (n + 1) (nextPrime (n + p) p (primes.Remove n))
            else
                yield n
                yield! prime (n + 1) (primes.Add(n * n, n))
        }

    prime 2 Map.empty

这是来自论文的基于优先队列的算法,没有使用平方优化。我将通用的优先队列函数放在了顶部。我使用元组来表示延迟列表迭代器。
let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // skips primes 2, 3, 5, 7
    let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

这是基于优先队列的算法,使用了平方优化。为了方便向查找表中懒惰添加质数,必须同时返回轮子偏移和质数值。该算法版本的内存使用量为O(sqrt(n)),而未经优化的版本为O(n)。
let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))

这是我的测试程序。

type GenericHeap<'T when 'T : comparison>(defaultValue : 'T) =
    let mutable capacity = 1
    let mutable values = Array.create capacity defaultValue
    let mutable size = 0

    let swap i n =
        let temp = values.[i]
        values.[i] <- values.[n]
        values.[n] <- temp

    let rec rollUp i =
        if i > 0 then
            let parent = (i - 1) / 2
            if values.[i] < values.[parent] then
                swap i parent
                rollUp parent

    let rec rollDown i =
        let left, right = 2 * i + 1, 2 * i + 2

        if right < size then
            if values.[left] < values.[i] then
                if values.[left] < values.[right] then
                    swap left i
                    rollDown left
                else
                    swap right i
                    rollDown right
            elif values.[right] < values.[i] then
                swap right i
                rollDown right
        elif left < size then
            if values.[left] < values.[i] then
                swap left i

    member this.insert (value : 'T) =
        if size = capacity then
            capacity <- capacity * 2
            let newValues = Array.zeroCreate capacity
            for i in 0 .. size - 1 do
                newValues.[i] <- values.[i]
            values <- newValues

        values.[size] <- value
        size <- size + 1
        rollUp (size - 1)

    member this.delete () =
        values.[0] <- values.[size]
        size <- size - 1
        rollDown 0

    member this.deleteInsert (value : 'T) =
        values.[0] <- value
        rollDown 0

    member this.min () =
        values.[0]

    static member Insert (value : 'T) (heap : GenericHeap<'T>) =
        heap.insert value
        heap    

    static member DeleteInsert (value : 'T) (heap : GenericHeap<'T>) =
        heap.deleteInsert value
        heap    

    static member Min (heap : GenericHeap<'T>) =
        heap.min()

type Heap = GenericHeap<int64 * int * int64>

let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))


let mutable i = 0

let compare a b =
    i <- i + 1
    if a = b then
        true
    else
        printfn "%A %A %A" a b i
        false

Seq.forall2 compare (Seq.take 50000 (primes())) (Seq.take 50000 (primes2() |> Seq.map fst))
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.skip 999999
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

System.Console.ReadLine() |> ignore

@gradbot - 你说得对。如果我使用筛法,例如8809会被返回为质数,但实际上它不是。有什么办法可以修复吗? - IVlad
1
用长整型掩盖溢出问题只是掩盖了真正的问题——对Map进行过早的加法操作。当一个质数的平方被视为候选时,才应将该质数的数据添加到Map中。这将使Map的大小缩小到平方根大小,节省大量内存并加快代码速度。而且消除了虚假的溢出。 - Will Ness
1
@WillNess 我该如何延迟向map中添加元素?难道我还需要另一个队列吗? - gradbot
1
@WillNess 添加了一个具有平方优化的递归版本。 - gradbot
这是我所想的 - 对你最初的代码进行简单的调整,沿用了上面链接的Haskell Map版本的思路。(repl.it使用F# v4,不需要任何类型注释,而旧版v3 Ideone则要求我添加注释)。 - Will Ness
显示剩余17条评论

10

尽管已经有一个答案使用优先队列(PQ)作为SkewBinomialHeap中的算法,但它可能不是适合此工作的正确PQ。增量埃拉托斯特尼筛(iEoS)需要一个PQ,在获取最小值和重新插入值方面具有出色性能,但不需要在添加新值方面达到最佳性能,因为iSoE仅将素数总数添加为新值直到范围的平方根(这是每个缩减操作发生的重新插入数量的一小部分)。 SkewBinomialHeap PQ实际上并没有比使用内置Map更多地提供平衡二叉搜索树 - 所有O(log n)操作 - 除了它稍微改变了操作的加权,以符合SoE的要求。 然而,SkewBinaryHeap仍然需要许多每次缩减的O(log n)操作。

一个PQ的实现可以作为,更具体地说是二叉堆,甚至更具体地说是最小堆,基本上满足iSoE的要求,在获取最小值时具有O(1)的性能,在重新插入和添加新条目时具有O(log n)的性能,尽管实际性能只是O(log n)的一小部分,因为大多数重新插入发生在队列顶部附近,而大多数添加新值(这些不重要,因为它们很少发生)发生在队列末尾,这些操作是最有效的。此外,最小堆PQ可以在一次(通常是一小部分)O(log n)遍历中高效地实现删除最小值和插入函数。然后,与Map(实现为AVL树)不同,其中有一个O(log n)操作,通常由于我们需要的最小值位于树的最左侧最后一个叶子,而在MinHeap PQ中,我们通常在根处添加和删除最小值,并在一次遍历中平均向下几个级别插入。因此,MinHeap PQ可以每个筛选减少仅使用一小部分的O(log n)操作,而不是多个较大部分的O(log n)操作。

MinHeap PQ可以使用纯函数式代码实现(没有实现“removeMin”,因为iSoE不需要,但有一个“adjust”函数可用于分段),如下所示:
[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  [<CompilationRepresentation(CompilationRepresentationFlags.UseNullAsTrueValue)>]
  [<NoEquality; NoComparison>]
  type MinHeapTree<'T> = 
      | HeapEmpty 
      | HeapOne of MinHeapTreeEntry<'T>
      | HeapNode of MinHeapTreeEntry<'T> * MinHeapTree<'T> * MinHeapTree<'T> * uint32

  let empty = HeapEmpty

  let getMin pq = match pq with | HeapOne(kv) | HeapNode(kv,_,_,_) -> Some kv | _ -> None

  let insert k v pq =
    let kv = MinHeapTreeEntry(k,v)
    let rec insert' kv msk pq =
      match pq with
        | HeapEmpty -> HeapOne kv
        | HeapOne kv2 -> if k < kv2.k then HeapNode(kv,pq,HeapEmpty,2u)
                          else let nn = HeapOne kv in HeapNode(kv2,nn,HeapEmpty,2u)
        | HeapNode(kv2,l,r,cnt) ->
          let nc = cnt + 1u
          let nmsk = if msk <> 0u then msk <<< 1
                     else let s = int32 (System.Math.Log (float nc) / System.Math.Log(2.0))
                          (nc <<< (32 - s)) ||| 1u //never ever zero again with the or'ed 1
          if k <= kv2.k then if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv,insert' kv2 nmsk l,r,nc)
                                                            else HeapNode(kv,l,insert' kv2 nmsk r,nc)
          else if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv2,insert' kv nmsk l,r,nc)
                else HeapNode(kv2,l,insert' kv nmsk r,nc)
    insert' kv 0u pq

  let private reheapify kv k pq =
    let rec reheapify' pq =
      match pq with
        | HeapEmpty -> HeapEmpty //should never be taken
        | HeapOne kvn -> HeapOne kv
        | HeapNode(kvn,l,r,cnt) ->
            match r with
              | HeapOne kvr when k > kvr.k ->
                  match l with //never HeapEmpty
                    | HeapOne kvl when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,HeapOne kv,r,cnt)
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,HeapOne kv,cnt) //only right qualifies
              | HeapNode(kvr,_,_,_) when k > kvr.k -> //need adjusting for left leaf or else left leaf
                  match l with //never HeapEmpty or HeapOne
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,reheapify' r,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,reheapify' r,cnt) //only right qualifies
              | _ -> match l with //r could be HeapEmpty but l never HeapEmpty
                        | HeapOne(kvl) when k > kvl.k -> HeapNode(kvl,HeapOne kv,r,cnt)
                        | HeapNode(kvl,_,_,_) when k > kvl.k -> HeapNode(kvl,reheapify' l,r,cnt)
                        | _ -> HeapNode(kv,l,r,cnt) //just replace the contents of pq node with sub leaves the same
    reheapify' pq


  let reinsertMinAs k v pq =
    let kv = MinHeapTreeEntry(k,v)
    reheapify kv k pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then rebuild by reheapify
    let rec adjust' pq =
      match pq with
        | HeapEmpty -> pq
        | HeapOne kv -> HeapOne(MinHeapTreeEntry(f kv.k kv.v))
        | HeapNode (kv,l,r,cnt) -> let nkv = MinHeapTreeEntry(f kv.k kv.v)
                                   reheapify nkv nkv.k (HeapNode(kv,adjust' l,adjust' r,cnt))
    adjust' pq

使用上述模块,可以采用轮式分解优化和高效的共归流(CIS)编写iSoE,具体如下:
type CIS<'T> = class val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i + 1 else 0 in let advcnd c i = c + uint32 WHLPTRN.[i]
  let inline culladv c p i = let n = c + uint32 WHLPTRN.[i] * p in if n < c then 0xFFFFFFFFu else n
  let rec mkprm (n,wi,pq,(bps:CIS<_>),q) =
    let nxt = advcnd n wi in let nxti = whladv wi
    if nxt < n then (0u,0,(0xFFFFFFFFu,0,MinHeap.empty,bps,q))
    elif n>=q then let bp,bpi = bps.v in let nc,nci = culladv n bp bpi,whladv bpi
                    let nsd = bps.cont() in let np,_ = nsd.v in let sqr = if np>65535u then 0xFFFFFFFFu else np*np
                    mkprm (nxt,nxti,(MinHeap.insert nc (cullstate(bp,nci)) pq),nsd,sqr)
    else match MinHeap.getMin pq with | None -> (n,wi,(nxt,nxti,pq,bps,q))
                                      | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                                                   if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   else (n,wi,(nxt,nxti,pq,bps,q))
  let rec pCID p pi pq bps q = CIS((p,pi),fun()->let (np,npi,(nxt,nxti,npq,nbps,nq))=mkprm (advcnd p pi,whladv pi,pq,bps,q)
                                                 pCID np npi npq nbps nq)
  let rec baseprimes() = CIS((FSTPRM,0),fun()->let np=FSTPRM+uint32 WHLPTRN.[0]
                                               pCID np (whladv 0) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let genseq sd = Seq.unfold (fun (p,pi,pcc) ->if p=0u then None else Some(p,mkprm pcc)) sd
  seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,MinHeap.empty,baseprimes(),(FSTPRM*FSTPRM)) |> genseq }

上述代码在一个i7-2700K (3.5GHz)作为64位代码下,约在0.077秒内计算出前100,000个质数,在0.977秒内计算出前1,000,000个质数,在大约14.33秒内计算出前10,000,000个质数,在大约221.87秒内计算出前100,000,000个质数。这段纯函数式代码略微比Dustin Cambell的基于可变字典的代码更快,添加了常见的优化技巧如筛法分解、延迟加入基础质数和使用更高效的CID( tryfsharpideone),但仍然是纯函数式代码,而他使用字典类的代码则不是。然而,对于大约两亿(约100 million个质数)的更大质数范围,使用基于哈希表的字典的代码将更快,因为字典操作没有O(log n)因素,这种收益超过了使用字典哈希表的计算复杂度。
上述程序具有进一步的特性,即分解轮被参数化,例如,可以通过将WHLPRMS设置为[|2u; 3u; 5u; 7u; 11u; 13u; 17u; 19u|]并将FSTPRM设置为23u来使用一个非常大的轮子,以获得大范围内约三分之二的运行时间,对于1000万个质数大约需要9.34秒,尽管请注意,在程序开始运行之前计算WHLPTRN需要几秒钟的时间,这是一个恒定的开销,无论质数范围如何。
比较分析:与纯函数式增量树折叠实现相比,该算法只稍微快一些,因为MinHeap树的平均使用高度比折叠树的深度少一倍,但由于它基于需要处理每个堆级别的右和左叶子以及任意一种方式的分支的二叉堆,而不是每个级别的树折叠中的单个比较,因此在遍历PQ树级别方面的效率损失等效于相应的常数因子损失。与其他基于PQ和Map的函数算法相比,改进通常是通过减少遍历各自树结构每个级别的O(log n)操作的常数因子来实现的。

小根堆通常是以可变数组二叉堆形式实现的,这种形式基于追溯树模型,是由Michael Eytzinger发明的,已有400多年历史。虽然问题中说没有对非函数式可变代码感兴趣,但如果必须避免使用所有使用可变性的子代码,则无法使用列表或懒惰列表,因为它们在性能方面在"覆盖下"使用可变性。因此,假设以下可变版本的MinHeap PQ由库提供,并享受更大素数范围的另一个超过两倍的性能因素:

[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  type MinHeapTree<'T> = ResizeArray<MinHeapTreeEntry<'T>>

  let empty<'T> = MinHeapTree<MinHeapTreeEntry<'T>>()

  let getMin (pq:MinHeapTree<_>) = if pq.Count > 0 then Some pq.[0] else None

  let insert k v (pq:MinHeapTree<_>) =
    if pq.Count = 0 then pq.Add(MinHeapTreeEntry(0xFFFFFFFFu,v)) //add an extra entry so there's always a right max node
    let mutable nxtlvl = pq.Count in let mutable lvl = nxtlvl <<< 1 //1 past index of value added times 2
    pq.Add(pq.[nxtlvl - 1]) //copy bottom entry then do bubble up while less than next level up
    while ((lvl <- lvl >>> 1); nxtlvl <- nxtlvl >>> 1; nxtlvl <> 0) do
      let t = pq.[nxtlvl - 1] in if t.k > k then pq.[lvl - 1] <- t else lvl <- lvl <<< 1; nxtlvl <- 0 //causes loop break
    pq.[lvl - 1] <-  MinHeapTreeEntry(k,v); pq

  let reinsertMinAs k v (pq:MinHeapTree<_>) = //do minify down for value to insert
    let mutable nxtlvl = 1 in let mutable lvl = nxtlvl in let cnt = pq.Count
    while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
      let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
      let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
      if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
    pq.[lvl - 1] <- MinHeapTreeEntry(k,v); pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then re-heapify
    if pq <> null then 
      let cnt = pq.Count
      if cnt > 1 then
        for i = 0 to cnt - 2 do //change contents using function
          let e = pq.[i] in let k,v = e.k,e.v in pq.[i] <- MinHeapTreeEntry (f k v)
        for i = cnt/2 downto 1 do //rebuild by reheapify
          let kv = pq.[i - 1] in let k = kv.k
          let mutable nxtlvl = i in let mutable lvl = nxtlvl
          while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
            let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
            let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
            if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
          pq.[lvl - 1] <- kv
    pq

极客提示:实际上,我原本期望可变版本会提供更好的性能比率,但由于嵌套的if-then-else代码结构和素数筛值的随机行为导致重新插入时出现了问题,这意味着CPU分支预测在大部分分支中失败,每个筛选减少需要重建指令预取缓存的CPU时钟周期增加了许多10倍。

这个算法中唯一固定的性能增益因素是分割和使用多任务处理,可以实现与 CPU 核心数量成比例的性能提升;然而,就目前而言,这是迄今为止最快的纯函数 SoE 算法,即使使用函数式 MinHeap 的纯函数形式也能击败简单的命令式实现,如 Jon Harrop's codeJohan Kullbom's Sieve of Atkin(后者在计时时出现错误,因为他只计算了 前 1000 万个质数而不是第 1000 万个质数),但如果使用更好的优化,那些算法将快约五倍。当我们添加更大的轮子分解的多线程时,函数式代码和命令式代码之间的比率约为五,因为命令式代码的计算复杂度增加得比函数式代码更快,而多线程有助于较慢的函数式代码,而不是较快的命令式代码,因为后者越来越接近枚举找到的质数所需的基本时间限制。 EDIT_ADD: 即使一个人可以选择继续使用MinHeap的纯函数版本,为了准备多线程的高效分割,稍微“破坏”了函数式代码的“纯度”,具体如下:1)传输复合筛选素数的表示最有效的方法是大小为段的打包位数组,2)虽然数组的大小已知,但使用数组理解以函数式方式初始化它并不高效,因为它在内部使用“ResizeArray”,需要每次添加x(我认为'x'对于当前实现来说是八)就需要复制自身,并且使用Array.init无法工作,因为许多特定索引处的值被跳过,3)因此,填充筛选的复合数组的最简单方法是将其零创建到正确的大小,然后运行初始化函数,该函数最多只能向每个可变数组索引写入一次。虽然这不是严格的“函数式”,但它接近于数组被初始化,然后再也没有修改过。

添加分割、多线程、可编程轮阶乘周长和许多性能调整的代码如下(除了一些新增的常量外,用于实现分割和多线程的额外调整代码大约占代码底部的一半,从“prmspg”函数开始):

type prmsCIS = class val pg:uint16 val bg:uint16 val pi:int val cont:unit->prmsCIS
                     new(pg,bg,pi,nxtprmf) = { pg=pg;bg=bg;pi=pi;cont=nxtprmf } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQOWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u;11u;13u;17u |] in let FSTPRM = 19u in let WHLCRC = int(WHLPRMS |> Seq.fold (*) 1u)
  let MXSTP = uint64(FSTPRM-1u) in let BFSZ = 1<<<11 in let NUMPRCS = System.Environment.ProcessorCount
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1 in let WHLPTRN = Array.zeroCreate (WHLLMT+1)
  let WHLRNDUP = let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1)
                                                          else acc in let b = a |> Array.scan (+) 0
                                      Array.init (WHLCRC>>>1) (fun i->
                                        if a.[i]=0 then 0 else let g=2*gap (i+1) 1 in WHLPTRN.[b.[i]]<-byte g;1)
                 Array.init WHLCRC (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u) then 1 else 0)
                 |> gaps |> Array.scan (+) 0
  let WHLPOS = WHLPTRN |> Array.map (uint32) |> Array.scan (+) 0u in let advcnd cnd cndi = cnd + uint32 WHLPTRN.[cndi]
  let MINRNGSTP = if WHLLMT<=31 then uint32(32/(WHLLMT+1)*WHLCRC) else if WHLLMT=47 then uint32 WHLCRC<<<1 else uint32 WHLCRC
  let MINBFRNG = uint32((BFSZ<<<3)/(WHLLMT+1)*WHLCRC)/MINRNGSTP*MINRNGSTP
  let MINBFRNG = if MINBFRNG=0u then MINRNGSTP else MINBFRNG
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline culladv c p i = c+uint32 WHLPTRN.[i]*p
  let rec mkprm (n,wi,pq,(bps:prmsCIS),q,lstp,bgap) =
    let nxt,nxti = advcnd n wi,whladv wi
    if n>=q then let p = (uint32 bps.bg<<<16)+uint32 bps.pg
                 let nbps,nxtcmpst,npi = bps.cont(),culladv n p bps.pi,whladv bps.pi
                 let pg = uint32 nbps.pg in let np = p+pg in let sqr = q+pg*((p<<<1)+pg) //only works to p < about 13 million
                 let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont) //therefore, algorithm only works to p^2 or about
                 mkprm (nxt,nxti,(MinHeap.insert nxtcmpst (cullstate(p,npi)) pq),nbps,sqr,lstp,(bgap+1us)) //1.7 * 10^14
    else match MinHeap.getMin pq with 
           | None -> (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us)) //fix with q is uint64
           | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                        if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,bgap)
                        elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,(bgap+1us))
                        else (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us))
  let rec pCIS p pg bg pi pq bps q = prmsCIS(pg,bg,pi,fun()->
    let (npg,nbg,npi,(nxt,nxti,npq,nbps,nq,nl,ng))=mkprm (p+uint32 WHLPTRN.[pi],whladv pi,pq,bps,q,p,0us)
    pCIS (p+uint32 npg) npg nbg npi npq nbps nq)
  let rec baseprimes() = prmsCIS(uint16 FSTPRM,0us,0,fun()->
                           let np,npi=advcnd FSTPRM 0,whladv 0
                           pCIS np (uint16 WHLPTRN.[0]) 1us npi MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let prmspg nxt adj pq bp q =
    //compute next buffer size rounded up to next even wheel circle so at least one each base prime hits the page
    let rng = max (((uint32(MXSTP+uint64(sqrt (float (MXSTP*(MXSTP+4UL*nxt))))+1UL)>>>1)+MINRNGSTP)/MINRNGSTP*MINRNGSTP) MINBFRNG
    let nxtp() = async {
      let rec addprms pqx (bpx:prmsCIS) qx = 
        if qx>=adj then pqx,bpx,qx //add primes to queue for new lower limit
        else let p = (uint32 bpx.bg<<<16)+uint32 bpx.pg in let nbps = bpx.cont()
             let pg = uint32 nbps.pg in let np = p+pg in let sqr = qx+pg*((p<<<1)+pg)
             let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont)
             addprms (MinHeap.insert qx (cullstate(p,bpx.pi)) pqx) nbps sqr
      let adjcinpg low k (v:cullstate) = //adjust the cull states for the new page low value
        let p = v.p in let WHLSPN = int64 WHLCRC*int64 p in let db = int64 p*int64 WHLPOS.[v.wi]
        let db = if k<low then let nk = int64(low-k)+db in nk-((nk/WHLSPN)*WHLSPN)
                 else let nk = int64(k-low) in if db<nk then db+WHLSPN-nk else db-nk
        let r = WHLRNDUP.[int((((db>>>1)%(WHLSPN>>>1))+int64 p-1L)/int64 p)] in let x = int64 WHLPOS.[r]*int64 p
        let r = if r>WHLLMT then 0 else r in let x = if x<db then x+WHLSPN-db else x-db in uint32 x,cullstate(p,r)
      let bfbtsz = int rng/WHLCRC*(WHLLMT+1) in let nbuf = Array.zeroCreate (bfbtsz>>>5)
      let rec nxtp' wi cnt = let _,nbg,_,ncnt = mkprm cnt in let nwi = wi + int nbg
                             if nwi < bfbtsz then nbuf.[nwi>>>5] <- nbuf.[nwi>>>5] ||| (1u<<<(nwi&&&0x1F)); nxtp' nwi ncnt
                             else let _,_,pq,bp,q,_,_ = ncnt in nbuf,pq,bp,q //results incl buf and cont parms for next page
      let npq,nbp,nq = addprms pq bp q
      return nxtp' 0 (0u,0,MinHeap.adjust (adjcinpg adj) npq,nbp,nq-adj,0u,0us) }
    rng,nxtp() |> Async.StartAsTask
  let nxtpg nxt (cont:(_*System.Threading.Tasks.Task<_>)[]) = //(len,pq,bp,q) =
    let adj = (cont |> Seq.fold (fun s (r,_)  -> s+r) 0u)
    let _,tsk = cont.[0] in let _,pq,bp,q = tsk.Result
    let ncont = Array.init (NUMPRCS+1) (fun i -> if i<NUMPRCS then cont.[i+1]
                                                 else prmspg (nxt+uint64 adj) adj pq bp q)
    let _,tsk = ncont.[0] in let nbuf,_,_,_ = tsk.Result in nbuf,ncont
  //init cond buf[0], no queue, frst bp sqr offset
  let initcond = 0u,System.Threading.Tasks.Task.Factory.StartNew (fun()->
                   (Array.empty,MinHeap.empty,baseprimes(),FSTPRM*FSTPRM-FSTPRM))
  let nxtcond n = prmspg (uint64 n) (n-FSTPRM) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM-FSTPRM)
  let initcont = Seq.unfold (fun (n,((r,_)as v))->Some(v,(n+r,nxtcond (n+r)))) (FSTPRM,initcond)
                 |> Seq.take (NUMPRCS+1) |> Seq.toArray
  let rec nxtprm (c,ci,i,buf:uint32[],cont) =
    let rec nxtprm' c ci i =
      let nc = c + uint64 WHLPTRN.[ci] in let nci = whladv ci in let ni = i + 1 in let nw = ni>>>5
      if nw >= buf.Length then let (npg,ncont)=nxtpg nc cont in nxtprm (c,ci,-1,npg,ncont)
      elif (buf.[nw] &&& (1u <<< (ni &&& 0x1F))) = 0u then nxtprm' nc nci ni
      else nc,nci,ni,buf,cont
    nxtprm' c ci i
  seq { yield! WHLPRMS |> Seq.map (uint64);
        yield! Seq.unfold (fun ((c,_,_,_,_) as cont)->Some(c,nxtprm cont))
                 (nxtprm (uint64 FSTPRM-uint64 WHLPTRN.[WHLLMT],WHLLMT,-1,Array.empty,initcont)) }

请注意,MinHeap模块,包括函数和基于数组的模块,已添加了“adjust”函数,以允许在每个新段页开始时调整每个线程版本PQ的剪枝状态。还要注意,可以调整代码,使大部分计算使用32位范围完成,最终序列输出为uint64,计算时间成本很小,因此当前的理论范围是100万亿(10的14次方),如果愿意等待大约三到四个月来计算该范围,则可能性非常小。数字范围检查已被删除,因为不太可能有人使用此算法来计算到达甚至超过该范围。
使用纯函数式MinHeap和2、3、5、7轮因数分解,上述程序在0.062、0.629、10.53和195.62秒内计算出了前十万、一百万、一千万和一亿个质数。使用基于数组的MinHeap可以将其加速到0.097、0.276、3.48和51.60秒。通过将WHLPRMS更改为“[| 2u;3u;5u;7u;11u;13u;17u |]”并将FSTPRM更改为19u,使用2、3、5、7、11、13、17轮可以进一步加速到0.181、0.308、2.49和36.58秒(对于具有恒定开销的恒定因子改进)。这种最快的调整方式在约88.37秒内计算出32位数字范围内的203,280,221个质数。可以通过权衡较小范围版本的较慢时间与较大范围的较快时间来调整“BFSZ”常量,建议尝试值为“1<<<14”以获得更大的范围。该常量仅设置最小缓冲区大小,程序会自动调整超过该大小的缓冲区大小,以使所需页面范围的最大基本质数至少“打击”每个页面一次;这意味着不需要额外的“桶筛”。这个最后完全优化的版本可以在大约256.8秒和3617.4秒内计算出高达10亿和1000亿的质数(对于输出,使用“primesPQOWSE() |> Seq.takeWhile ((>=)100000000000UL) |> Seq.fold (fun s p -> s + 1UL) 0UL”进行测试)。这就是关于计算出万亿个质数需要大约半天、计算出高达万亿个质数需要一周以及计算出高达万亿个质数需要三到四个月的估计。

我认为使用增量SoE算法编写功能性或几乎功能性的代码运行速度比这快的可能性不大。从代码中可以看出,优化基本的增量算法使代码复杂度显著增加,使得它可能稍微比基于直接数组裁剪的等效优化代码更加复杂,并且那种代码能够以约十倍于此的速度运行且没有额外的性能指数意味着这个功能性增量代码具有日益增长的额外百分比开销。

那么除了从理论和知识角度来看,这个有用吗?可能不会。对于小于约一千万的质数范围,基本的非完全优化递增式筛法可能足够且编写相当简单,或者比最简单的命令式筛法使用更少的RAM内存。但是,它们比使用数组的更命令式代码要慢得多,因此对于超过该范围的范围,它们会“失去效力”。虽然在这里已经证明了通过优化可以加快代码速度,但它仍然比更命令式的纯基于数组的版本慢10倍以上,同时增加了与具有等效优化的代码至少同等复杂度的复杂度,即使在DotNet上使用F#编写的代码也比直接编译为本机代码的C++等语言慢四倍;如果真的想要研究大量的质数范围,人们可能会使用其他语言和技术,在那里primesieve可以在不到四个小时的时间内计算出百万亿范围内的质数数量,而这个代码需要约三个月的时间。

你的代码在我的电脑上无法编译。 字段、构造函数或成员“pi”未定义(使用外部 F# 编译器) - http://share.linqpad.net/e6imko.linq - Maslow
@Maslow,刚刚修复了两个代码:顶部代码缺少“cullstate”类型,两个代码错误地引用了该结构类型中的“pi”而不是正确的“wi”字段。请原谅编码风格,因为这是我第一次使用F#时编写的;自那以后,我已经进入了更进一步的函数式编程语言,今天我不太可能写出相同的代码,但它仍然符合O'Neil参考文章中的Haskell增量埃拉托斯特尼筛法原则。 - GordonBGood
@Maslow 注意,纯函数式增量筛法无法与另一个我的答案中的数组突变页段筛法的速度相匹配。我用F#编写了一个最大化轮因子分解、多线程、页面分段、埃拉托色尼筛法,它在Intel Atom i5-Z8350处理器@1.44 GHz(4核)上,在不到300毫秒的时间内找到10亿个素数,比任何语言中的Atkin筛法实现都快得多,并且在JIT编译和数组边界检查减慢的情况下,与Kim Walisch的“primesieve”(用C++编写)相差约一倍。 - GordonBGood
我认为(根据你的发现,我猜想未来)函数式编程可以实现的并行性水平最终会胜出。同时感谢你的撰写。 - Maslow
@Maslow,我提到的快速实现是在Haskell中编写的,但在Haskell中,数组变异被锁定在ST Monad后面,而在F#中,人们必须使用自律来实现相同的结果。由于处理优先级队列的高开销和额外的“log n”乘数项访问它,非数组版本永远无法像这样快。像这样的筛子(或者更简单,因为不需要实现优先级队列的树折叠版本)只有在大约一百万的范围内才真正有用。不客气。 - GordonBGood

6
这是一个基本上最大程度优化的增量(和递归)基于映射的埃拉托色尼筛法,使用序列,因为没有必要记忆先前的序列值(除了使用Seq.cache缓存基础质数值会略有优势),其主要优化在于它使用轮子分解来进行输入序列,并且使用多个(递归)流来维护小于正在筛选的最新数字的平方根的基础质数。
  let primesMPWSE =
    let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                     4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
    let adv i = if i < 47 then i + 1 else 0
    let reinsert oldcmpst mp (prime,pi) =
      let cmpst = oldcmpst + whlptrn.[pi] * prime
      match Map.tryFind cmpst mp with
        | None -> mp |> Map.add cmpst [(prime,adv pi)]
        | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
    let rec mkprimes (n,i) m ps q =
      let nxt = n + whlptrn.[i]
      match Map.tryFind n m with
        | None -> if n < q then seq { yield (n,i); yield! mkprimes (nxt,adv i) m ps q }
                  else let (np,npi),nlst = Seq.head ps,ps |> Seq.skip 1
                       let (nhd,ni),nxtcmpst = Seq.head nlst,n + whlptrn.[npi] * np
                       mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nlst (nhd * nhd)
        | Some(skips) -> let adjmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                         mkprimes (nxt,adv i) adjmap ps q
    let rec prs = seq {yield (11,0); yield! mkprimes (13,1) Map.empty prs 121 } |> Seq.cache
    seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> Seq.map (fun (p,i) -> p) }

它能够在约0.445秒内找到从1到1,299,721的第100,000个质数,但它不是一个适当的命令式EoS算法,随着质数数量的增加,它的比例并不接近线性,需要7.775秒才能找到从1到15,485,867的第1,000,000个质数,其性能范围大约为O(n^1.2),其中n是找到的最大质数。

还有一些调整可以做,但这可能不会对性能提升产生很大的影响,具体如下:

  1. 由于 F# 序列库明显较慢,可以使用自定义类型实现 IEnumerable 来减少内部序列的时间消耗,但是由于序列操作仅占总时间的约 20%,即使这些操作被减少到零,结果也只能将时间缩短到 80%。

  2. 可以尝试其他形式的映射存储,例如 O'Neil 提到的优先队列或 @gradbot 使用的 SkewBinomialHeap,但至少对于 SkewBinomialHeap,性能提升仅有几个百分点。似乎在选择不同的映射实现时,人们只是在交换更好的响应以查找和删除靠近列表开头的项所花费的时间,以享受这些好处,因此净收益基本上是相当的,并且随着映射中条目数量的增加而具有 O(log n) 的性能。上述优化仅使用多个条目流,以平方根减少映射中的条目数,从而使这些改进变得不那么重要。

EDIT_ADD: 我确实进行了一点额外的优化,性能提高了比预期更多,这很可能是由于改进了消除 Seq.skip 作为通过基本质数序列前进的方式。这种优化使用内部序列生成的替代方法,生成一个整数值和一个继续函数的元组,用于推进序列中的下一个值,最终的 F# 序列由整体展开操作生成。代码如下:
type SeqDesc<'a> = SeqDesc of 'a * (unit -> SeqDesc<'a>) //a self referring tuple type
let primesMPWSE =
  let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                   4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
  let inline adv i = if i < 47 then i + 1 else 0
  let reinsert oldcmpst mp (prime,pi) =
    let cmpst = oldcmpst + whlptrn.[pi] * prime
    match Map.tryFind cmpst mp with
      | None -> mp |> Map.add cmpst [(prime,adv pi)]
      | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
  let rec mkprimes (n,i) m (SeqDesc((np,npi),nsdf) as psd) q =
    let nxt = n + whlptrn.[i]
    match Map.tryFind n m with
      | None -> if n < q then SeqDesc((n,i),fun() -> mkprimes (nxt,adv i) m psd q)
                else let (SeqDesc((nhd,x),ntl) as nsd),nxtcmpst = nsdf(),n + whlptrn.[npi] * np
                     mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nsd (nhd * nhd)
      | Some(skips) -> let adjdmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                       mkprimes (nxt,adv i) adjdmap psd q
  let rec prs = SeqDesc((11,0),fun() -> mkprimes (13,1) Map.empty prs 121 )
  let genseq sd = Seq.unfold (fun (SeqDesc((n,i),tailfunc)) -> Some(n,tailfunc())) sd
  seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> genseq }

寻找第100,000个和第1,000,000个质数所需的时间分别约为0.31秒和5.1秒,因此这种小改变带来了相当大的百分比收益。我尝试了自己实现IEnumerable/IEnumerator接口,它们是序列的基础,虽然它们比Seq模块使用的版本更快,但在这个算法中几乎不会产生进一步的差异,因为大部分时间都花费在Map函数上。END_EDIT_ADD 除了基于map的增量EoS实现外,还有另一种“纯函数”实现,使用树折叠被称为稍微更快,但由于树折叠中仍存在O(log n)项,我怀疑它主要更快(如果真的更快)是由于算法的实现方式与计算机操作数量相比使用map。如果有人感兴趣,我也会开发那个版本。
最终,人们必须接受这样一个事实:对于更大的数字范围,没有纯函数式的增量EoS实现可以接近良好命令式实现的原始处理速度。然而,可以想出一种方法,在该方法中所有代码都是纯函数式的,除了使用(可变)数组进行分段筛选合数的部分,其性能接近于O(n),在实际使用中,对于像前200,000,000个素数这样的大范围功能算法,它将比纯函数式算法快五十到一百倍。@Jon Harrop在他的博客中已经完成了这项工作,但是这可以通过非常少量的额外代码进一步调整。

@WillNess,续:在Ideone.com上,F#和Haskell之间的比率更糟糕,您的primesTMWE计算前100,000个质数大约需要0.13秒,而我的最新版本primesPMWSE需要大约1.2秒或者慢了近十倍!我认为这是因为Ideone服务器基于Linux并运行GHC Haskell,它非常高效,而他们正在运行mono-project F# 2版本,具有著名的差JIT编译和垃圾收集。在使用DotNet的Windows框上,比率可能只有三倍左右。 - GordonBGood
@Will Ness,我已将您的Haskell树合并代码转换为另一种答案,作为此问题的另一个答案,可在此处列出。即使我尽可能优化,它也不比基于map的版本快多少。 - GordonBGood
非常有趣。在Haskell中,基于map的延迟版本稍微慢一些,但与树版本具有相同的复杂度。只是在Haskell中,树版本的代码(不包括wheel部分)比map的代码更加简短清晰。 :) 它甚至允许使用终极单行(几乎)公式,2 : _Y ( (3:) . gaps 5 . unionAll . map (\p->[p*p, p*p+2*p..]) ) where _Y g = g (_Y g)(不包括gapsunionAll的定义)。 - Will Ness
@WillNess,是的,我真的很喜欢使用“非共享不动点组合器来实现望远镜式多阶段素数生成”的版本,尽管我仍在努力理解它的工作原理。我也可以在F#版本中实现它,尽管可能会更冗长一些。我仍然有使用模式匹配而不是结构体的版本,这样做虽然不太冗长,但仍不如Haskell简洁。 - GordonBGood
这里没有什么特别的,只是一个简单直接的递归 - 数据中没有自我引用(没有循环,比如这里),因此它会创建尽可能多的单独阶段,通过平方/平方根来增加/减少。这是Python版本,完全做同样的事情。 - Will Ness
显示剩余6条评论

5
这是我尝试将Haskell代码忠实地翻译成F#的结果:
#r "FSharp.PowerPack"

module Map =
  let insertWith f k v m =
    let v = if Map.containsKey k m then f m.[k] v else v
    Map.add k v m

let sieve =
  let rec sieve' map = function
  | LazyList.Nil -> Seq.empty
  | LazyList.Cons(x,xs) -> 
      if Map.containsKey x map then
        let facts = map.[x]
        let map = Map.remove x map
        let reinsert m p = Map.insertWith (@) (x+p) [p] m
        sieve' (List.fold reinsert map facts) xs
      else
        seq {
          yield x
          yield! sieve' (Map.add (x*x) [x] map) xs
        }
  fun s -> sieve' Map.empty (LazyList.ofSeq s)

let rec upFrom i =
  seq {
    yield i
    yield! upFrom (i+1)
  }

let primes = sieve (upFrom 2)

这实际上比我发布的算法要慢。对于筛选前 100 000 个自然数,我的算法大约需要8秒,而这个算法在我的机器上需要超过9秒。我实际上没有计时Haskell解决方案(甚至难以运行),但这似乎非常慢。可能是 LazyList 的实现方式有问题,它似乎使用锁来避免副作用? - IVlad
F# 优先队列的实现可以在以下问题中找到:https://dev59.com/K3A75IYBdhLWcg3wYH_I。 - Muhammad Alkarouri
好的,算了,我当时有点傻。实际上我让你的程序找到了第100000个质数 : )。我的程序仍需要8秒,而你的确实只需要0.5秒。+1。我知道优先队列的实现方式,但我想先理解这个。 - IVlad
http://stackoverflow.com/search?q=user%3A849891++postpone(以及[this](https://dev59.com/yG445IYBdhLWcg3w7uc0#pJbknYgBc1ULPQZFDP2C)):) -- @kvb,你们两个都是对的,但第6页上的代码是错误/不足的:是的,您可以/应该从p*p开始添加,但是*只有在p*p(而不是p!)被视为候选数字之后才能这样做! :) 那篇文章的作者已经在带有源代码的附带ZIP文件中进行了更正。 - Will Ness
@WillNess - 说得好,不过实现这个改变需要将素数队列与现有输入一起线程化通过函数。 - kvb
显示剩余6条评论

5

使用邮箱进程实现的素数筛:

let (<--) (mb : MailboxProcessor<'a>) (message : 'a) = mb.Post(message)
let (<-->) (mb : MailboxProcessor<'a>) (f : AsyncReplyChannel<'b> -> 'a) = mb.PostAndAsyncReply f

type 'a seqMsg =  
    | Next of AsyncReplyChannel<'a>   

type PrimeSieve() =   
    let counter(init) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop n =   
                async { let! msg = inbox.Receive()   
                        match msg with
                        | Next(reply) ->   
                            reply.Reply(n)   
                            return! loop(n + 1) }   
            loop init)   

    let filter(c : MailboxProcessor<'a seqMsg>, pred) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop() =   
                async {   
                    let! msg = inbox.Receive()   
                    match msg with
                    | Next(reply) ->
                        let rec filter prime =
                            if pred prime then async { return prime }
                            else async {
                                let! next = c <--> Next
                                return! filter next }
                        let! next = c <--> Next
                        let! prime = filter next
                        reply.Reply(prime)
                        return! loop()   
                }   
            loop()   
        )   

    let processor = MailboxProcessor.Start(fun inbox ->   
        let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =   
            async {   
                let! msg = inbox.Receive()   
                match msg with
                | Next(reply) ->   
                    reply.Reply(prime)   
                    let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))   
                    let! newPrime = oldFilter <--> Next
                    return! loop newFilter newPrime   
            }   
        loop (counter(3)) 2)   

    member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)

    static member upto max =
        let p = PrimeSieve()
        Seq.initInfinite (fun _ -> p.Next())
        |> Seq.takeWhile (fun prime -> prime <= max)
        |> Seq.toList

可能不是非常快,但它的绝妙之处在于它的惊人表现。 - Juliet

3

这是我的两分之一,虽然我不确定它是否符合OP对欧拉筛的真正标准。它没有使用模除,并实现了OP引用的论文中的优化。它只适用于有限列表,但在我看来,这符合欧拉筛最初的描述精神。顺便说一句,该论文讨论了标记数量和除法数量的复杂性。似乎我们必须遍历链接列表,这可能忽略了各种算法在性能方面的一些关键方面。总的来说,使用计算机进行模除是一项昂贵的操作。

open System

let rec sieve list =
    let rec helper list2 prime next =
        match list2 with
            | number::tail -> 
                if number< next then
                    number::helper tail prime next
                else
                    if number = next then 
                        helper tail prime (next+prime)
                    else
                        helper (number::tail) prime (next+prime)

            | []->[]
    match list with
        | head::tail->
            head::sieve (helper tail head (head*head))
        | []->[]

let step1=sieve [2..100]

编辑:我已经在原帖中修复了一个错误。我试图按照筛法的原始逻辑进行一些修改。也就是从第一个元素开始,将其倍数从集合中划掉。这个算法实际上是寻找下一个是素数的倍数,而不是对集合中的每个数字进行模运算。论文中的优化是从p^2开始寻找素数的倍数。

帮助函数中的多级部分处理了下一个素数的倍数可能已经被从列表中删除的可能性。例如,对于素数5,它将尝试删除数字30,但永远不会找到它,因为它已经被素数3删除了。希望这澄清了算法的逻辑。


sieve [1..10000]需要约2秒钟时间,而我的算法和@kvb的算法则是即时的。您能否简单解释一下算法背后的逻辑? - IVlad
+1,这似乎比以前快。但是,如果我尝试筛选[2..100000],我会得到一个堆栈溢出异常。 - IVlad
@IVlad 应该可以通过将 top_limit 作为 sieve 函数的另一个参数来实现大幅度加速,并使其测试是否 top_limit/head < head,如果是,则只需返回 head::tail。有关详细讨论(使用 Haskell),请参见此处。此外,使用 [3..2..100] 并将 (2*head) 作为步长值调用 helper 将有所帮助(尽管这将仅使速度加倍)。 :) 乾杯! - Will Ness

3

说实话,这不是 Erathothenes 筛法,但它非常快:For what its worth

let is_prime n =
    let maxFactor = int64(sqrt(float n))
    let rec loop testPrime tog =
        if testPrime > maxFactor then true
        elif n % testPrime = 0L then false
        else loop (testPrime + tog) (6L - tog)
    if n = 2L || n = 3L || n = 5L then true
    elif n <= 1L || n % 2L = 0L || n % 3L = 0L || n % 5L = 0L then false
    else loop 7L 4L
let primes =
    seq {
        yield 2L;
        yield 3L;
        yield 5L;
        yield! (7L, 4L) |> Seq.unfold (fun (p, tog) -> Some(p, (p + tog, 6L - tog)))
    }
    |> Seq.filter is_prime

在我的机器上(AMD Phenom II,3.2GHZ四核),它能够在1.25秒内找到第100,000个质数。


@JonHarrop 有趣的阅读。 :) (无法在那里评论...所以如果我可以,在这里评论...)您通过将顶部质数加倍来扩大数组,但您也可以将其平方。最优复杂度通常被视为_n_个质数产生的n^1.2。您的数据显示n^1.4。 - Will Ness
@Will,供参考,Jon的工作链接为:Mutable Arrays SoE。Jon的程序在几个方面都没有达到最大效率:1)它将所有找到的质数添加到输出列表中,而不仅仅是基本根质数;2)它为每个新质数使用新的堆筛选缓冲区,而不是一个固定大小的缓冲区,该缓冲区应限制在CPU L1缓存大小以避免标记复合物时出现缓存抖动;3)它不使用轮子甚至只使用奇数。对于大范围来说,当前实现并不那么快。 - GordonBGood
1
@Will,是的,我已经阅读了Silva的文章,但是通过足够的缓存,可以有效地使用EoS筛选大约10亿个数字,具体如下:假设CPU L1缓存大小为32 KBytes,可以使用轮子筛法表示大约一百万个数字范围,因此最大基本根质数平均至少有一个命中率,直到这个范围的三分之一左右,这意味着我们可以筛选出上述范围内的数字。超过这个范围,就必须使用桶筛法。然而,在那个约为10^10的限制范围内,性能接近O(nloglogn)。EoS在那个范围内也有其局限性。 - GordonBGood
@Will,为了完整起见,即使我们能够获得基于映射或树形折叠的纯函数筛选器版本,可以在一秒钟内计算出1600万以内的质数,并且即使性能在此范围内仍然是O(n^1.2),计算203,280,221个32位质数范围将需要大约13分钟,而使用分段缓冲区在多核桌面计算机上可能只需要约10秒。即使从那时起对于桶式筛法使用函数技术,我们也已经从基本质数的性能中获益。 - GordonBGood
@WillNess,关于使用可变数组作为数组中唯一的“非纯”项的概念的进一步澄清,请参阅我最近对此问题的新回答。我的最新答案比这里讨论的John Harrop的代码更进一步,并解决了可能存在的过度内存使用的主要反对意见,同时实际上提高了性能。 - GordonBGood
显示剩余6条评论

3

我知道你明确表示你对纯函数筛实现很感兴趣,所以我一直没有展示我的筛法。但是重新阅读你提到的文件后,我发现那里介绍的增量筛算法与我的算法基本相同,唯一的区别在于使用纯功能技术和明确的命令式技术的实现细节。因此,我认为我至少可以满足你的好奇心。此外,当在功能界面下隐藏了明显的性能收益时,使用命令式技术是F#编程中鼓励使用的最强有力的技术之一,而不是Haskell文化中的“纯洁万能论”。我最初在我的Project Euler for F#un blog上发布了这个实现,但是现在重新发布时用先决条件代码替换掉,并删除结构类型。在我的计算机上,primes可以在0.248秒内计算出前100,000个质数,在4.8秒内计算出前1,000,000个质数(请注意,primes缓存其结果,因此每次执行基准测试时都需要重新评估它)。

let inline infiniteRange start skip = 
    seq {
        let n = ref start
        while true do
            yield n.contents
            n.contents <- n.contents + skip
    }

///p is "prime", s=p*p, c is "multiplier", m=c*p
type SievePrime<'a> = {mutable c:'a ; p:'a ; mutable m:'a ; s:'a}

///A cached, infinite sequence of primes
let primes =
    let primeList = ResizeArray<_>()
    primeList.Add({c=3 ; p=3 ; m=9 ; s=9})

    //test whether n is composite, if not add it to the primeList and return false
    let isComposite n = 
        let rec loop i = 
            let sp = primeList.[i]
            while sp.m < n do
                sp.c <- sp.c+1
                sp.m <- sp.c*sp.p

            if sp.m = n then true
            elif i = (primeList.Count-1) || sp.s > n then
                primeList.Add({c=n ; p=n ; m=n*n ; s=n*n})
                false
            else loop (i+1)
        loop 0

    seq { 
        yield 2 ; yield 3

        //yield the cached results
        for i in 1..primeList.Count-1 do
            yield primeList.[i].p

        yield! infiniteRange (primeList.[primeList.Count-1].p + 2) 2 
               |> Seq.filter (isComposite>>not)
    }

1
当然,你是正确的,没有充分的理由使用纯函数式方法来实现筛法,这只是我的好奇心。命令式筛法支持更多的优化,例如减半内存使用(您不关心二的倍数),使用单个位来标记合成数(与使用映射相比)等等。而且它将保持在O(n log log n),而纯函数式实现则不会。+1 对于一些有趣的代码。 - IVlad
@Stephen,针对IVlad的评论,他要求纯函数式代码而没有可变状态,你的代码对性能来说“可变”的理由并不好,因为这个算法与Jon Harrop的类似(http://fsharpnews.blogspot.ca/2010/02/sieve-of-eratosthenes.html),除了他使用可变数组之外,你通过使用包含可变记录的可变列表(ResizeArray)放弃了所有效率,并且你通过重复索引(一个O(index)操作)串行处理该列表,其性能几乎不比纯函数式代码更好。 - GordonBGood
@GordonBGood ResizeArray 的索引操作是 O(1) 的(它们在底层由数组支持)... - Stephen Swensen
@Stephen - 是的,List<'T>类是由数组支持的,所以索引不是问题,但是向列表中添加项目的比率具有O[n]项(取决于溢出时容量增加的情况)。当人们意识到该算法在每个复合检查中都要扫描所有基本平方根质数时,大约10百万的大范围内O(n^1.4)的性能就很容易解释了。实际上,这并不是真正的SoE,因为它通过乘法计算下一个复合数,而不是通过添加素数来计算,但这很容易修复(sp.m <- sp.p + sp.m,不需要sp.c)。 - GordonBGood
@Stephen,当你筛选奇素数时,你可以在while循环中两次添加sp.p,如sp.m <- sp.m + sp.p + sp.p(不需要sp.c),尽管这种更改的唯一区别是减少了该while循环中的循环次数。此外,将基本素数计算与主要素数输出生成器分开意味着只需要将基本素数存储在备忘录中,而无需将主要素数添加到列表中,因此您可以大大减少计算时间和内存要求,尽管性能仍然为O(n ^ 1.4)。 - GordonBGood

3
这是使用纯函数式F#代码实现增量埃拉托色尼筛法(SoE)的另一种方法。它改编自Haskell代码,该代码由Dave Bayer开发,但他使用了复杂的公式和平衡三进制树结构,并逐步以统一方式加深(Heinrich Apfelmus引入了简化的公式和倾斜向右的加深二进制树结构,Will Ness进一步简化)。生产阶段的想法归功于M. O'Neill,链接如下:Optimized Tree Folding code using a factorial wheel in Haskell
以下代码有几个优化,使其更适合在F#中执行:
  1. The code uses coinductive streams instead of LazyList's as this algorithm has no (or little) need of LazyList's memoization and my coinductive streams are more efficient than either LazyLists (from the FSharp.PowerPack) or the built in sequences. A further advantage is that my code can be run on tryFSharp.org and ideone.com without having to copy and paste in the Microsoft.FSharp.PowerPack Core source code for the LazyList type and module (along with copyright notice)

  2. It was discovered that there is quite a lot of overhead for F#'s pattern matching on function parameters so the previous more readable discriminated union type using tuples was sacrificed in favour of by-value struct (or class as runs faster on some platforms) types for about a factor of two or more speed up.

  3. Will Ness's optimizations going from linear tree folding to bilateral folding to multi-way folding and improvements using the wheel factorization are about as effective ratiometrically for F# as they are for Haskell, with the main difference between the two languages being that Haskell can be compiled to native code and has a more highly optimized compiler whereas F# has more overhead running under the DotNet Framework system.

    type prmstate = struct val p:uint32 val pi:byte new (prm,pndx) = { p = prm; pi = pndx } end
    type prmsSeqDesc = struct val v:prmstate val cont:unit->prmsSeqDesc new(ps,np) = { v = ps; cont = np } end
    type cmpststate = struct val cv:uint32 val ci:byte val cp:uint32 new (strt,ndx,prm) = {cv = strt;ci = ndx;cp = prm} end
    type cmpstsSeqDesc = struct val v:cmpststate val cont:unit->cmpstsSeqDesc new (cs,nc) = { v = cs; cont = nc } end
    type allcmpsts = struct val v:cmpstsSeqDesc val cont:unit->allcmpsts new (csd,ncsdf) = { v=csd;cont=ncsdf } end
    
    let primesTFWSE =
      let whlptrn = [| 2uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;6uy;6uy;2uy;6uy;4uy;2uy;6uy;4uy;6uy;8uy;4uy;2uy;4uy;2uy;
                       4uy;8uy;6uy;4uy;6uy;2uy;4uy;6uy;2uy;6uy;6uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;2uy;10uy;2uy;10uy |]
      let inline whladv i = if i < 47uy then i + 1uy else 0uy
      let inline advmltpl c ci p = cmpststate (c + uint32 whlptrn.[int ci] * p,whladv ci,p)
      let rec pmltpls cs = cmpstsSeqDesc(cs,fun() -> pmltpls (advmltpl cs.cv cs.ci cs.cp))
      let rec allmltpls (psd:prmsSeqDesc) =
        allcmpsts(pmltpls (cmpststate(psd.v.p*psd.v.p,psd.v.pi,psd.v.p)),fun() -> allmltpls (psd.cont()))
      let rec (^) (xs:cmpstsSeqDesc) (ys:cmpstsSeqDesc) = //union op for SeqDesc's
        match compare xs.v.cv ys.v.cv with
          | -1 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys)
          | 0 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys.cont())
          | _ -> cmpstsSeqDesc(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
      let rec pairs (csdsd:allcmpsts) =
        let ys = csdsd.cont in
        allcmpsts(cmpstsSeqDesc(csdsd.v.v,fun()->csdsd.v.cont()^ys().v),fun()->pairs (ys().cont()))
      let rec joinT3 (csdsd:allcmpsts) = cmpstsSeqDesc(csdsd.v.v,fun()->
        let ys = csdsd.cont() in let zs = ys.cont() in (csdsd.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
      let rec mkprimes (ps:prmstate) (csd:cmpstsSeqDesc) =
        let nxt = ps.p + uint32 whlptrn.[int ps.pi]
        if ps.p >= csd.v.cv then mkprimes (prmstate(nxt,whladv ps.pi)) (csd.cont()) //minus function
        else prmsSeqDesc(prmstate(ps.p,ps.pi),fun() -> mkprimes (prmstate(nxt,whladv ps.pi)) csd)
      let rec baseprimes = prmsSeqDesc(prmstate(11u,0uy),fun() -> mkprimes (prmstate(13u,1uy)) initcmpsts)
      and initcmpsts = joinT3 (allmltpls baseprimes)
      let genseq sd = Seq.unfold (fun (psd:prmsSeqDesc) -> Some(psd.v.p,psd.cont())) sd
      seq { yield 2u; yield 3u; yield 5u; yield 7u; yield! mkprimes (prmstate(11u,0uy)) initcmpsts |> genseq }
    
    primesLMWSE |> Seq.nth 100000
    
EDIT_ADD: 该程序已经修正,原始代码未正确处理流的尾部,并将参数流的尾部传递给pairs函数到joinT3函数,而不是zs流后面的尾部。下面的时间已经相应地进行了更正,速度提高了约30%。tryFSharp和ideone链接代码也已经更正。END_EDIT_ADD

以上程序在n为计算的最大质数时以O(n^1.1)的性能运行,或者当n为计算的质数数量时以O(n^1.18)的性能运行,并且在快速计算机上使用结构体类型而不是类(似乎某些实现在形成闭包时会对按值结构体进行装箱和拆箱)计算前一百万个质数大约需要2.16秒(前十万个质数大约需要0.14秒)。我认为这是任何这些纯函数质数算法的最大实际范围。除了对减少常量因子进行一些微小的调整之外,这可能是运行纯函数SoE算法的最快方法。

除了将分段和多线程结合起来共享多个CPU核心的计算之外,可以对该算法进行的“调整”大多是增加轮式分解的周长,以获得高达约40%的性能提升,并由于使用结构、类、元组或更直接的单个参数在函数之间传递状态而带来的小的收益。 EDIT_ADD2:我已经完成了上述优化,结果代码现在几乎快了一倍,由于结构优化的额外奖励是可选使用更大的轮式分解周长以获得更小的降低。请注意,下面的代码避免在主序列生成循环中使用连续体,仅在必要时使用它们用于基本质数流和从这些基本质数导出的后续复合数筛选流。新代码如下:
type CIS<'T> = struct val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end //Co-Inductive Steam
let primesTFOWSE =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline advcnd c ci = c + uint32 WHLPTRN.[ci]
  let inline advmltpl p (c,ci) = (c + uint32 WHLPTRN.[ci] * p,whladv ci)
  let rec pmltpls p cs = CIS(cs,fun() -> pmltpls p (advmltpl p cs))
  let rec allmltpls k wi (ps:CIS<_>) =
    let nxt = advcnd k wi in let nxti = whladv wi
    if k < ps.v then allmltpls nxt nxti ps
    else CIS(pmltpls ps.v (ps.v*ps.v,wi),fun() -> allmltpls nxt nxti (ps.cont()))
  let rec (^) (xs:CIS<uint32*_>) (ys:CIS<uint32*_>) = 
    match compare (fst xs.v) (fst ys.v) with //union op for composite CIS's (tuple of cmpst and wheel ndx)
      | -1 -> CIS(xs.v,fun() -> xs.cont() ^ ys)
      | 0 -> CIS(xs.v,fun() -> xs.cont() ^ ys.cont())
      | _ -> CIS(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
  let rec pairs (xs:CIS<CIS<_>>) =
    let ys = xs.cont() in CIS(CIS(xs.v.v,fun()->xs.v.cont()^ys.v),fun()->pairs (ys.cont()))
  let rec joinT3 (xs:CIS<CIS<_>>) = CIS(xs.v.v,fun()->
    let ys = xs.cont() in let zs = ys.cont() in (xs.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
  let rec mkprm (cnd,cndi,(csd:CIS<uint32*_>)) =
    let nxt = advcnd cnd cndi in let nxti = whladv cndi
    if cnd >= fst csd.v then mkprm (nxt,nxti,csd.cont()) //minus function
    else (cnd,cndi,(nxt,nxti,csd))
  let rec pCIS p pi cont = CIS(p,fun()->let (np,npi,ncont)=mkprm cont in pCIS np npi ncont)
  let rec baseprimes() = CIS(FSTPRM,fun()->let np,npi = advcnd FSTPRM 0,whladv 0
                                           pCIS np npi (advcnd np npi,whladv npi,initcmpsts))
  and initcmpsts = joinT3 (allmltpls FSTPRM 0 (baseprimes()))
  let inline genseq sd = Seq.unfold (fun (p,pi,cont) -> Some(p,mkprm cont)) sd
  seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,initcmpsts) |> genseq }

上述代码在参考Intel i7-2700K(3.5 GHz)64位模式机器上,枚举前十万、一百万和一千万个质数分别需要约0.07、1.02和14.58秒。这比该代码衍生自的参考Haskell实现稍慢,尽管在tryfsharpideone上略慢,因为在Silverlight下tryfsharp处于32位模式(大约慢了一半),在ideone上运行在较慢的Mono 2.0上(对于F#来说本质上要慢得多),所以比参考机器慢高达五倍。请注意,ideone报告的运行时间包括嵌入式查找表数组的初始化时间,该时间需要计算在内。
上述程序还有一个特点,即因子分解轮被参数化,例如,可以通过将WHLPRMS设置为[| 2u;3u;5u;7u;11u;13u;17u;19u |]和FSTPRM设置为23u来使用极大的轮,以在大范围内获得三分之二的运行时间,在一千万个质数时大约需要10.02秒,但请注意,在程序开始运行之前需要几秒钟来计算WHLPRMS。 < p >极客笔记:我没有按照参考的Haskell代码实现“用于望远镜多级素数生成的非共享不动点组合子”,尽管我试图这样做,因为需要像Haskell的延迟列表这样的东西才能使其在不进入无限循环和堆栈溢出的情况下工作。虽然我的协同流(CIS)具有某些惰性的属性,但它们并不是正式的惰性列表或缓存序列(当传递给诸如我提供的“genseq”一类的最终输出序列的函数时,它们会变成非缓存序列,并且可以被缓存)。我不想使用LazyList的PowerPack实现,因为它不太高效,并且需要将该源代码复制到tryfsharp和ideone中,而这些平台不支持导入模块。当想要使用头/尾操作时,使用内置序列(即使是缓存的)非常低效,因为获取序列的尾部的唯一方法是使用“Seq.skip 1”,这将基于原始序列递归跳过许多次生成新的序列。我可以基于CIS实现自己的高效LazyList类,但是为了证明一个观点而这样做似乎并不值得,因为递归的“initcmpsts”和“baseprimes”对象需要很少的代码。此外,将LazyList传递给函数以产生对该LazyList的扩展,该函数仅使用接近Lazylist开头的值,这要求几乎整个LazyList被存储以减少内存效率:对前1000万个质数的一次遍历将需要在内存中具有近1.8亿个元素的LazyList。所以我放弃了这个方法。

请注意,对于更大的范围(1000万个质数或更多),这种纯函数式代码与许多简单命令式实现的埃拉托斯特尼筛法或阿特金斯筛法的速度大致相同,尽管这是由于这些命令式算法缺乏优化;一个使用等效优化和分段筛选数组的比这个更加命令式的实现仍然比这个快大约十倍,如我的“几乎函数式”的答案所示。
此外,请注意,虽然可以使用树折叠实现分段筛选,但这更加困难,因为剔除提前算法被嵌入到用于“union - ^”运算符的连续体中,并且解决这个问题意味着需要使用不断推进的剔除范围;这与其他算法不同,其中剔除变量的状态可以为每个新页面重置,包括其范围的缩小,因此,如果使用比32位更大的范围,则即使在确定了64位素数范围时,内部剔除范围仍然可以在执行时间每个素数的成本很小的情况下重置为在32位范围内运行。

我必须纠正你,这些想法不是我的。所有的功劳都在http://www.haskell.org/haskellwiki/Prime_numbers#Tree_merging。树折叠(使用平衡树)最初由Dave Bayer发布,结构由Heinrich Apfelmus更加优化(倾斜树),两阶段生产思路由Melissa O'Neill提出。Bayer和Apfelmus的代码使用了更复杂的优先合并,非常小心地避免过早强制;我发现粗心允许更简化的代码仍然有效。 :) - Will Ness
当然,在我看到Melissa O'Neill的文章之前,轮子技术就已经广为人知了。 :) - Will Ness
这需要计算一百万个质数约为10秒,而之前的答案说1,000,000th个质数只需要5秒? - Will Ness
@WillNess,我已经调整了这个算法,现在这个树折叠算法的速度与优先队列答案的纯函数版本大致相同,尽管使用可变实现的MinHeap优先队列仍然快两倍。你会发现,我研究了使用“非共享不动点组合子来进行望远镜多阶段质数生产”,但出于给定答案的原因决定不使用它。 - GordonBGood
“非共享”的目的正是为了防止将一个调用的输出链接到另一个调用的输入中(使这些调用独立起来;确保在运行时实际上有2个(3个、4个...或更多)调用)。这只是普通的递归;如果你的函数有一个名字,你可以直接用那个名字来调用它。Haskell可能会过于优化;当ps = g ... ps时,ps几乎肯定会被共享;而当ps() = g ... ps()时,ps()的输出可能会被共享。使用_Y则变得不太可能。但在F#/ML中,运行时行为更加精确定义。 - Will Ness
显示剩余7条评论

3

这个帖子有一些非常有趣和启发性的讨论。我知道这个帖子已经很老了,但我想回答原始问题的提问者。请回忆一下,他想要一个纯函数版本的厄拉多塞筛法。

let rec PseudoSieve list =
match list with
| hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
| [] -> []

这已经有了之前讨论过的缺陷。毫无疑问,最简单的纯函数式解决方案不使用变异、模数算术 - 需要对候选项进行太多的检查 - 可能会像这样:

let rec sieve primes = function
    | []        -> primes |> List.rev
    | p :: rest -> 
         sieve  (p :: primes) (rest |> List.except [p*p..p..n])

这显然不能达到最佳的性能和内存使用效果。有趣的是,检查一下List.except的实现方式是如何进行交叉操作的,以使它们只执行一次(这可能会使其成为埃拉托色尼筛法而不是其实现,但与OP中链接的论文中所提出的朴素解决方案相比具有相同的好处),并检查其大 O 成本。

尽管如此,我认为这仍然是对原始OP问题最简洁的回答。你怎么看?

更新:在List.except中使用了p*p,使之成为一个完整的筛法!

EDIT_ADD:

我是@GordonBGood,我会直接编辑你的回答(因为你要求想法),而不是进行一系列广泛的评论,如下:

  1. 首先,你的代码不会编译,因为n是未知的,并且必须在包含定义起始列表[2..n]的封闭代码中给出。

  2. 你的代码基本上是欧拉筛法,而不是请求的埃拉托色尼筛法(SoE)。虽然你是正确的,复合数的“交叉”只发生一次,因为在之后的候选列表中该复合数将不再存在,但是使用List.except只是“在幕后”定义了一个可以使用fold和filter函数完成的操作: 想想List.except正在做些什么 - 对于要筛选的每个元素,它会扫描以查看该元素是否与基本质因子列表中的任何元素匹配,如果是,则过滤掉。如果使用列表处理而不是可变数组来实现,这些扫描将被每个元素按比例复合。

  3. 以下是对你的代码进行完善以产生相同类型的质数序列的uint32参数的完整回答:

let sieveTo n =
  let rec sieve primes = function
    | []        -> primes |> List.rev
    | p :: rest -> sieve  (p :: primes) (rest |> List.except [p*p..p..n])
  sieve [] [ 2u .. n ] |> List.toSeq```

这个算法的对数复杂度非常高,筛选到10万需要大约2.3秒,筛选到100万则需要227秒,因为它是一种无用的功能筛法,由于范围(每个剩余元素的所有扫描)的快速增加,使得列表实现的筛法变得越来越困难。

  1. "Richard Bird" 筛法是一种真正基于列表的埃拉托斯特尼筛法,正如 OP 中所指出的那样,它通过考虑要筛选的候选列表是有序的,并且要排除/保留的质数组合成的列表(即合数列表)也是有序的,从而避免了重复处理。使用惰性求值,这些列表也可以是无限的,因此不需要一个 n,并产生一个“无限”的质数列表。 Richard Bird 筛法包括所有数字(不仅仅是奇数),包括 Co Inductive Stream(CIS)的惰性表达式,F#代码如下:
type 'a CIS = CIS of 'a * (unit -> 'a CIS) //'Co Inductive Stream for laziness
 
let primesBird() =
  let rec (^^) (CIS(x, xtlf) as xs) (CIS(y, ytlf) as ys) = // stream merge function
    if x < y then CIS(x, fun() -> xtlf() ^^ ys)
    elif y < x then CIS(y, fun() -> xs ^^ ytlf())
    else CIS(x, fun() -> xtlf() ^^ ytlf()) // eliminate duplicate!
  let pmltpls p = let rec nxt c = CIS(c, fun() -> nxt (c + p)) in nxt (p * p)
  let rec allmltps (CIS(p, ptlf)) = CIS(pmltpls p, fun() -> allmltps (ptlf()))
  let rec cmpsts (CIS(CIS(c, ctlf), amstlf)) =
    CIS(c, fun() -> (ctlf()) ^^ (cmpsts (amstlf())))
  let rec minusat n (CIS(c, ctlf) as cs) =
    if n < c then CIS(n, fun() -> minusat (n + 1u) cs)
    else minusat (n + 1u) (ctlf())
  let rec baseprms() = CIS(2u, fun() -> baseprms() |> allmltps |> cmpsts |> minusat 3u)
  Seq.unfold (fun (CIS(p, ptlf)) -> Some(p, ptlf())) (baseprms())

以上代码在我的机器上使用约2.3秒的时间来计算小于100万的质数。上述筛法已经改进,它使用一个名为baseprms的辅助流来引入合数筛选流。

  1. 这种方法的问题是除了它不具备使其成为奇数或更高程度轮因式分解的非常小的变化外,它仍然需要大约双倍于线性执行时间的范围(经验增长率约为1.3),或者大约与平方(log n)成比例。解决这个问题的方法是使用无限树状结构而不是线性排序对复合数进行排序,以减少到log n复杂度,如下面的代码所示(还具有使其成为奇数的微小变化):
type 'a CIS = CIS of 'a * (unit -> 'a CIS) //'Co Inductive Stream for laziness
 
let primesTreeFold() =
  let rec (^^) (CIS(x, xtlf) as xs) (CIS(y, ytlf) as ys) = // stream merge function
    if x < y then CIS(x, fun() -> xtlf() ^^ ys)
    elif y < x then CIS(y, fun() -> xs ^^ ytlf())
    else CIS(x, fun() -> xtlf() ^^ ytlf()) // no duplication
  let pmltpls p = let rec nxt c = CIS(c, fun() -> nxt (c + p)) in nxt (p * p)
  let rec allmltps (CIS(p, ptlf)) = CIS(pmltpls p, fun() -> allmltps (ptlf()))
  let rec pairs (CIS(cs0, cs0tlf)) = // implements infinite tree-folding
    let (CIS(cs1, cs1tlf)) = cs0tlf() in CIS(cs0 ^^ cs1, fun() -> pairs (cs1tlf()))
  let rec cmpsts (CIS(CIS(c, ctlf), amstlf)) = // pairs is used below...
    CIS(c, fun() -> ctlf() ^^ (cmpsts << pairs << amstlf)())
  let rec minusat n (CIS(c, ctlf) as cs) =
    if n < c then CIS(n, fun() -> minusat (n + 2u) cs)
    else minusat (n + 1u) (ctlf())
  let rec oddprms() = CIS(3u, fun() -> oddprms() |> allmltps |> cmpsts |> minusat 5u)
  Seq.unfold (fun (CIS(p, ptlf)) -> Some(p, ptlf())) (CIS(2u, fun() -> oddprms()))

注意一下非常微小的更改,以使用无限树折叠而不是线性排序; 它还需要递归的辅助馈送来产生初始化到2/3/5而不仅仅是2/3的附加级别,以防止失控。 这个小改变将计算素数的速度提高到一百万为0.437秒,一千万为4.91秒,一亿为62.4秒,增长率趋于与对数n成比例。

  1. 结论:您的实现在范围内几乎无法使用,可以手动使用SoE计算质数的范围,这些“功能”筛子中最好的一个在一分钟内可用于大约一亿左右的范围。但是,尽管相对简单,它们无法进行多线程处理,因为每个新状态都依赖于之前状态的连续性,并且计算开销使它们比基于数组变异的筛子慢数百倍,在其中我们可以在一瞬间找到十亿个质数。

抱歉在你的回答中插入我的想法,但你要求回应,我的想法太长了无法作为评论发布... - GordonBGood

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接