邮箱处理器性能问题

11

我一直在尝试设计一个系统,允许大量并发用户同时在内存中表示。当开始设计这个系统时,我立即想到了一些像Erlang的基于Actor的解决方案。

由于这个系统必须在.NET上完成,所以我开始使用MailboxProcessor在F#中制作原型,但是遇到了严重的性能问题。我的最初想法是为每个用户使用一个Actor(MailboxProcessor)来序列化通信。

我已经分离出了一个小代码片段,用于复制我正在看到的问题:

open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv = 

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb = 

        Seq.initInfinite(fun id -> 
            MailboxProcessor<Message>.Start(fun inbox -> 

                let rec loop user =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | UpdateName(id, newName) ->
                            let user = {user with Name = newName};
                            incr.Increment()
                            do! loop user
                    }

                loop {Id = id; Name = sprintf "User%i" id}
            )
        ) 
        |> Seq.take 100000
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds
    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0

在我的四核i7上创建10万个演员大约需要800毫秒。然后将UpdateName消息提交给每个演员并等待他们完成需要约1.8秒。

现在,我意识到所有排队的ThreadPool,设置/重置AutoResetEvents等内部MailboxProcessor都有开销。但这真的是预期的性能吗?从阅读MSDN和各种关于MailboxProcessor的博客中,我得出的想法是它类似于erlang演员,但从我所看到的可怕性能来看,这似乎在现实中不成立?

我还尝试了修改版本的代码,它使用8个MailboxProcessors,每个MailboxProcessor都持有一个Map<int,User>映射,用于按ID查找用户,这带来了一些改进,将UpdateName操作的总时间降低到1.2秒。但它仍然感觉非常慢,修改后的代码在这里:

open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = CreateUser of int * string
    | UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv = 

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb = 

        Seq.initInfinite(fun id -> 
            MailboxProcessor<Message>.Start(fun inbox -> 

                let rec loop users =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | CreateUser(id, name) ->
                            do! loop (Map.add id {Id=id; Name=name} users)

                        | UpdateName(id, newName) ->
                            match Map.tryFind id users with
                            | None -> 
                                do! loop users

                            | Some(user) ->
                                incr.Increment()
                                do! loop (Map.add id {user with Name = newName} users)
                    }

                loop Map.empty
            )
        ) 
        |> Seq.take 8
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(CreateUser(i, sprintf "User%i-UpdateName" i));

    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0

那么我的问题是,我做错了什么吗?我是否误解了MailboxProcessor的使用方式?或者这种性能是预期的。

更新:

所以我联系了一些在irc.freenode.net上的##fsharp的人,他们告诉我使用sprintf非常慢,而事实证明这是我的性能问题的很大一部分原因。但是,删除上面的sprintf操作并只为每个用户使用相同的名称,我仍然需要大约400毫秒来执行操作,这感觉非常慢。


3
如果sprintf速度较慢 - 您可以尝试新的F# 3.1,据说它可以显著提高性能。 - John Palmer
2
在生产系统中,也许您更有可能按需为每个用户启动代理,这可能意味着启动时间不是一个很大的问题。同样,您是否期望所有用户同时发布消息? - Phillip Trelford
2
你可以通过使用可变字典来获得性能提升,而不是使用不可变的映射表。这样做是可以的,因为查找访问是通过代理同步的。 - Phillip Trelford
2
使用 Seq.initInfinite 然后构建数组所需的时间会比使用 Array.init 更长,这将成为您创建时间成本的一部分。 - Leaf Garland
2
如果你还没有这样做,你会发现在发布模式下性能更好。当对 .Net 代码进行基准测试时,你可能想通过在启动秒表之前运行单个循环迭代来排除初始的一次性 JIT 编译时间成本。 - Phillip Trelford
显示剩余3条评论
2个回答

19

现在,我意识到线程池上的所有队列操作、设置/重置自动重置事件等在MailboxProcessor内部会产生一些额外开销。

printfMapSeq和竞争全局可变的Inc。而且你泄漏了堆分配的栈帧。实际上,运行基准测试所需时间的只有很小一部分与MailboxProcessor有关。

但这真的是预期的性能吗?

我对程序的性能不感到惊讶,但它并不能说明MailboxProcessor的性能。

通过阅读MSDN和MailboxProcessor的各种博客,我得出了一个观点,那就是它应该类似于Erlang actors,但从我所看到的悲惨表现来看,在现实中似乎并非如此?

MailboxProcessor在概念上与Erlang的一部分有些相似。你所看到的性能低下是由于各种原因造成的,其中一些相当微妙,将影响任何这样的程序。

那么我的问题是,我做错了什么吗?

我认为你做了一些错误的事情。首先,你要解决的问题不清楚,这听起来像是一个XY问题。其次,你试图测试错误的东西(例如,你抱怨创建MailboxProcessor需要微秒级的时间,但可能只打算在建立几个数量级更长的TCP连接时才这样做)。第三,你编写了一个基准测试程序,测量了一些性能,但把你的观察结果归因于完全不同的事情。

