减少Haskell程序中垃圾回收暂停时间

139
我们正在开发一个程序,可以接收和转发“消息”,同时保留这些消息的临时历史记录,以便在请求时提供消息历史记录。消息使用数字进行标识,通常大小约为1千字节,我们需要保存数十万条这样的消息。
我们希望将此程序优化为延迟时间:发送和接收消息之间的时间必须低于10毫秒。
该程序是用Haskell编写并使用GHC编译的。但是,我们发现在实际程序中,垃圾回收暂停时间太长,超过了我们的延迟要求:超过100毫秒。
以下程序是我们应用程序的简化版本。它使用Data.Map.Strict来存储消息。消息是由Int标识的ByteString。插入100万条递增数字顺序的消息,并不断删除最旧的消息,以使历史记录最多达到20万条。
module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

我们使用以下方式进行编译和运行该程序:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed

重要的指标在于 "最大暂停时间" 为0.0515秒,即51毫秒。我们希望将其至少减小一个数量级。

实验表明GC暂停的长度取决于历史记录中的消息数量。这种关系大致是线性的,或者可能是超线性的。下表显示了这种关系。(您可以在此处查看我们的基准测试,以及一些图表。)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378

我们已经尝试了其他几个变量,以确定它们是否可以减少这种延迟,但没有一个能够产生重大影响。这些不重要的变量包括:优化 (-O, -O2)、RTS GC 选项 (-G, -H, -A, -c)、核心数 (-N)、不同的数据结构 (Data.Sequence)、消息大小和短生命周期垃圾的数量。压倒性的决定性因素是历史消息的数量。

我们的工作理论是,在消息数量上暂停时间是线性的,因为每个GC周期都必须遍历所有工作可访问内存并复制它,这显然是线性操作。

  • 这个线性时间理论正确吗?GC 暂停的长度可以用这种简单的方式来表达,还是现实更加复杂?
  • 如果 GC 暂停在工作内存中是线性的,有没有办法减少与之相关的常数因子?
  • 是否有增量 GC 或类似方法的选项?我们只能看到研究论文。我们非常愿意为较低的延迟而换取吞吐量。
  • 除了分成多个进程之外,有没有将内存“分区”为更小的 GC 周期的方法?

1
@Bakuriu:没错,但在任何现代操作系统中都应该能够在不需要任何调整的情况下轻松地实现10毫秒的延迟。当我运行简单的C程序时,即使在我的老树莓派上,它们也可以轻松地达到5毫秒左右的延迟,或者至少是可靠的15毫秒。 - leftaroundabout
4
你是否有信心你的测试用例是有用的(比如你没有使用Control.Concurrent.Chan这样的可变对象来影响结果)?我建议首先确保你知道你正在生成什么垃圾,并尽可能减少它(例如,确保融合发生,尝试使用-funbox-strict)。也许尝试使用流处理库(iostreams、pipes、conduit、streaming),并在更频繁的间隔直接调用performGC - jberryman
9
如果你想要实现的目标可以在恒定空间内完成,那么首先尝试让它发生(例如,使用MutableByteArray中的环形缓冲区;在这种情况下完全不涉及GC)。 - jberryman
2
对于那些建议使用可变结构并注意创建最小垃圾的人,请注意,决定暂停时间的是保留大小,而不是收集的垃圾量。强制更频繁的收集会导致大约相同长度的更多暂停。编辑:可变的非堆结构可能很有趣,但在许多情况下工作起来并不那么有趣! - mike
6
这段描述明确表明,对于所有代,GC时间都将与堆的大小成线性关系,重要因素包括保留对象的大小(用于复制)和指向它们的指针数量(用于扫描):https://ghc.haskell.org/trac/ghc/wiki/Commentary/Rts/Storage/GC/Copying - mike
显示剩余9条评论
5个回答

102

您能够在200MB的实时数据中拥有51ms的暂停时间已经做得非常好了。 我工作的系统在半量实时数据下具有更大的最大暂停时间。

您的假设是正确的,主要GC暂停时间与实时数据量成正比,不幸的是,在目前的GHC中无法避免这一点。 我们过去尝试过增量GC,但它只是一个研究项目,并没有达到折叠到发布的GHC所需的成熟程度。

我们希望未来一件事情可以帮助解决这个问题:紧凑区域:https://phabricator.haskell.org/D1264。 这是一种手动内存管理,其中您可以压缩堆中的结构,GC就不必遍历它。 它对于长期数据效果最好,但也许对于您的情况中的单个消息使用起来也足够好。 我们的目标是将其放入GHC 8.2.0中。

如果您在分布式环境中并且拥有某种负载均衡器,则可以玩弄一些技巧以避免承受暂停的影响,您基本上需要确保负载均衡器不会将请求发送到即将进行主要GC的机器,并确保该机器仍然完成GC,即使它没有收到请求。


