The sieve of Eratosthenes in F#

2019-01-07 06:44发布

问题:

I am interested in an implementation of the sieve of eratosthenes in purely functional F#. I am interested in an implementation of the actual sieve, not the naive functional implementation that isn't really the sieve, so not something like this:

let rec PseudoSieve list =
    match list with
    | hd::tl -> hd :: (PseudoSieve <| List.filter (fun x -> x % hd <> 0) tl)
    | [] -> []

The second link above briefly describes an algorithm that would require the use of a multimap, which isn't available in F# as far as I know. The Haskell implementation given uses a map that supports an insertWith method, which I haven't seen available in the F# functional map.

Does anyone know a way to translate the given Haskell map code to F#, or perhaps knows of alternative implementation methods or sieving algorithms that are as efficient and better suited for a functional implementation or F#?

回答1:

Reading that article I came up with an idea that doesn't require a multimap. It handles colliding map keys by moving the colliding key forward by its prime value again and again until it reaches a key that isn't in the map. Below primes is a map with keys of the next iterator value and values that are primes.

let primes = 
    let rec nextPrime n p primes =
        if primes |> Map.containsKey n then
            nextPrime (n + p) p primes
        else
            primes.Add(n, p)

    let rec prime n primes =
        seq {
            if primes |> Map.containsKey n then
                let p = primes.Item n
                yield! prime (n + 1) (nextPrime (n + p) p (primes.Remove n))
            else
                yield n
                yield! prime (n + 1) (primes.Add(n * n, n))
        }

    prime 2 Map.empty

Here's the priority queue based algorithm from that paper without the square optimization. I placed the generic priority queue functions at the top. I used a tuple to represent the lazy list iterators.

let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // skips primes 2, 3, 5, 7
    let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

Here's the priority queue based algorithm with the square optimization. In order to facilitate lazy adding primes to the lookup table, the wheel offsets had to be returned along with prime values. This version of the algorithm has O(sqrt(n)) memory usage where the none optimized one is O(n).

let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))

Here's my test program.

