Haskell是否提供立即评估IO单子的方法?

4

我目前正在使用Haskell编写一款光线追踪程序。由于我是Haskell的初学者,所以我不太清楚IO单子的求值策略。

问题在于“IO a”的长列表的内存使用情况,在我的代码中它是“IO Vec”。

列表的每个元素都由一个递归函数计算得到,该函数计算代表像素颜色的IO Vec。因此,列表的长度等于width x height

此外,我对每个像素进行了多次采样。总体而言,计算像素值的radiance函数被调用了width x height x samples次。

起初,我简单地使用列表推导来实现这个程序。代码类似于:

main = do
    ...
    let ray = (compute ray for every pair of [0..w-1], [0..h-1]
    pixels <- sequence [ (sumOfRadiance scene ray samples) | ray <- rays]

据我理解,在像素写入文件之前未被使用时,Haskell 会将一些函数调用的数据存储在 pixels 中,这是一个 IO Vec 数组。最终,通过调用递归函数 radiance 来计算像素值,内存消耗会增加。

如果我改变程序,使用 unsafePerformIO 逐个计算像素值,可以避免这种奇怪的内存空间使用。

main = do
    ...
    let ray = (compute ray for every pair of [0..w-1], [0..h-1]
    let pixels = [ (unsafePerformIO (sumOfRadiance scene ray samples)) | ray <- rays]

我知道 unsafePerformIO 是一个不好的解决方案,所以我想知道 Haskell 是否提供了另一种方法来立即在 IO monad 中进行评估。以下是我的整个代码(抱歉,它有点长...)

谢谢你的帮助。

-- Small path tracing with Haskell
import System.Environment
import System.Random.Mersenne
import System.IO.Unsafe
import Control.Monad
import Codec.Picture
import Data.Time
import qualified Data.Word as W
import qualified Data.Vector.Storable as V

-- Parameters
eps :: Double
eps = 1.0e-4

inf :: Double
inf = 1.0e20

nc :: Double
nc  = 1.0

nt :: Double
nt  = 1.5

-- Vec
data Vec = Vec (Double, Double, Double) deriving (Show)
instance (Num Vec) where
    (Vec (x, y, z)) + (Vec (a, b, c)) = Vec (x + a, y + b, z + c)
    (Vec (x, y, z)) - (Vec (a, b, c)) = Vec (x - a, y - b, z - c)
    (Vec (x, y, z)) * (Vec (a, b, c)) = Vec (x * a, y * b, z * c)
    abs = undefined
    signum = undefined
    fromInteger x = Vec (dx, dx, dx) where dx = fromIntegral x

x :: Vec -> Double
x (Vec (x, _, _)) = x

y :: Vec -> Double
y (Vec (_, y, _)) = y

z :: Vec -> Double
z (Vec (_, _, z)) = z

mul :: Vec -> Double -> Vec
mul (Vec (x, y, z)) s = Vec (x * s, y * s, z * s)

dot :: Vec -> Vec -> Double
dot (Vec (x, y, z)) (Vec (a, b, c))  = x * a + y * b + z * c

norm :: Vec -> Vec
norm (Vec (x, y, z)) = Vec (x * invnrm, y * invnrm, z * invnrm)
    where invnrm = 1 / sqrt (x * x + y * y + z * z)

cross :: Vec -> Vec -> Vec
cross (Vec (x, y, z)) (Vec (a, b, c)) = Vec (y * c - b * z, z * a - c * x, x * b - a * y)

-- Ray
data Ray = Ray (Vec, Vec) deriving (Show)

org :: Ray -> Vec
org (Ray (org, _)) = org

dir :: Ray -> Vec
dir (Ray (_, dir)) = dir

-- Material
data Refl = Diff
          | Spec
          | Refr
          deriving Show

-- Sphere
data Sphere = Sphere (Double, Vec, Vec, Vec, Refl) deriving (Show)

rad :: Sphere -> Double
rad  (Sphere (rad, _, _, _, _   )) = rad

pos :: Sphere -> Vec
pos  (Sphere (_  , p, _, _, _   )) = p

emit :: Sphere -> Vec
emit (Sphere (_  , _, e, _, _   )) = e

col :: Sphere -> Vec
col  (Sphere (_  , _, _, c, _   )) = c

refl :: Sphere -> Refl
refl (Sphere (_  , _, _, _, refl)) = refl

intersect :: Sphere -> Ray -> Double
intersect sp ray =
    let op  = (pos sp) - (org ray)
        b   = op `dot` (dir ray)
        det = b * b - (op `dot` op) + ((rad sp) ** 2)
    in
        if det < 0.0
            then inf
            else
                let sqdet = sqrt det
                    t1    = b - sqdet
                    t2    = b + sqdet
                in ansCheck t1 t2
                      where ansCheck t1 t2
                                | t1 > eps  = t1
                                | t2 > eps  = t2
                                | otherwise = inf

-- Scene
type Scene = [Sphere]
sph :: Scene
sph = [ Sphere (1e5,  Vec ( 1e5+1,  40.8, 81.6),    Vec (0.0, 0.0, 0.0), Vec (0.75, 0.25, 0.25),  Diff)   -- Left
      , Sphere (1e5,  Vec (-1e5+99, 40.8, 81.6),    Vec (0.0, 0.0, 0.0), Vec (0.25, 0.25, 0.75),  Diff)   -- Right
      , Sphere (1e5,  Vec (50.0, 40.8,  1e5),       Vec (0.0, 0.0, 0.0), Vec (0.75, 0.75, 0.75),  Diff)   -- Back
      , Sphere (1e5,  Vec (50.0, 40.8, -1e5+170),   Vec (0.0, 0.0, 0.0), Vec (0.0, 0.0, 0.0),     Diff)   -- Front
      , Sphere (1e5,  Vec (50, 1e5, 81.6),          Vec (0.0, 0.0, 0.0), Vec (0.75, 0.75, 0.75),  Diff)   -- Bottom
      , Sphere (1e5,  Vec (50,-1e5+81.6,81.6),      Vec (0.0, 0.0, 0.0), Vec (0.75, 0.75, 0.75),  Diff)   -- Top
      , Sphere (16.5, Vec (27, 16.5, 47),           Vec (0.0, 0.0, 0.0), Vec (1,1,1) `mul` 0.999, Spec)   -- Mirror
      , Sphere (16.5, Vec (73, 16.5, 78),           Vec (0.0, 0.0, 0.0), Vec (1,1,1) `mul` 0.999, Refr)   -- Glass
      , Sphere (600,  Vec (50, 681.6 - 0.27, 81.6), Vec (12, 12, 12),    Vec (0, 0, 0),           Diff) ] -- Light

-- Utility functions
clamp :: Double -> Double
clamp = (max 0.0) . (min 1.0)

isectWithScene :: Scene -> Ray -> (Double, Int)
isectWithScene scene ray = foldr1 (min) $ zip [ intersect sph ray | sph <- scene ] [0..]

nextDouble :: IO Double
nextDouble = randomIO

lambert :: Vec -> Double -> Double -> (Vec, Double)
lambert n r1 r2 =
    let th  = 2.0 * pi * r1
        r2s = sqrt r2
        w = n
        u = norm $ (if (abs (x w)) > eps then Vec (0, 1, 0) else Vec (1, 0, 0)) `cross` w
        v = w `cross` u
        uu = u `mul` ((cos th) * r2s)
        vv = v `mul` ((sin th) * r2s)
        ww = w `mul` (sqrt (1.0 - r2))
        rdir = norm (uu + vv + ww)
    in (rdir, 1)

reflect :: Vec -> Vec -> (Vec, Double)
reflect v n =
    let rdir = v - (n `mul` (2.0 * n `dot` v))
    in (rdir, 1)

refract :: Vec -> Vec -> Vec -> Double -> (Vec, Double)
refract v n orn rr =
    let (rdir, _) = reflect v orn
        into = (n `dot` orn) > 0
        nnt  = if into then (nc / nt) else (nt / nc)
        ddn  = v `dot` orn
        cos2t = 1.0 - nnt * nnt * (1.0 - ddn * ddn)
    in
        if cos2t < 0.0
            then (rdir, 1.0)
            else
                let tdir = norm $ ((v `mul` nnt) -) $ n `mul` ((if into then 1 else -1) * (ddn * nnt + (sqrt cos2t)))
                    a = nt - nc
                    b = nt + nc
                    r0 = (a * a) / (b * b)
                    c = 1.0 - (if into then -ddn else (tdir `dot` n))
                    re = r0 + (1 - r0) * (c ** 5)
                    tr = 1.0 - re
                    pp = 0.25 + 0.5 * re
                in
                    if rr < pp
                         then (rdir, (pp / re))
                         else (tdir, ((1.0 - pp) / tr))

radiance :: Scene -> Ray -> Int -> IO Vec
radiance scene ray depth = do
    let (t, i) = (isectWithScene scene ray)
    if inf <= t
        then return (Vec (0, 0, 0))
        else do
            r0 <- nextDouble
            r1 <- nextDouble
            r2 <- nextDouble
            let obj = (scene !! i)
            let c = col obj
            let prob = (max (x c) (max (y c) (z c)))
            if depth >= 5 && r0 >= prob
                then return (emit obj)
                else do
                    let rlt = if depth < 5 then 1 else prob
                    let f = (col obj)
                    let d = (dir ray)
                    let x = (org ray) + (d `mul` t)
                    let n = norm $ x - (pos obj)
                    let orn = if (d `dot` n) < 0.0  then n else (-n)
                    let (ndir, pdf) = case (refl obj) of
                            Diff -> (lambert orn r1 r2)
                            Spec -> (reflect d orn)
                            Refr -> (refract d n orn r1)
                    nextRad <- (radiance scene (Ray (x, ndir)) (succ depth))
                    return $ ((emit obj) + ((f * nextRad) `mul` (1.0 / (rlt * pdf))))

toByte :: Double -> W.Word8
toByte x = truncate (((clamp x) ** (1.0 / 2.2)) * 255.0) :: W.Word8

accumulateRadiance :: Scene -> Ray -> Int -> Int -> IO Vec
accumulateRadiance scene ray d m = do
    let rays = take m $ repeat ray
    pixels <- sequence [radiance scene r 0 | r <- rays]
    return $ (foldr1 (+) pixels) `mul` (1 / fromIntegral m)

main :: IO ()
main = do
    args <- getArgs
    let argc = length args
    let w   = if argc >= 1 then (read (args !! 0)) else 400 :: Int
    let h   = if argc >= 2 then (read (args !! 1)) else 300 :: Int
    let spp = if argc >= 3 then (read (args !! 2)) else 4   :: Int

    startTime <- getCurrentTime

    putStrLn "-- Smallpt.hs --"
    putStrLn $ "  width = " ++ (show w)
    putStrLn $ " height = " ++ (show h)
    putStrLn $ "    spp = " ++ (show spp)

    let dw = fromIntegral w :: Double
    let dh = fromIntegral h :: Double

    let cam = Ray (Vec (50, 52, 295.6), (norm $ Vec (0, -0.042612, -1)));
    let cx  = Vec (dw * 0.5135 / dh, 0.0, 0.0)
    let cy  = (norm $ cx `cross` (dir cam)) `mul` 0.5135
    let dirs = [ norm $ (dir cam) + (cy `mul` (y / dh  - 0.5)) + (cx `mul` (x / dw - 0.5)) | y <- [dh-1,dh-2..0], x <- [0..dw-1] ]
    let rays = [ Ray ((org cam) + (d `mul` 140.0), (norm d)) | d <- dirs ]

    let pixels = [ (unsafePerformIO (accumulateRadiance sph r 0 spp)) | r <- rays ]

    let pixelData = map toByte $! pixels `seq` (foldr (\col lst -> [(x col), (y col), (z col)] ++ lst) [] pixels)
    let pixelBytes = V.fromList pixelData :: V.Vector W.Word8
    let img = Image { imageHeight = h, imageWidth = w, imageData = pixelBytes } :: Image PixelRGB8
    writePng "image.png" img

    endTime <- getCurrentTime
    print $ diffUTCTime endTime startTime

9
光线追踪是一种纯计算,并不需要在IO单子中。您需要在开始时使用IO将场景和任何纹理等加载到内存中,并在最后使用IO写出图像,但实际渲染可以作为一个纯函数调用,它以场景为参数并返回渲染的图像。 - Wyzard
1
你可能最终想要使用未打包的可变向量在 ST(更普遍地,PrimMonad)中进行加速处理,但你可能不想从那里开始。 - dfeuer
6
看起来你正在使用 IO 来生成随机数。建议改用 MonadRandom - luqui
没关系,这并不能解决你的问题(尽管通常情况下,MonadRandom比IO更适合用于随机数生成)。 - luqui
1个回答

10
首先,我认为有一个错误。当你谈论从

转到
pixels <- sequence [ (sumOfRadiance scene ray samples) | ray <- rays]

to

pixels <- sequence [ (unsafePerformIO (sumOfRadiance scene ray samples)) | ray <- rays]

这没有意义。类型不应该匹配--只有当你组合一堆类型为m a的东西时,sequence才有意义。正确的做法是:

let pixels = [ unsafePerformIO (sumOfRadiance scene ray samples) | ray <- rays ]

我稍微放肆地假设这就是你所做的,并且你只是在输入问题时犯了一个错误。

如果是这种情况,那么你实际上正在寻找一种执行IO操作更加惰性的方法,而不是更加立即的方法。 sequence调用强制所有操作立即运行,而unsafePerformIO版本仅创建一个未运行操作的列表(实际上,列表本身是按需生成的,因此并不存在全部存在),并且操作会根据需要逐个运行其结果。

看起来您需要IO来生成随机数。 随机性可能有点麻烦-通常MonadRandom可以胜任此工作,但它仍然在操作之间创建顺序依赖关系,可能仍然不够惰性(我会尝试一下-如果您使用它,则可以获得可重现性-相同的种子会给出相同的结果,即使在遵守单子定律的重构后也是如此)。

如果MonadRandom无法正常工作,并且您需要以更多的按需方式生成随机数,则需要创建自己的随机性单子,该单子执行与您的unsafePerformIO解决方案相同的操作,但以适当封装的方式进行。 我将向您展示我认为是Haskell作弊方式的方法。 首先,是一个可爱的纯实现草图:

-- A seed tells you how to generate random numbers
data Seed = ...
splitSeed :: Seed -> (Seed, Seed)
random :: Seed -> Double

-- A Cloud is a probability distribution of a's, or an a which
-- depends on a random seed.  This monad is just as lazy as a
-- pure computation.
newtype Cloud a = Cloud { runCloud :: Seed -> a }
    deriving (Functor)

instance Monad Cloud where
    return = Cloud . const
    m >>= f = Cloud $ \seed ->
        let (seed1, seed2) = splitSeed seed in
        runCloud (f (runCloud m seed1)) seed2

我想我理解得没错。每次绑定时,您需要将种子分成两半,将一半传递给左侧,另一半传递给右侧。
现在这是完全纯粹的随机实现......但有几个限制。首先,没有非平凡的划分种子可以严格遵守单子法则;其次,即使我们允许违反规律,基于分割的随机数生成器也可能非常慢。但是,如果我们放弃确定性,只关心从分布中得到良好的采样而不是完全相同的结果,那么我们就不需要严格遵守单子法则。此时,我们作弊并假装有一个合适的“Seed”类型:
data Seed = Seed
splitSeed Seed = (Seed, Seed)

-- Always NOINLINE functions with unsafePerformIO to keep the 
-- optimizer from messing with you.
{-# NOINLINE random #-}
random Seed = unsafePerformIO randomIO

我们应该将这个功能封装在一个模块里,以保持抽象层次的清晰。不能暴露 CloudrunCloud,因为它们会导致我们违反纯度原则;只暴露必要的内容。
runCloudIO :: Cloud a -> IO a
runCloudIO = return . runCloud

这段代码不需要技术上的 IO,但是表明它不是确定性的。你可以在 Cloud monad 中构建任何需要的值,并在主程序中运行一次。

你可能会问为什么我们需要一个没有任何信息的 Seed 类型。我认为 splitSeed 只是对纯度的一种承认,实际上并没有做任何事情 -- 你可以将其删除 -- 但我们需要 Cloud 成为一个函数类型,以便惰性的隐式缓存不会破坏我们的语义。

let foo = random in liftM2 (,) foo foo

如果随机值确实与foo相关联,则始终返回具有两个相同组件的对。由于我们现在正在与优化器作斗争,所以我对这些事情并不确定,需要进行一些实验。

祝您愉快地欺骗。 :-)


有关此方法的更多信息,请阅读Lennart Augustsson,Mikael Rittri和Dan Synek的功能珍珠生成唯一名称 - atravers

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接