13
嗨,西蒙,非常感谢您详细的回复!这是个坏消息,但很高兴有了结论。我们目前正在朝着可变实现成为唯一合适的替代方案的方向发展。有几件事情我们不明白:(1)负载平衡方案中涉及到哪些技巧 - 是否需要手动执行 performGC?(2)为什么使用 -c 进行压缩会表现更差 - 我们猜测是因为它找不到可以原地保留的东西?(3)关于压缩还有更多细节吗?听起来非常有趣,但不幸的是对我们来说还太遥远了。 - jameshfisher
3
@mljrg,你可能会对http://www.well-typed.com/blog/2019/10/nonmoving-gc-merge/感兴趣。该文章介绍了非移动垃圾回收技术的实现和优化。 - Alfredo Di Napoli

13

如其他答案所述,GHC中的垃圾收集器遍历活跃数据,这意味着您存储在内存中的长期数据越多,GC暂停时间就越长。

GHC 8.2

为了部分解决这个问题,在 GHC-8.2 中引入了一项名为compact regions的功能。它既是 GHC 运行时系统的功能,又是一个提供方便接口的库。紧凑区域能将数据放入内存中的单独位置,并且 GC 在垃圾回收阶段不会遍历该数据。因此,如果您有一个要保留在内存中的大型结构,请考虑使用紧凑区域。但是,紧凑区域本身没有内置小型垃圾收集器,它更适用于只追加数据结构,而不是像HashMap那样还需要删除数据的情况。虽然您可以克服这个问题。有关详细信息,请参见以下博客文章:

GHC 8.10

此外,自GHC-8.10以来,实现了一种新的低延迟增量垃圾收集器算法。这是一种可选的GC算法,不是默认启用的,但如果需要,可以选择加入它。因此,您可以将默认GC切换到更新的GC,以自动获得由紧凑区域提供的功能,而无需进行手动包装和解包。然而,新的GC并非万能药,并不能自动解决所有问题,它也有其权衡。有关新GC的基准,请参考以下GitHub存储库:

10

我尝试了您的代码片段,并使用 IOVector 作为基础数据结构来实现环形缓冲区。在我的系统上(GHC 7.10.3,使用相同的编译选项),这导致最大时间(您在原帖中提到的度量)减少了约22%。

注:我做出了两个假设:

  1. 可变数据结构适用于此问题(我猜消息传递无论如何都涉及IO)
  2. 您的 messageId 是连续的

通过一些额外的 Int 参数和算术运算(例如当 messageId 重置回0或 minBound 时),然后可以轻松确定某个消息是否仍在历史记录中,并从环形缓冲区中检索相应的索引。

供您测试使用:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix == chanSize then 0 else ix + 1
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])

2
你好!回答得很好。我怀疑这只能获得22%的加速是因为GC仍然需要遍历IOVector和每个索引处的(不可变的,GC'd)值。我们目前正在调查使用可变结构重新实现的选项。它可能类似于您的环形缓冲系统。但我们将其完全移出Haskell内存空间以进行自己的手动内存管理。 - jameshfisher
13
我曾经遇到一个类似的问题,但决定在Haskell端保留内存管理。解决方案确实是环形缓冲区,它将原始数据的字节拷贝保存在单个连续的内存块中,从而生成单个Haskell值。你可以查看这个RingBuffer.hs gist。我已经测试过它并与你的示例代码进行了比较,得到了约90%的关键指标加速。随时可以方便地使用这段代码。 - mgmeier

9

我必须同意其他人的观点 - 如果你有严格的实时约束,那么使用垃圾回收语言并不理想。

然而,你可以考虑尝试其他可用的数据结构,而不仅仅是Data.Map。

我使用Data.Sequence重新编写了它,并获得了一些有希望的改进:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350

尽管您正在优化延迟,但我注意到其他指标也有所改善。在200000的情况下,执行时间从1.5秒降至0.2秒,总内存使用量从600MB降至27MB。
需要注意的是,我通过调整设计进行了作弊:
- 我从Msg中删除了Int,因此它不在两个位置。 - 我使用ByteStringSequence,而不是将Int映射为ByteString的Map,并且每个消息不再使用一个Int,而是可以使用一个整数表示整个Sequence。假设消息不能被重新排序,您可以使用单个偏移量将要查询的消息翻译到队列中的位置。
(我包含了另一个函数getMsg来演示这一点。)
{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])

4
嗨!谢谢你的答复。你的结果确实仍然显示线性减速,但是从“Data.Sequence”获得如此大的加速效果相当有趣 - 我们测试了它,并发现它实际上比“Data.Map”更糟糕!我不确定其中的区别,所以我需要进行调查... - jameshfisher

3

您发现了使用垃圾回收语言的局限性:它们不适用于强实时系统。

您有两个选择:

第一,增加堆大小并使用二级缓存系统,将最旧的消息发送到磁盘上,将最新的消息保留在内存中,可以通过使用操作系统分页来实现。但是,这种解决方案的问题是,根据所使用的二级存储器的读取能力,分页可能会很昂贵。

第二,使用C编写该解决方案,并将其与FFI接口到Haskell中。这样,您就可以自己进行内存管理。这将是最好的选择,因为您可以自己控制所需的内存。


1
嗨,费尔南多。谢谢你的回复。我们的系统只是“软”实时系统,但在我们的情况下,我们发现垃圾回收对于软实时系统来说过于严苛了。我们肯定倾向于采用你提出的第二种解决方案。 - jameshfisher

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接