type GenericHeap<'T when 'T : comparison>(defaultValue : 'T) =
    let mutable capacity = 1
    let mutable values = Array.create capacity defaultValue
    let mutable size = 0

    let swap i n =
        let temp = values.[i]
        values.[i] <- values.[n]
        values.[n] <- temp

    let rec rollUp i =
        if i > 0 then
            let parent = (i - 1) / 2
            if values.[i] < values.[parent] then
                swap i parent
                rollUp parent

    let rec rollDown i =
        let left, right = 2 * i + 1, 2 * i + 2

        if right < size then
            if values.[left] < values.[i] then
                if values.[left] < values.[right] then
                    swap left i
                    rollDown left
                else
                    swap right i
                    rollDown right
            elif values.[right] < values.[i] then
                swap right i
                rollDown right
        elif left < size then
            if values.[left] < values.[i] then
                swap left i

    member this.insert (value : 'T) =
        if size = capacity then
            capacity <- capacity * 2
            let newValues = Array.zeroCreate capacity
            for i in 0 .. size - 1 do
                newValues.[i] <- values.[i]
            values <- newValues

        values.[size] <- value
        size <- size + 1
        rollUp (size - 1)

    member this.delete () =
        values.[0] <- values.[size]
        size <- size - 1
        rollDown 0

    member this.deleteInsert (value : 'T) =
        values.[0] <- value
        rollDown 0

    member this.min () =
        values.[0]

    static member Insert (value : 'T) (heap : GenericHeap<'T>) =
        heap.insert value
        heap    

    static member DeleteInsert (value : 'T) (heap : GenericHeap<'T>) =
        heap.deleteInsert value
        heap    

    static member Min (heap : GenericHeap<'T>) =
        heap.min()

type Heap = GenericHeap<int64 * int * int64>

let wheelData = [|2L;4L;2L;4L;6L;2L;6L;4L;2L;4L;6L;6L;2L;6L;4L;2L;6L;4L;6L;8L;4L;2L;4L;2L;4L;8L;6L;4L;6L;2L;4L;6L;2L;6L;6L;4L;2L;4L;6L;2L;6L;4L;2L;4L;2L;10L;2L;10L|]

let primes() = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime prime n table =
        insert (prime * prime, n, prime) table

    let rec adjust x (table : Heap) =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator table =
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (adjust x table)
            else
                if x = 13L then
                    yield! [2L; 3L; 5L; 7L; 11L]

                yield x
                yield! sieve (wheel iterator) (insertPrime x n table)
        }

    sieve (13L, 1, 1L) (insertPrime 11L 0 (Heap(0L, 0, 0L)))

let rec primes2() : seq<int64 * int> = 
    // the priority queue functions
    let insert = Heap.Insert
    let findMin = Heap.Min
    let insertDeleteMin = Heap.DeleteInsert

    // increments iterator
    let wheel (composite, n, prime) =
        composite + wheelData.[n % 48] * prime, n + 1, prime

    let insertPrime enumerator composite table =
        // lazy initialize the enumerator
        let enumerator =
            if enumerator = null then
                let enumerator = primes2().GetEnumerator()
                enumerator.MoveNext() |> ignore
                // skip primes that are a part of the wheel
                while fst enumerator.Current < 11L do
                    enumerator.MoveNext() |> ignore
                enumerator
            else
                enumerator

        let prime = fst enumerator.Current
        // Wait to insert primes until their square is less than the tables current min
        if prime * prime < composite then
            enumerator.MoveNext() |> ignore
            let prime, n = enumerator.Current
            enumerator, insert (prime * prime, n, prime) table
        else
            enumerator, table

    let rec adjust x table =
        let composite, n, prime = findMin table

        if composite <= x then 
            table 
            |> insertDeleteMin (wheel (composite, n, prime))
            |> adjust x
        else
            table

    let rec sieve iterator (enumerator, table) = 
        seq {
            let x, n, _ = iterator
            let composite, _, _ = findMin table

            if composite <= x then
                yield! sieve (wheel iterator) (enumerator, adjust x table)
            else
                if x = 13L then
                    yield! [2L, 0; 3L, 0; 5L, 0; 7L, 0; 11L, 0]

                yield x, n
                yield! sieve (wheel iterator) (insertPrime enumerator composite table)
        }

    sieve (13L, 1, 1L) (null, insert (11L * 11L, 0, 11L) (Heap(0L, 0, 0L)))


let mutable i = 0

let compare a b =
    i <- i + 1
    if a = b then
        true
    else
        printfn "%A %A %A" a b i
        false

Seq.forall2 compare (Seq.take 50000 (primes())) (Seq.take 50000 (primes2() |> Seq.map fst))
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

primes2()
|> Seq.map fst
|> Seq.skip 999999
|> Seq.take 10
|> Seq.toArray
|> printfn "%A"

System.Console.ReadLine() |> ignore


回答2:

Although there has been one answer giving an algorithm using a Priority Queue (PQ) as in a SkewBinomialHeap, it is perhaps not the right PQ for the job. What the incremental Sieve of Eratosthenes (iEoS) requires is a PQ that has excellent performance for getting the minimum value and reinserting values mostly slightly further down the queue but doesn't need the ultimate in performance for adding new values as iSoE only adds as new values a total of the primes up to the the square root of the range (which is a tiny fraction of the number of re-insertions that occur once per reduction). The SkewBinomialHeap PQ doesn't really give much more than using the built-in Map which uses a balanced binary search tree - all O(log n) operations - other than it changes the weighting of the operations slightly in favour of the SoE's requirements. However, the SkewBinaryHeap still requires many O(log n) operations per reduction.

A PQ implemented as a Heap in more particular as a Binary Heap and even more particularly as a MinHeap pretty much satisfies iSoE's requirements with O(1) performance in getting the minimum and O(log n) performance for re-insertions and adding new entries, although the performance is actually a fraction of O(log n) as most of the re-insertions occur near the top of the queue and most of the additions of new values (which don't matter as they are infrequent) occur near the end of the queue where these operations are most efficient. In addition, the MinHeap PQ can efficiently implement the delete minimum and insert function in one (generally a fraction of) one O(log n) pass. Then, rather than for the Map (which is implemented as an AVL tree) where there is one O(log n) operation with generally a full 'log n' range due to the minimum value we require being at the far left last leaf of the tree, we are generally adding and removing the minimum at the root and inserting on the average of a few levels down in one pass. Thus the MinHeap PQ can be used with only one fraction of O(log n) operation per culling reduction rather than multiple larger fraction O(log n) operations.

The MinHeap PQ can be implemented with pure functional code (with no "removeMin" implemented as the iSoE doesn't require it but there is an "adjust" function for use in segmentation), as follows:

[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  [<CompilationRepresentation(CompilationRepresentationFlags.UseNullAsTrueValue)>]
  [<NoEquality; NoComparison>]
  type MinHeapTree<'T> = 
      | HeapEmpty 
      | HeapOne of MinHeapTreeEntry<'T>
      | HeapNode of MinHeapTreeEntry<'T> * MinHeapTree<'T> * MinHeapTree<'T> * uint32

  let empty = HeapEmpty

  let getMin pq = match pq with | HeapOne(kv) | HeapNode(kv,_,_,_) -> Some kv | _ -> None

  let insert k v pq =
    let kv = MinHeapTreeEntry(k,v)
    let rec insert' kv msk pq =
      match pq with
        | HeapEmpty -> HeapOne kv
        | HeapOne kv2 -> if k < kv2.k then HeapNode(kv,pq,HeapEmpty,2u)
                          else let nn = HeapOne kv in HeapNode(kv2,nn,HeapEmpty,2u)
        | HeapNode(kv2,l,r,cnt) ->
          let nc = cnt + 1u
          let nmsk = if msk <> 0u then msk <<< 1
                     else let s = int32 (System.Math.Log (float nc) / System.Math.Log(2.0))
                          (nc <<< (32 - s)) ||| 1u //never ever zero again with the or'ed 1
          if k <= kv2.k then if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv,insert' kv2 nmsk l,r,nc)
                                                            else HeapNode(kv,l,insert' kv2 nmsk r,nc)
          else if (nmsk &&& 0x80000000u) = 0u then HeapNode(kv2,insert' kv nmsk l,r,nc)
                else HeapNode(kv2,l,insert' kv nmsk r,nc)
    insert' kv 0u pq

  let private reheapify kv k pq =
    let rec reheapify' pq =
      match pq with
        | HeapEmpty -> HeapEmpty //should never be taken
        | HeapOne kvn -> HeapOne kv
        | HeapNode(kvn,l,r,cnt) ->
            match r with
              | HeapOne kvr when k > kvr.k ->
                  match l with //never HeapEmpty
                    | HeapOne kvl when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,HeapOne kv,r,cnt)
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,HeapOne kv,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,HeapOne kv,cnt) //only right qualifies
              | HeapNode(kvr,_,_,_) when k > kvr.k -> //need adjusting for left leaf or else left leaf
                  match l with //never HeapEmpty or HeapOne
                    | HeapNode(kvl,_,_,_) when k > kvl.k -> //both qualify, choose least
                        if kvl.k > kvr.k then HeapNode(kvr,l,reheapify' r,cnt)
                        else HeapNode(kvl,reheapify' l,r,cnt)
                    | _ -> HeapNode(kvr,l,reheapify' r,cnt) //only right qualifies
              | _ -> match l with //r could be HeapEmpty but l never HeapEmpty
                        | HeapOne(kvl) when k > kvl.k -> HeapNode(kvl,HeapOne kv,r,cnt)
                        | HeapNode(kvl,_,_,_) when k > kvl.k -> HeapNode(kvl,reheapify' l,r,cnt)
                        | _ -> HeapNode(kv,l,r,cnt) //just replace the contents of pq node with sub leaves the same
    reheapify' pq


  let reinsertMinAs k v pq =
    let kv = MinHeapTreeEntry(k,v)
    reheapify kv k pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then rebuild by reheapify
    let rec adjust' pq =
      match pq with
        | HeapEmpty -> pq
        | HeapOne kv -> HeapOne(MinHeapTreeEntry(f kv.k kv.v))
        | HeapNode (kv,l,r,cnt) -> let nkv = MinHeapTreeEntry(f kv.k kv.v)
                                   reheapify nkv nkv.k (HeapNode(kv,adjust' l,adjust' r,cnt))
    adjust' pq

Using the above module, the iSoE can be written with the wheel factorization optimizations and using efficient Co-Inductive Streams (CIS's) as follows:

type CIS<'T> = class val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i + 1 else 0 in let advcnd c i = c + uint32 WHLPTRN.[i]
  let inline culladv c p i = let n = c + uint32 WHLPTRN.[i] * p in if n < c then 0xFFFFFFFFu else n
  let rec mkprm (n,wi,pq,(bps:CIS<_>),q) =
    let nxt = advcnd n wi in let nxti = whladv wi
    if nxt < n then (0u,0,(0xFFFFFFFFu,0,MinHeap.empty,bps,q))
    elif n>=q then let bp,bpi = bps.v in let nc,nci = culladv n bp bpi,whladv bpi
                    let nsd = bps.cont() in let np,_ = nsd.v in let sqr = if np>65535u then 0xFFFFFFFFu else np*np
                    mkprm (nxt,nxti,(MinHeap.insert nc (cullstate(bp,nci)) pq),nsd,sqr)
    else match MinHeap.getMin pq with | None -> (n,wi,(nxt,nxti,pq,bps,q))
                                      | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                                                   if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q)
                                                   else (n,wi,(nxt,nxti,pq,bps,q))
  let rec pCID p pi pq bps q = CIS((p,pi),fun()->let (np,npi,(nxt,nxti,npq,nbps,nq))=mkprm (advcnd p pi,whladv pi,pq,bps,q)
                                                 pCID np npi npq nbps nq)
  let rec baseprimes() = CIS((FSTPRM,0),fun()->let np=FSTPRM+uint32 WHLPTRN.[0]
                                               pCID np (whladv 0) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let genseq sd = Seq.unfold (fun (p,pi,pcc) ->if p=0u then None else Some(p,mkprm pcc)) sd
  seq { yield! WHLPRMS; yield! mkprm (FSTPRM,0,MinHeap.empty,baseprimes(),(FSTPRM*FSTPRM)) |> genseq }

The above code calculates the first 100,000 primes in about 0.077 seconds, the first 1,000,000 primes in 0.977 seconds, the first 10,000,000 primes in about 14.33 seconds, and the first 100,000,000 primes in about 221.87 seconds, all on an i7-2700K (3.5GHz) as 64-bit code. This purely functional code is slightly faster than that of Dustin Cambell's mutable Dictionary based code with the added common optimizations of wheel factorization, deferred adding of base primes, and use of the more efficient CID's all added (tryfsharp and ideone) but is still pure functional code where his using the Dictionary class is not. However, for larger prime ranges of about of about two billion (about 100 million primes), the code using the hash table based Dictionary will be faster as the Dictionary operations do not have a O(log n) factor and this gain overcomes the computational complexity of using Dictionary hash tables.

The above program has the further feature that the factorization wheel is parameterized so that, for instance, one can use a extremely large wheel by setting WHLPRMS to [| 2u;3u;5u;7u;11u;13u;17u;19u |] and FSTPRM to 23u to get a run time of about two thirds for large ranges at about 9.34 seconds for ten million primes, although note that it takes several seconds to compute the WHLPTRN before the program starts to run, which is a constant overhead no matter the prime range.

Comparative Analysis: As compared to the pure functional incremental tree folding implementation, this algorithm is just slightly faster because the average used height of the MinHeap tree is less by a factor of two than the depth of the folded tree but that is offset by an equivalent constant factor loss in efficiency in ability to traverse the PQ tree levels due to it being based on a binary heap requiring processing of both the right and left leaves for every heap level and a branch either way rather than a single comparison per level for the tree folding with generally the less deep branch the taken one. As compared to other PQ and Map based functional algorithms, improvements are generally by a constant factor in reducing the number of O(log n) operations in traversing each level of the respective tree structures.

The MinHeap is usually implemented as a mutable array binary heap after a genealogical tree based model invented by Michael Eytzinger over 400 years ago. I know the question said there was no interest in non-functional mutable code, but if one must avoid all sub code that uses mutability, then we couldn't use list's or LazyList's which use mutability "under the covers" for performance reasons. So imagine that the following alternate mutable version of the MinHeap PQ is as supplied by a library and enjoy another factor of over two for larger prime ranges in performance:

[<RequireQualifiedAccess>]
module MinHeap =

  type MinHeapTreeEntry<'T> = class val k:uint32 val v:'T new(k,v) = { k=k;v=v } end
  type MinHeapTree<'T> = ResizeArray<MinHeapTreeEntry<'T>>

  let empty<'T> = MinHeapTree<MinHeapTreeEntry<'T>>()

  let getMin (pq:MinHeapTree<_>) = if pq.Count > 0 then Some pq.[0] else None

  let insert k v (pq:MinHeapTree<_>) =
    if pq.Count = 0 then pq.Add(MinHeapTreeEntry(0xFFFFFFFFu,v)) //add an extra entry so there's always a right max node
    let mutable nxtlvl = pq.Count in let mutable lvl = nxtlvl <<< 1 //1 past index of value added times 2
    pq.Add(pq.[nxtlvl - 1]) //copy bottom entry then do bubble up while less than next level up
    while ((lvl <- lvl >>> 1); nxtlvl <- nxtlvl >>> 1; nxtlvl <> 0) do
      let t = pq.[nxtlvl - 1] in if t.k > k then pq.[lvl - 1] <- t else lvl <- lvl <<< 1; nxtlvl <- 0 //causes loop break
    pq.[lvl - 1] <-  MinHeapTreeEntry(k,v); pq

  let reinsertMinAs k v (pq:MinHeapTree<_>) = //do minify down for value to insert
    let mutable nxtlvl = 1 in let mutable lvl = nxtlvl in let cnt = pq.Count
    while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
      let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
      let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
      if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
    pq.[lvl - 1] <- MinHeapTreeEntry(k,v); pq

  let adjust f (pq:MinHeapTree<_>) = //adjust all the contents using the function, then re-heapify
    if pq <> null then 
      let cnt = pq.Count
      if cnt > 1 then
        for i = 0 to cnt - 2 do //change contents using function
          let e = pq.[i] in let k,v = e.k,e.v in pq.[i] <- MinHeapTreeEntry (f k v)
        for i = cnt/2 downto 1 do //rebuild by reheapify
          let kv = pq.[i - 1] in let k = kv.k
          let mutable nxtlvl = i in let mutable lvl = nxtlvl
          while (nxtlvl <- nxtlvl <<< 1; nxtlvl < cnt) do
            let lk = pq.[nxtlvl - 1].k in let rk = pq.[nxtlvl].k in let oldlvl = lvl
            let k = if k > lk then lvl <- nxtlvl; lk else k in if k > rk then nxtlvl <- nxtlvl + 1; lvl <- nxtlvl
            if lvl <> oldlvl then pq.[oldlvl - 1] <- pq.[lvl - 1] else nxtlvl <- cnt //causes loop break
          pq.[lvl - 1] <- kv
    pq

Geek note: I had actually expected the mutable version to offer a much better improved performance ratio, but it bogs down in the re-insertions due to the nested if-then-else code structure and the random behavior of the prime cull values meaning that the CPU branch prediction fails for a large proportion of the branches resulting in many additional 10's of CPU clock cycles per cull reduction to rebuilt the instruction pre-fetch cache.

The only other constant factor performance gains on this algorithm would be segmentation and use of multi-tasking for a performance gain proportional to the number of CPU cores; however, as it stands, this is the fastest pure functional SoE algorithm to date, and even the pure functional form using the functional MinHeap beats simplistic imperative implementations such as Jon Harrop's code or Johan Kullbom's Sieve of Atkin (which is in error in his timing as he only calculated the primes to 10 million rather than the 10 millionth prime), but those algorithms would be about five times faster if better optimizations were used. That ratio of about five between functional and imperative code will be somewhat reduced when we add multi-threading of larger wheel factorization as the computational complexity of the imperative code increases faster than the functional code and multi-threading helps the slower functional code more than the faster imperative code as the latter gets closer to the base limit of the time required to enumerate through the found primes.

EDIT_ADD: Even though one could elect to continue to use the pure functional version of MinHeap, adding efficient segmentation in preparation for multi-threading would slightly "break" the "pureness" of the functional code as follows: 1) The most efficient way of transferring a representation of composite-culled primes is a packed-bit array the size of the segment, 2) While the size of the array is known, using an array comprehension to initialize it in a functional way isn't efficient as it uses "ResizeArray" under the covers which needs to copy itself for every x additions (I think 'x' is eight for the current implementation) and using Array.init doesn't work as many values at particular indexes are skipped, 3) Therefore, the easiest way to fill the culled-composite array is to zeroCreate it of the correct size and then run an initialization function which could write to each mutable array index no more than once. Although this isn't strictly "functional", it is close in that the array is initialized and then never modified again.

The code with added segmentation, multi-threading, programmable wheel factorial circumference, and many performance tweaks is as follows (other than some added new constants, the extra tuned code to implement the segmentation and multi-threading is the bottom approximately half of the code starting at the "prmspg" function):

type prmsCIS = class val pg:uint16 val bg:uint16 val pi:int val cont:unit->prmsCIS
                     new(pg,bg,pi,nxtprmf) = { pg=pg;bg=bg;pi=pi;cont=nxtprmf } end
type cullstate = struct val p:uint32 val wi:int new(cnd,cndwi) = { p=cnd;wi=cndwi } end
let primesPQOWSE() =
  let WHLPRMS = [| 2u;3u;5u;7u;11u;13u;17u |] in let FSTPRM = 19u in let WHLCRC = int(WHLPRMS |> Seq.fold (*) 1u)
  let MXSTP = uint64(FSTPRM-1u) in let BFSZ = 1<<<11 in let NUMPRCS = System.Environment.ProcessorCount
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1 in let WHLPTRN = Array.zeroCreate (WHLLMT+1)
  let WHLRNDUP = let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1)
                                                          else acc in let b = a |> Array.scan (+) 0
                                      Array.init (WHLCRC>>>1) (fun i->
                                        if a.[i]=0 then 0 else let g=2*gap (i+1) 1 in WHLPTRN.[b.[i]]<-byte g;1)
                 Array.init WHLCRC (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u) then 1 else 0)
                 |> gaps |> Array.scan (+) 0
  let WHLPOS = WHLPTRN |> Array.map (uint32) |> Array.scan (+) 0u in let advcnd cnd cndi = cnd + uint32 WHLPTRN.[cndi]
  let MINRNGSTP = if WHLLMT<=31 then uint32(32/(WHLLMT+1)*WHLCRC) else if WHLLMT=47 then uint32 WHLCRC<<<1 else uint32 WHLCRC
  let MINBFRNG = uint32((BFSZ<<<3)/(WHLLMT+1)*WHLCRC)/MINRNGSTP*MINRNGSTP
  let MINBFRNG = if MINBFRNG=0u then MINRNGSTP else MINBFRNG
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline culladv c p i = c+uint32 WHLPTRN.[i]*p
  let rec mkprm (n,wi,pq,(bps:prmsCIS),q,lstp,bgap) =
    let nxt,nxti = advcnd n wi,whladv wi
    if n>=q then let p = (uint32 bps.bg<<<16)+uint32 bps.pg
                 let nbps,nxtcmpst,npi = bps.cont(),culladv n p bps.pi,whladv bps.pi
                 let pg = uint32 nbps.pg in let np = p+pg in let sqr = q+pg*((p<<<1)+pg) //only works to p < about 13 million
                 let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont) //therefore, algorithm only works to p^2 or about
                 mkprm (nxt,nxti,(MinHeap.insert nxtcmpst (cullstate(p,npi)) pq),nbps,sqr,lstp,(bgap+1us)) //1.7 * 10^14
    else match MinHeap.getMin pq with 
           | None -> (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us)) //fix with q is uint64
           | Some kv -> let ca,cs = culladv kv.k kv.v.p kv.v.wi,cullstate(kv.v.p,whladv kv.v.wi)
                        if n>kv.k then mkprm (n,wi,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,bgap)
                        elif n=kv.k then mkprm (nxt,nxti,(MinHeap.reinsertMinAs ca cs pq),bps,q,lstp,(bgap+1us))
                        else (uint16(n-uint32 lstp),bgap,wi,(nxt,nxti,pq,bps,q,n,1us))
  let rec pCIS p pg bg pi pq bps q = prmsCIS(pg,bg,pi,fun()->
    let (npg,nbg,npi,(nxt,nxti,npq,nbps,nq,nl,ng))=mkprm (p+uint32 WHLPTRN.[pi],whladv pi,pq,bps,q,p,0us)
    pCIS (p+uint32 npg) npg nbg npi npq nbps nq)
  let rec baseprimes() = prmsCIS(uint16 FSTPRM,0us,0,fun()->
                           let np,npi=advcnd FSTPRM 0,whladv 0
                           pCIS np (uint16 WHLPTRN.[0]) 1us npi MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM))
  let prmspg nxt adj pq bp q =
    //compute next buffer size rounded up to next even wheel circle so at least one each base prime hits the page
    let rng = max (((uint32(MXSTP+uint64(sqrt (float (MXSTP*(MXSTP+4UL*nxt))))+1UL)>>>1)+MINRNGSTP)/MINRNGSTP*MINRNGSTP) MINBFRNG
    let nxtp() = async {
      let rec addprms pqx (bpx:prmsCIS) qx = 
        if qx>=adj then pqx,bpx,qx //add primes to queue for new lower limit
        else let p = (uint32 bpx.bg<<<16)+uint32 bpx.pg in let nbps = bpx.cont()
             let pg = uint32 nbps.pg in let np = p+pg in let sqr = qx+pg*((p<<<1)+pg)
             let nbps = prmsCIS(uint16 np,uint16(np>>>16),nbps.pi,nbps.cont)
             addprms (MinHeap.insert qx (cullstate(p,bpx.pi)) pqx) nbps sqr
      let adjcinpg low k (v:cullstate) = //adjust the cull states for the new page low value
        let p = v.p in let WHLSPN = int64 WHLCRC*int64 p in let db = int64 p*int64 WHLPOS.[v.wi]
        let db = if k<low then let nk = int64(low-k)+db in nk-((nk/WHLSPN)*WHLSPN)
                 else let nk = int64(k-low) in if db<nk then db+WHLSPN-nk else db-nk
        let r = WHLRNDUP.[int((((db>>>1)%(WHLSPN>>>1))+int64 p-1L)/int64 p)] in let x = int64 WHLPOS.[r]*int64 p
        let r = if r>WHLLMT then 0 else r in let x = if x<db then x+WHLSPN-db else x-db in uint32 x,cullstate(p,r)
      let bfbtsz = int rng/WHLCRC*(WHLLMT+1) in let nbuf = Array.zeroCreate (bfbtsz>>>5)
      let rec nxtp' wi cnt = let _,nbg,_,ncnt = mkprm cnt in let nwi = wi + int nbg
                             if nwi < bfbtsz then nbuf.[nwi>>>5] <- nbuf.[nwi>>>5] ||| (1u<<<(nwi&&&0x1F)); nxtp' nwi ncnt
                             else let _,_,pq,bp,q,_,_ = ncnt in nbuf,pq,bp,q //results incl buf and cont parms for next page
      let npq,nbp,nq = addprms pq bp q
      return nxtp' 0 (0u,0,MinHeap.adjust (adjcinpg adj) npq,nbp,nq-adj,0u,0us) }
    rng,nxtp() |> Async.StartAsTask
  let nxtpg nxt (cont:(_*System.Threading.Tasks.Task<_>)[]) = //(len,pq,bp,q) =
    let adj = (cont |> Seq.fold (fun s (r,_)  -> s+r) 0u)
    let _,tsk = cont.[0] in let _,pq,bp,q = tsk.Result
    let ncont = Array.init (NUMPRCS+1) (fun i -> if i<NUMPRCS then cont.[i+1]
                                                 else prmspg (nxt+uint64 adj) adj pq bp q)
    let _,tsk = ncont.[0] in let nbuf,_,_,_ = tsk.Result in nbuf,ncont
  //init cond buf[0], no queue, frst bp sqr offset
  let initcond = 0u,System.Threading.Tasks.Task.Factory.StartNew (fun()->
                   (Array.empty,MinHeap.empty,baseprimes(),FSTPRM*FSTPRM-FSTPRM))
  let nxtcond n = prmspg (uint64 n) (n-FSTPRM) MinHeap.empty (baseprimes()) (FSTPRM*FSTPRM-FSTPRM)
  let initcont = Seq.unfold (fun (n,((r,_)as v))->Some(v,(n+r,nxtcond (n+r)))) (FSTPRM,initcond)
                 |> Seq.take (NUMPRCS+1) |> Seq.toArray
  let rec nxtprm (c,ci,i,buf:uint32[],cont) =
    let rec nxtprm' c ci i =
      let nc = c + uint64 WHLPTRN.[ci] in let nci = whladv ci in let ni = i + 1 in let nw = ni>>>5
      if nw >= buf.Length then let (npg,ncont)=nxtpg nc cont in nxtprm (c,ci,-1,npg,ncont)
      elif (buf.[nw] &&& (1u <<< (ni &&& 0x1F))) = 0u then nxtprm' nc nci ni
      else nc,nci,ni,buf,cont
    nxtprm' c ci i
  seq { yield! WHLPRMS |> Seq.map (uint64);
        yield! Seq.unfold (fun ((c,_,_,_,_) as cont)->Some(c,nxtprm cont))
                 (nxtprm (uint64 FSTPRM-uint64 WHLPTRN.[WHLLMT],WHLLMT,-1,Array.empty,initcont)) }

Note that the MinHeap modules, both functional and array-based, have had an "adjust" function added to permit adjusting of the cull state of each thread's version of the PQ at the beginning of every new segment page. Also note that it was possible to adjust the code so that most of the computation is done using 32 bit ranges with the final sequence output as uint64's at little cost in computational time so that currently the theoretical range is something over 100 trillion (ten raised to the fourteen power) if one were willing to wait the about three to four months required to compute that range. The numeric range checks were removed as it is unlikely that anyone would use this algorithm to compute up to that range let alone past it.

Using the pure functional MinHeap and 2,3,5,7 wheel factorization, the above program computes the first hundred thousand, one million, ten million, and a hundred million primes in 0.062, 0.629, 10.53, and 195.62 seconds, respectively. Using the array-based MinHeap speeds this up to 0.097, 0.276, 3.48, and 51.60 seconds, respectively. Using the 2,3,5,7,11,13,17 wheel by changing WHLPRMS to "[| 2u;3u;5u;7u;11u;13u;17u |]" and FSTPRM to 19u speeds that up yet a little more to 0.181, 0.308, 2.49, and 36.58 seconds, respectively (for constant factor improvement with a constant overhead). This fastest tweak calculates the 203,280,221 primes in the 32-bit number range in about 88.37 seconds. The "BFSZ" constant can be adjusted with trade-offs between slower times for smaller ranges version faster times for larger ranges, with a value of "1<<<14" recommended to be tried for the larger ranges. This constant only sets the minimum buffer size, with the program adjusting the buffer size above that size automatically for larger ranges such that the buffer is sufficient so that the largest base prime required for the page range will always "strike" each page at least once; this means that the complexity and overhead of an additional "bucket sieve" is not required. This last fully optimized version can compute the primes up to 10 and 100 billion in about 256.8 and 3617.4 seconds (just over an hour for the 100 billion) as tested using "primesPQOWSE() |> Seq.takeWhile ((>=)100000000000UL) |> Seq.fold (fun s p -> s + 1UL) 0UL" for output. This is where the estimates of about half a day for the count of primes to a trillion, a week for up to ten trillion and about three to four months for up to a hundred trillion come from.

I don't think it's possible to make functional or almost functional code using the incremental SoE algorithm to run much faster than this. As one can see in looking at the code, optimizing the basic incremental algorithm has added greatly to the code complexity such that it is likely slightly more complex than equivalently optimized code based on straight array culling with that code able to run approximately ten times faster than this and without the extra exponent in the performance meaning that this functional incremental code has an ever increasing extra percentage overhead.

So is this useful other than from an interesting theoretical and intellectual viewpoint? Probably it's not. For smaller ranges of primes up to about ten million, the best of the basic not fully optimized incremental functional SoE's are probably adequate and quite simple to write or have less RAM memory use than the simplest imperative SoE's. However, they are much slower than more imperative code using an array so they "run out of steam" for ranges above that. While it has been demonstrated here that the code can be sped up by optimization, it is still 10's of times slower than a more imperative pure array-based version yet has added to the complexity to be at least as complex as that code with equivalent optimizations, and even that code under F# on DotNet is about four times slower than using a language such as C++ compiled directly to native code; if one really wanted to investigate large ranges of primes, one would likely use one of those other languages and techniques where primesieve can calculate the number of primes in the hundred trillion range in under four hours instead of the about three months required for this code. END_EDIT_ADD



回答3:

Here's my attempt at a reasonably faithful translation of the Haskell code to F#:

#r "FSharp.PowerPack"

module Map =
  let insertWith f k v m =
    let v = if Map.containsKey k m then f m.[k] v else v
    Map.add k v m

let sieve =
  let rec sieve' map = function
  | LazyList.Nil -> Seq.empty
  | LazyList.Cons(x,xs) -> 
      if Map.containsKey x map then
        let facts = map.[x]
        let map = Map.remove x map
        let reinsert m p = Map.insertWith (@) (x+p) [p] m
        sieve' (List.fold reinsert map facts) xs
      else
        seq {
          yield x
          yield! sieve' (Map.add (x*x) [x] map) xs
        }
  fun s -> sieve' Map.empty (LazyList.ofSeq s)

let rec upFrom i =
  seq {
    yield i
    yield! upFrom (i+1)
  }

let primes = sieve (upFrom 2)


回答4:

Prime sieve implemented with mailbox processors:

let (<--) (mb : MailboxProcessor<'a>) (message : 'a) = mb.Post(message)
let (<-->) (mb : MailboxProcessor<'a>) (f : AsyncReplyChannel<'b> -> 'a) = mb.PostAndAsyncReply f

type 'a seqMsg =  
    | Next of AsyncReplyChannel<'a>   

type PrimeSieve() =   
    let counter(init) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop n =   
                async { let! msg = inbox.Receive()   
                        match msg with
                        | Next(reply) ->   
                            reply.Reply(n)   
                            return! loop(n + 1) }   
            loop init)   

    let filter(c : MailboxProcessor<'a seqMsg>, pred) =   
        MailboxProcessor.Start(fun inbox ->   
            let rec loop() =   
                async {   
                    let! msg = inbox.Receive()   
                    match msg with
                    | Next(reply) ->
                        let rec filter prime =
                            if pred prime then async { return prime }
                            else async {
                                let! next = c <--> Next
                                return! filter next }
                        let! next = c <--> Next
                        let! prime = filter next
                        reply.Reply(prime)
                        return! loop()   
                }   
            loop()   
        )   

    let processor = MailboxProcessor.Start(fun inbox ->   
        let rec loop (oldFilter : MailboxProcessor<int seqMsg>) prime =   
            async {   
                let! msg = inbox.Receive()   
                match msg with
                | Next(reply) ->   
                    reply.Reply(prime)   
                    let newFilter = filter(oldFilter, (fun x -> x % prime <> 0))   
                    let! newPrime = oldFilter <--> Next
                    return! loop newFilter newPrime   
            }   
        loop (counter(3)) 2)   

    member this.Next() = processor.PostAndReply( (fun reply -> Next(reply)), timeout = 2000)

    static member upto max =
        let p = PrimeSieve()
        Seq.initInfinite (fun _ -> p.Next())
        |> Seq.takeWhile (fun prime -> prime <= max)
        |> Seq.toList


回答5:

Here is a pretty much maximally optimized as to algorithm incremental (and recursive) map based Sieve of Eratosthenes using sequences since there is no need for memoization of previous sequence values (other than there is a slight advantage to caching the base prime values using Seq.cache), with the major optimizations being that it uses wheel factorization for the input sequence and that it uses multiple (recursive) streams to maintain the base primes which are less than the square root of the latest number being sieved, as follows:

  let primesMPWSE =
    let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                     4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
    let adv i = if i < 47 then i + 1 else 0
    let reinsert oldcmpst mp (prime,pi) =
      let cmpst = oldcmpst + whlptrn.[pi] * prime
      match Map.tryFind cmpst mp with
        | None -> mp |> Map.add cmpst [(prime,adv pi)]
        | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
    let rec mkprimes (n,i) m ps q =
      let nxt = n + whlptrn.[i]
      match Map.tryFind n m with
        | None -> if n < q then seq { yield (n,i); yield! mkprimes (nxt,adv i) m ps q }
                  else let (np,npi),nlst = Seq.head ps,ps |> Seq.skip 1
                       let (nhd,ni),nxtcmpst = Seq.head nlst,n + whlptrn.[npi] * np
                       mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nlst (nhd * nhd)
        | Some(skips) -> let adjmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                         mkprimes (nxt,adv i) adjmap ps q
    let rec prs = seq {yield (11,0); yield! mkprimes (13,1) Map.empty prs 121 } |> Seq.cache
    seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> Seq.map (fun (p,i) -> p) }

It finds the 100,000th primes up to 1,299,721 in about a 0.445 second, but not being a proper imperative EoS algorithm it doesn't scale near linearly with increased numbers of primes, takes 7.775 seconds to find the 1,000,000 prime up to 15,485,867 for a performance over this range of about O(n^1.2) where n is the maximum prime found.

There is a bit more tuning that could be done, but it probably isn't going to make much of a difference as to a large percentage in better performance as follows:

  1. As the F# sequence library is markedly slow, one could use an self defined type that implements IEnumerable to reduce the time spent in the inner sequence, but as the sequence operations only take about 20% of to overall time, even if these were reduced to zero time the result would only be a reduction to 80% of the time.

  2. Other forms of map storage could be tried such as a priority queue as mentioned by O'Neil or the SkewBinomialHeap as used by @gradbot, but at least for the SkewBinomialHeap, the improvement in performance is only a few percent. It seems that in choosing different map implementations, one is just trading better response in finding and removing items that are near the beginning of the list against time spent in inserting new entries in order to enjoy those benefits so the net gain is pretty much a wash and still has a O(log n) performance with increasing entries in the map. The above optimization using multi streams of entries just to the square root reduce the number of entries in the map and thus make those improvements of not much importance.

EDIT_ADD: I did do the little extra bit of optimization and the performance did improve somewhat more than expected, likely due to the improved way of eliminating the Seq.skip as a way of advancing through the base primes sequence. This optimization uses a replacement for the inner sequence generation as a tuple of integer value and a continuation function used to advance to the next value in the sequence, with the final F# sequence generated by an overall unfold operation. Code is as follows:

type SeqDesc<'a> = SeqDesc of 'a * (unit -> SeqDesc<'a>) //a self referring tuple type
let primesMPWSE =
  let whlptrn = [| 2;4;2;4;6;2;6;4;2;4;6;6;2;6;4;2;6;4;6;8;4;2;4;2;
                   4;8;6;4;6;2;4;6;2;6;6;4;2;4;6;2;6;4;2;4;2;10;2;10 |]
  let inline adv i = if i < 47 then i + 1 else 0
  let reinsert oldcmpst mp (prime,pi) =
    let cmpst = oldcmpst + whlptrn.[pi] * prime
    match Map.tryFind cmpst mp with
      | None -> mp |> Map.add cmpst [(prime,adv pi)]
      | Some(facts) -> mp |> Map.add cmpst ((prime,adv pi)::facts)
  let rec mkprimes (n,i) m (SeqDesc((np,npi),nsdf) as psd) q =
    let nxt = n + whlptrn.[i]
    match Map.tryFind n m with
      | None -> if n < q then SeqDesc((n,i),fun() -> mkprimes (nxt,adv i) m psd q)
                else let (SeqDesc((nhd,x),ntl) as nsd),nxtcmpst = nsdf(),n + whlptrn.[npi] * np
                     mkprimes (nxt,adv i) (Map.add nxtcmpst [(np,adv npi)] m) nsd (nhd * nhd)
      | Some(skips) -> let adjdmap = skips |> List.fold (reinsert n) (m |> Map.remove n)
                       mkprimes (nxt,adv i) adjdmap psd q
  let rec prs = SeqDesc((11,0),fun() -> mkprimes (13,1) Map.empty prs 121 )
  let genseq sd = Seq.unfold (fun (SeqDesc((n,i),tailfunc)) -> Some(n,tailfunc())) sd
  seq { yield 2; yield 3; yield 5; yield 7; yield! mkprimes (11,0) Map.empty prs 121 |> genseq }

The times required to find the 100,000th and 1,000,000th primes are about 0.31 and 5.1 seconds, respectively, so there is a considerable percentage gain for this small change. I did try my own implementation of the IEnumerable/IEnumerator interfaces that are the base of sequences, and although they are faster than the versions used by the Seq module they hardly make any further difference to this algorithm where most of the time is spent in the Map functions. END_EDIT_ADD

Other than map based incremental EoS implementations, there is another "pure functional" implementation using Tree Folding which is said to be slightly faster, but as it still has a O(log n) term in the tree folding I suspect that it will mainly be faster (if it is) due to how the algorithm is implemented as to numbers of computer operations as compared to using a map. If people are interested I will develop that version as well.

In the end, one must accept that no pure functional implementation of the incremental EoS will ever come close to the raw processing speed of a good imperative implementation for larger numerical ranges. However, one could come up with an approach where all the code is purely functional except for the segmented sieving of composite numbers over a range using a (mutable) array which would come close to O(n) performance and in practical use would be fifty to a hundred times faster than functional algorithms for large ranges such as the first 200,000,000 primes. This has been done by @Jon Harrop in his blog, but this could be tuned further with very little additional code.



回答6:

Here is my two cents, though I am not sure it meets the OP's criterion for truely being the sieve of eratosthenes. It doesn't utilize modular division and implements an optimization from the paper cited by the OP. It only works for finite lists, but that seems to me to be in the spirit of how the sieve was originally described. As an aside, the paper the talks about complexiety in terms of the number of markings and the number of divisions. Seems that, as we have to traverse a linked list, that this perhaps ignoring some key aspects of the various algorithms in performance terms. In general though modular division with computers is an expensive operation.

open System

let rec sieve list =
    let rec helper list2 prime next =
        match list2 with
            | number::tail -> 
                if number< next then
                    number::helper tail prime next
                else
                    if number = next then 
                        helper tail prime (next+prime)
                    else
                        helper (number::tail) prime (next+prime)

            | []->[]
    match list with
        | head::tail->
            head::sieve (helper tail head (head*head))
        | []->[]

let step1=sieve [2..100]

EDIT: fixed an error in the code from my original post. I tried the follow the original logic of the sieve with a few modifications. Namely start with the first item and cross off the multiples of that item from the set. This algorithm literally looks for the next item that is a multiple of the prime instead of doing modular division on every number in the set. An optimization from the paper is that it starts looking for multiples of the prime greater than p^2.

The part in the helper function with the multi-level deals with the possibility that the next multiple of the prime might already be removed from the list. So for instance with the prime 5, it will try to remove the number 30, but it will never find it because it was already removed by the prime 3. Hope that clarifies the algorithm's logic.



回答7:

I know you explicitly stated that you were interested in a purely functional sieve implementation so I held off presenting my sieve until now. But upon re-reading the paper you referenced, I see the incremental sieve algorithm presented there is essentially the same as my own, the only difference being implementation details of using purely functional techniques versus decidedly imperative techniques. So I think I at least half-qualify in satisfying your curiosity. Moreover, I would argue that using imperative techniques when significant performance gains can be realized but hidden away by functional interfaces is one of the most powerful techniques encouraged in F# programming, as opposed to the everything pure Haskell culture. I first published this implementation on my Project Euler for F#un blog but re-publish here with pre-requisite code substituted back in and structural typing removed. primes can calculate the first 100,000 primes in 0.248 seconds and the first 1,000,000 primes in 4.8 seconds on my computer (note that primes caches its results so you'll need to re-evaluate it each time you perform a benchmark).

let inline infiniteRange start skip = 
    seq {
        let n = ref start
        while true do
            yield n.contents
            n.contents <- n.contents + skip
    }

///p is "prime", s=p*p, c is "multiplier", m=c*p
type SievePrime<'a> = {mutable c:'a ; p:'a ; mutable m:'a ; s:'a}

///A cached, infinite sequence of primes
let primes =
    let primeList = ResizeArray<_>()
    primeList.Add({c=3 ; p=3 ; m=9 ; s=9})

    //test whether n is composite, if not add it to the primeList and return false
    let isComposite n = 
        let rec loop i = 
            let sp = primeList.[i]
            while sp.m < n do
                sp.c <- sp.c+1
                sp.m <- sp.c*sp.p

            if sp.m = n then true
            elif i = (primeList.Count-1) || sp.s > n then
                primeList.Add({c=n ; p=n ; m=n*n ; s=n*n})
                false
            else loop (i+1)
        loop 0

    seq { 
        yield 2 ; yield 3

        //yield the cached results
        for i in 1..primeList.Count-1 do
            yield primeList.[i].p

        yield! infiniteRange (primeList.[primeList.Count-1].p + 2) 2 
               |> Seq.filter (isComposite>>not)
    }


回答8:

Here is yet another method of accomplishing the incremental Sieve of Eratosthenes (SoE) using only pure functional F# code. It is adapted from the Haskell code developed as "This idea is due to Dave Bayer, though he used a complex formulation and balanced ternary tree structure, progressively deepening in uniform manner (simplified formulation and a skewed, deepening to the right binary tree structure introduced by Heinrich Apfelmus, further simplified by Will Ness). Staged production idea due to M. O'Neill" as per the following link: Optimized Tree Folding code using a factorial wheel in Haskell.

The following code has several optimizations that make it more suitable for execution in F#, as follows:

  1. The code uses coinductive streams instead of LazyList's as this algorithm has no (or little) need of LazyList's memoization and my coinductive streams are more efficient than either LazyLists (from the FSharp.PowerPack) or the built in sequences. A further advantage is that my code can be run on tryFSharp.org and ideone.com without having to copy and paste in the Microsoft.FSharp.PowerPack Core source code for the LazyList type and module (along with copyright notice)

  2. It was discovered that there is quite a lot of overhead for F#'s pattern matching on function parameters so the previous more readable discriminated union type using tuples was sacrificed in favour of by-value struct (or class as runs faster on some platforms) types for about a factor of two or more speed up.

  3. Will Ness's optimizations going from linear tree folding to bilateral folding to multi-way folding and improvements using the wheel factorization are about as effective ratiometrically for F# as they are for Haskell, with the main difference between the two languages being that Haskell can be compiled to native code and has a more highly optimized compiler whereas F# has more overhead running under the DotNet Framework system.

    type prmstate = struct val p:uint32 val pi:byte new (prm,pndx) = { p = prm; pi = pndx } end
    type prmsSeqDesc = struct val v:prmstate val cont:unit->prmsSeqDesc new(ps,np) = { v = ps; cont = np } end
    type cmpststate = struct val cv:uint32 val ci:byte val cp:uint32 new (strt,ndx,prm) = {cv = strt;ci = ndx;cp = prm} end
    type cmpstsSeqDesc = struct val v:cmpststate val cont:unit->cmpstsSeqDesc new (cs,nc) = { v = cs; cont = nc } end
    type allcmpsts = struct val v:cmpstsSeqDesc val cont:unit->allcmpsts new (csd,ncsdf) = { v=csd;cont=ncsdf } end
    
    let primesTFWSE =
      let whlptrn = [| 2uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;6uy;6uy;2uy;6uy;4uy;2uy;6uy;4uy;6uy;8uy;4uy;2uy;4uy;2uy;
                       4uy;8uy;6uy;4uy;6uy;2uy;4uy;6uy;2uy;6uy;6uy;4uy;2uy;4uy;6uy;2uy;6uy;4uy;2uy;4uy;2uy;10uy;2uy;10uy |]
      let inline whladv i = if i < 47uy then i + 1uy else 0uy
      let inline advmltpl c ci p = cmpststate (c + uint32 whlptrn.[int ci] * p,whladv ci,p)
      let rec pmltpls cs = cmpstsSeqDesc(cs,fun() -> pmltpls (advmltpl cs.cv cs.ci cs.cp))
      let rec allmltpls (psd:prmsSeqDesc) =
        allcmpsts(pmltpls (cmpststate(psd.v.p*psd.v.p,psd.v.pi,psd.v.p)),fun() -> allmltpls (psd.cont()))
      let rec (^) (xs:cmpstsSeqDesc) (ys:cmpstsSeqDesc) = //union op for SeqDesc's
        match compare xs.v.cv ys.v.cv with
          | -1 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys)
          | 0 -> cmpstsSeqDesc (xs.v,fun() -> xs.cont() ^ ys.cont())
          | _ -> cmpstsSeqDesc(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
      let rec pairs (csdsd:allcmpsts) =
        let ys = csdsd.cont in
        allcmpsts(cmpstsSeqDesc(csdsd.v.v,fun()->csdsd.v.cont()^ys().v),fun()->pairs (ys().cont()))
      let rec joinT3 (csdsd:allcmpsts) = cmpstsSeqDesc(csdsd.v.v,fun()->
        let ys = csdsd.cont() in let zs = ys.cont() in (csdsd.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
      let rec mkprimes (ps:prmstate) (csd:cmpstsSeqDesc) =
        let nxt = ps.p + uint32 whlptrn.[int ps.pi]
        if ps.p >= csd.v.cv then mkprimes (prmstate(nxt,whladv ps.pi)) (csd.cont()) //minus function
        else prmsSeqDesc(prmstate(ps.p,ps.pi),fun() -> mkprimes (prmstate(nxt,whladv ps.pi)) csd)
      let rec baseprimes = prmsSeqDesc(prmstate(11u,0uy),fun() -> mkprimes (prmstate(13u,1uy)) initcmpsts)
      and initcmpsts = joinT3 (allmltpls baseprimes)
      let genseq sd = Seq.unfold (fun (psd:prmsSeqDesc) -> Some(psd.v.p,psd.cont())) sd
      seq { yield 2u; yield 3u; yield 5u; yield 7u; yield! mkprimes (prmstate(11u,0uy)) initcmpsts |> genseq }
    
    primesLMWSE |> Seq.nth 100000
    

EDIT_ADD: This has been corrected as the original code did not properly handle the tail of the stream and passed the tail of the parameter stream to the pairs function to the joinT3 function rather than the tail following the zs stream. The timing below has also accordingly been corrected, with about an extra 30% speed up. The tryFSharp and ideone link codes have also been corrected. END_EDIT_ADD

The above program works at about O(n^1.1) performance with n the maximum prime calculated or about O(n^1.18) when n is the number of primes calculated, and takes about 2.16 seconds to calculate the first million primes (about 0.14 second for the first 100,000 primes) on a fast computer running 64 bit code using struct types rather than classes (it seems that some implementations box and unbox the by-value struct's in forming closures). I consider that to be about the maximum practical range for any of these pure functional prime algorithms. This is probably about the fastest that one can run a pure functional SoE algorithm other than for some minor tweaking to reduce constant factors.

Other than combining segmentation and multi-threading to share the computation between multiple CPU cores, most of the "tweaks" that could be made to this algorithm are in increasing the circumference of the wheel factorization for a gain of up to about 40% in performance and minor gains due to tweaks as to the use of structures, classes, tuples, or more direct individual parameters in the passing of state between functions.

EDIT_ADD2: I've done the above optimizations with the result that the code is now almost twice as fast due to structure optimizations with the added bonus of optionally using larger wheel factorization circumferences for the added smaller reduction. Note that the below code avoids using continuations in the main sequence generation loop and only uses them where necessary for the base primes streams and the subsequent composite number cull streams derived from those base primes. The new code is as follows:

type CIS<'T> = struct val v:'T val cont:unit->CIS<'T> new(v,cont) = { v=v;cont=cont } end //Co-Inductive Steam
let primesTFOWSE =
  let WHLPRMS = [| 2u;3u;5u;7u |] in let FSTPRM = 11u in let WHLCRC = int (WHLPRMS |> Seq.fold (*) 1u) >>> 1
  let WHLLMT = int (WHLPRMS |> Seq.fold (fun o n->o*(n-1u)) 1u) - 1
  let WHLPTRN =
    let wp = Array.zeroCreate (WHLLMT+1)
    let gaps (a:int[]) = let rec gap i acc = if a.[i]=0 then gap (i+1) (acc+1uy) else acc
                         {0..WHLCRC-1} |> Seq.fold (fun s i->
                           let ns = if a.[i]<>0 then wp.[s]<-2uy*gap (i+1) 1uy;(s+1) else s in ns) 0 |> ignore
    Array.init (WHLCRC+1) (fun i->if WHLPRMS |> Seq.forall (fun p->(FSTPRM+uint32(i<<<1))%p<>0u)
                                  then 1 else 0) |> gaps;wp
  let inline whladv i = if i < WHLLMT then i+1 else 0 in let inline advcnd c ci = c + uint32 WHLPTRN.[ci]
  let inline advmltpl p (c,ci) = (c + uint32 WHLPTRN.[ci] * p,whladv ci)
  let rec pmltpls p cs = CIS(cs,fun() -> pmltpls p (advmltpl p cs))
  let rec allmltpls k wi (ps:CIS<_>) =
    let nxt = advcnd k wi in let nxti = whladv wi
    if k < ps.v then allmltpls nxt nxti ps
    else CIS(pmltpls ps.v (ps.v*ps.v,wi),fun() -> allmltpls nxt nxti (ps.cont()))
  let rec (^) (xs:CIS<uint32*_>) (ys:CIS<uint32*_>) = 
    match compare (fst xs.v) (fst ys.v) with //union op for composite CIS's (tuple of cmpst and wheel ndx)
      | -1 -> CIS(xs.v,fun() -> xs.cont() ^ ys)
      | 0 -> CIS(xs.v,fun() -> xs.cont() ^ ys.cont())
      | _ -> CIS(ys.v,fun() -> xs ^ ys.cont()) //must be greater than
  let rec pairs (xs:CIS<CIS<_>>) =
    let ys = xs.cont() in CIS(CIS(xs.v.v,fun()->xs.v.cont()^ys.v),fun()->pairs (ys.cont()))
  let rec joinT3 (xs:CIS<CIS<_>>) = CIS(xs.v.v,fun()->
    let ys = xs.cont() in let zs = ys.cont() in (xs.v.cont()^(ys.v^zs.v))^joinT3 (pairs (zs.cont())))
  let rec mkprm (cnd,cndi,(csd:CIS<uint32*_>)) =
    let nxt = advcnd cnd cndi in let nxti = whladv cndi
    if cnd >= fst csd.v then mkprm (nxt,nxti,csd.cont()) //minus function
    else (cnd,cndi,(nxt,nxti