让我们更详细地看看你的基准测试程序。在做任何其他操作之前,请修复一些错误。你应该始终使用sw.Elapsed.TotalSeconds来测量时间,因为它更精确。你应该始终使用return!在异步工作流中进行递归,而不是do!或者你将泄漏栈帧。

我的初始计时如下:

Creation stage: 0.858s
Post stage: 1.18s

接下来,让我们运行一次性能分析以确保我们的程序真正将大部分时间浪费在 F# MailboxProcessor 上:

77%    Microsoft.FSharp.Core.PrintfImpl.gprintf(...)
 4.4%  Microsoft.FSharp.Control.MailboxProcessor`1.Post(!0)

显然不是我们所期望的。更抽象地思考,我们正在使用像 sprintf 这样的东西生成大量数据,然后将其应用,但我们正在一起进行生成和应用。让我们分离出我们的初始化代码:

let ids = Array.init 100000 (fun id -> {Id = id; Name = sprintf "User%i" id})
...
    ids
    |> Array.map (fun id ->
        MailboxProcessor<Message>.Start(fun inbox -> 
...
            loop id
...
    printf "Create Time %fs\n" sw.Elapsed.TotalSeconds
    let fxs =
      [|for i in 0 .. 99999 ->
          mb.[i % mb.Length].Post, UpdateName(i, sprintf "User%i-UpdateName" i)|]
    incr.Start()
    for f, x in fxs do
      f x
...

现在我们得到:

Creation stage: 0.538s
Post stage: 0.265s

因此,创作速度提高了60%,发布速度提高了4.5倍。

让我们尝试完全重写您的基准测试:

do
  for nAgents in [1; 10; 100; 1000; 10000; 100000] do
    let timer = System.Diagnostics.Stopwatch.StartNew()
    use barrier = new System.Threading.Barrier(2)
    let nMsgs = 1000000 / nAgents
    let nAgentsFinished = ref 0
    let makeAgent _ =
      new MailboxProcessor<_>(fun inbox ->
        let rec loop n =
          async { let! () = inbox.Receive()
                  let n = n+1
                  if n=nMsgs then
                    let n = System.Threading.Interlocked.Increment nAgentsFinished
                    if n = nAgents then
                      barrier.SignalAndWait()
                  else
                    return! loop n }
        loop 0)
    let agents = Array.init nAgents makeAgent
    for agent in agents do
      agent.Start()
    printfn "%fs to create %d agents" timer.Elapsed.TotalSeconds nAgents
    timer.Restart()
    for _ in 1..nMsgs do
      for agent in agents do
        agent.Post()
    barrier.SignalAndWait()
    printfn "%fs to post %d msgs" timer.Elapsed.TotalSeconds (nMsgs * nAgents)
    timer.Restart()
    for agent in agents do
      use agent = agent
      ()
    printfn "%fs to dispose of %d agents\n" timer.Elapsed.TotalSeconds nAgents

此版本期望每个代理收到 nMsgs 个消息之后,该代理才会递增共享计数器,从而大大降低了该共享计数器的性能影响。该程序还检查了不同数量代理的性能。在这台机器上,我获得:

Agents  M msgs/s
     1    2.24
    10    6.67
   100    7.58
  1000    5.15
 10000    1.15
100000    0.36

看起来,导致你所见到的消息速度较慢的原因之一是代理数量过多(100,000)。使用10-1,000个代理时,F#实现比使用100,000个代理时快10倍以上。

如果您可以满足这种性能,请在F#中编写整个应用程序,但如果您需要更高的性能,则建议使用不同的方法。您甚至可以采用像Disruptor这样的设计而无需放弃使用F#(您当然可以用它进行原型设计)。实际上,我发现在.NET上执行序列化所花费的时间往往比在F#异步和MailboxProcessor中花费的时间要长得多。


2

在消除sprintf后,我得到了约12秒的结果(在Mac上的mono并不那么快)。 采用Phil Trelford建议的使用Dictionary而不是Map,运行时间缩短到了600毫秒。尚未在Win/.Net上进行过测试。

代码更改足够简单,并且本地可变性对我来说完全可以接受:

let mb = 
    Seq.initInfinite(fun id -> 
        MailboxProcessor<Message>.Start(fun inbox -> 
            let di = System.Collections.Generic.Dictionary<int,User>()
            let rec loop () =
                async {
                    let! m = inbox.Receive()

                    match m with
                    | CreateUser(id, name) ->
                        di.Add(id, {Id=id; Name=name})
                        return! loop ()

                    | UpdateName(id, newName) ->
                        match di.TryGetValue id with
                        | false, _ -> 
                            return! loop ()

                        | true, user ->
                            incr.Increment()
                            di.[id] <- {user with Name = newName}
                            return! loop ()
                }

            loop ()
        )
    ) 
    |> Seq.take 8
    |> Array.ofSeq

4
在异步工作流中,必须使用return!进行尾递归,而不是使用do!,否则将会泄漏(堆分配的)栈帧! - J D

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接