Haskell中的精确流控制

22
这个想法: 你好!我正在尝试基于数据流思想在Haskell中实现一个图像处理库。我遇到了一个关于如何处理控制流的问题。
主要的想法是引入一个“时间”变量。这个“时间”是一个可以在代码的任何地方访问的浮点数(你可以将其视为状态单子,但是更有趣)。有趣的是,我们可以对结果使用“timeShift”操作,影响相应操作所看到的时间。
最好的解释方法是通过一个例子来说明这种情况。让我们使用以下数据流图:
--               timeShift(*2) --
--              /                 \
-- readImage --                    addImages -> out
--              \                 /
--                blur ----------

它的伪代码(无需类型检查-在这里使用do或let符号并不重要,重要的是理解思想):
test = do
    f      <- frame
    a      <- readImage $ "test" + show f + ".jpg"
    aBlur  <- blur a
    a'     <- a.timeShift(*2)
    out    <- addImage aBlur a'

main = print =<< runStateT test 5

5是我们想要使用test函数的时间。 timeShift函数影响其左侧的所有操作(在数据流图中),在这种情况下,函数readImage将运行两次-对于两个分支都会运行 - 下面的分支将使用帧5,上面的分支将使用帧5 * 2 = 10

问题

我提供了一个非常简单的实现,它工作得很好,但有一些注意事项需要解决。问题是,我想保持所有IO操作的顺序。例如,看一下下面的示例,它将澄清我的意思。

示例实现

下面是算法的示例实现和构造以下数据流图的代码:

-- A --- blur --- timeShift(*2) --
--                                \
--                                 addImages -> out
--                                /
-- B --- blur --------------------

代码如下:
import Control.Monad.State

-- for simplicity, lets assume an Image is just a String
type Image = String

imagesStr = ["a0","b1","c2","d3","e4","f5","g6","h7","i8","j9","k10","l11","m12","n13","o14","p15","q16","r17","s18","t19","u20","v21","w22","x23","y24","z25"]
images = "abcdefghjiklmnoprstuwxyz"

--------------------------------
-- Ordinary Image processing functions

blurImg' :: Image -> Image
blurImg' img = "(blur " ++ img ++ ")"

addImage' :: Image -> Image -> Image
addImage' img1 img2 = "(add " ++ img1 ++ " " ++ img2 ++ ")"

--------------------------------
-- Functions processing Images in States

readImage1 :: StateT Int IO Image
readImage1 = do
    t <- get
    liftIO . putStrLn $ "[1] reading image with time: " ++ show t
    return $ imagesStr !! t

readImage2 :: StateT Int IO Image
readImage2 = do
    t <- get
    liftIO . putStrLn $ "[2] reading image with time: " ++ show t
    return $ imagesStr !! t

blurImg :: StateT Int IO Image -> StateT Int IO Image
blurImg img = do
    i <- img
    liftIO $ putStrLn "blurring"
    return $ blurImg' i

addImage :: StateT Int IO Image -> StateT Int IO Image -> StateT Int IO Image
addImage img1 img2 = do
    i1 <- img1
    i2 <- img2
    liftIO $ putStrLn "adding images"
    return $ addImage' i1 i2


timeShift :: StateT Int IO Image -> (Int -> Int) -> StateT Int IO Image
timeShift img f = do
    t <- get
    put (f t)
    i <- img
    put t
    return i

test = out where
    i1   = readImage1
    j1   = readImage2

    i2   = blurImg i1
    j2   = blurImg j1

    i3   = timeShift i2 (*2)
    out  = addImage i3 j2


main = do
    print =<< runStateT test 5
    print "end"

输出为:
[1] reading image with time: 10
blurring
[2] reading image with time: 5
blurring
adding images
("(add (blur k10) (blur f5))",5)
"end"

应该是:
[1] reading image with time: 10
[2] reading image with time: 5
blurring
blurring
adding images
("(add (blur k10) (blur f5))",5)
"end"

请注意,正确的输出应该是("(add (blur k10) (blur f5))",5),这意味着我们将第10帧的图像k10与第5帧的图像f5相加。
进一步要求:我正在寻找一种解决方案,允许用户编写简单的代码(如test函数中所示),但我不希望他们手动处理时间移位逻辑。
结论:唯一的区别是IO操作执行的顺序。我尝试使用CountinuationsArrows和一些有趣的状态来实现这个想法,但没有成功。

7
我认为您的问题未经详细说明。对于类似 a <- readImage "test.jpg"; a' <- timeShift (*(width a)); addImage a a' 或其他具有操作之间依赖关系的情况,您如何处理?对于您的需求,使用 Applicative 结构是否足够,而不必使用 Monad - Niklas B.
更准确地说,在这种情况下,无法在模糊操作之前加载c图像:a <- readImage "1"; b <- readImage "2"; c <- blur a; c <- timeShift b (*(mod (hash blur) 10)) - Niklas B.
7
这很像函数响应式编程 - Pedro Rodrigues
8
为什么您关心操作的顺序,尽管它们之间没有数据依赖关系? - dave
我的朋友创建了这个Haskell图像处理库,可能对你有用。 - Vivek
2个回答

3

在Haskell中,数据流和函数响应式编程库通常是用ApplicativeArrow编写的。这些都是计算抽象,比Monad不那么通用 - ApplicativeArrow类型类没有暴露出一种方式,使得计算结构依赖于其他计算的结果。因此,仅公开这些类型类的库可以独立于执行这些计算来推断库中计算结构。我们将使用Applicative类型类解决您的问题。

class Functor f => Applicative f where
    -- | Lift a value.
    pure :: a -> f a    
    -- | Sequential application.
    (<*>) :: f (a -> b) -> f a -> f b

Applicative 允许库用户使用 pure 创建新的计算,使用现有计算中的 fmap(来自 Functor),并使用 <*> 合成计算,使用一个计算的结果作为另一个计算的输入。它不允许库用户创建一个计算,使另一个计算直接使用该计算的结果;没有办法编写 join :: f (f a) -> f a。这种限制将避免我们的库遇到 我在其他答案中描述的问题

变换器、自由和 ApT 变换器

Your example problem is quite complex, so we will introduce several advanced Haskell techniques and create some new ones. The first two techniques are transformers and free data types. Transformers are types that take types with a kind similar to that of Functors, Applicatives or Monads and produce new types with the same kind.
Transformers usually have a structure similar to the following Double example. Double can take any Functor, Applicative, or Monad and create a version of it that always holds two values instead of one.
newtype Double f a = Double {runDouble :: f (a, a)}

自由数据类型是转换器,可以做两件事情。首先,给定底层类型的某些简单属性,为转换类型获得新的令人兴奋的属性。 Free Monad 提供了一个 Monad,给定任何 Functor,而自由 ApplicativeAp,则将任何 Functor 转换为一个 Applicative。 "free" 类型的另一件事是尽可能地"释放"解释器的实现。这里是自由 ApplicativeAp,自由 MonadFree 和自由单子变换器 FreeT 的类型。自由单子变换器提供了一个单子变换器,用于"free",给定一个 Functor

-- Free Applicative
data Ap f a where
    Pure :: a -> Ap f a
    Ap   :: f a -> Ap f (a -> b) -> Ap f b

-- Base functor of the free monad transformer
data FreeF f a b
    = Pure a    
    | Free (f b)    

-- Free monad transformer
newtype FreeT f m a = FreeT {runFreeT :: m (FreeF f a (FreeT f m a)}

-- The free monad is the free monad transformer applied to the Identity monad
type Free f = FreeT f Identity

这是我们的目标草图 - 我们想提供一个Applicative接口来组合计算,底层允许Monad计算。我们希望尽可能“释放”解释器,以便它可以重新排序计算。为此,我们将结合自由Applicative和自由单子变换器。
我们想要一个Applicative接口,最容易创建的是我们可以免费获得的接口,这与我们尽可能“释放解释器”的目标相吻合。这表明我们的类型将看起来像:
Ap f a

对于某些Functor f 和任何a,我们希望底层计算是在某个Monad上进行的,而Monad是一种functor,但我们希望尽可能地“释放”解释器。我们将自由monad transformer作为Ap的基础functor,从而得到
Ap (FreeT f m) a

对于某个Functor f,某个Monad m和任何a。我们知道Monad m可能会是IO,但我们将代码尽可能地保持通用。我们只需要为FreeT提供Functor即可。所有的Applicatives都是Functors,因此Ap本身可以用于f,我们可以编写如下内容:

type ApT m a = Ap (FreeT (ApT m) m) a

这会让编译器出现问题,所以我们将把Ap移到内部并进行定义。
newtype ApT m a = ApT {unApT :: FreeT (Ap (ApT m)) m a}

我们将为此推导一些实例,并在插曲之后讨论其真正的动机。
插曲
为了运行所有这些代码,您需要以下内容。仅需要MapControl.Concurrent来共享计算,稍后会更详细地介绍。
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Main where

import Control.Monad.Trans.Class
import Control.Monad.IO.Class

import Control.Monad.Trans.Reader
import Control.Applicative

import Control.Applicative.Free hiding (Pure)
import qualified Control.Applicative.Free as Ap (Ap(Pure))
import Control.Monad.Trans.Free

import qualified Data.Map as Map
import Control.Concurrent

填充它

在上一节中,我误导了你,并假装是通过推理问题来发现ApT。实际上,我是通过尝试各种方法来尝试将Monadic计算填充到一个Applicative中并能够控制其输出顺序而发现ApT的。很长一段时间,我一直在尝试解决如何实现mapApM(下面)以编写flipImage(我替换您的blur)。这就是所有荣耀的ApTMonad变换器。它旨在用作ApFunctor,并且通过使用Ap作为FreeT的自己的Functor,可以神奇地将值填充到不应该看起来可能的Applicative中。

newtype ApT m a = ApT {unApT :: FreeT (Ap (ApT m)) m a}
     deriving (Functor, Applicative, Monad, MonadIO)

它可以从FreeT中派生更多实例,但我们只需要这些。它无法派生MonadTrans,但我们可以自己完成:
instance MonadTrans ApT where
    lift = ApT . lift

runApT :: ApT m a -> m (FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a))
runApT = runFreeT . unApT

"

ApT的真正美妙之处在于我们可以编写一些看似不可能的代码,例如:

"
stuffM :: (Functor m, Monad m) => m (ApT m a) -> ApT m a
stuffMAp :: (Functor m, Monad m) => m (ApT m a) -> Ap (ApT m) a

外部的m消失了,甚至进入了仅为ApplicativeAp

这是因为以下函数循环,每个函数都可以将上面函数的输出塞入下面函数的输入中。第一个函数从ApT m a开始,最后一个函数以此结束。(这些定义不是程序的一部分)

liftAp' :: ApT m a ->
           Ap (ApT m) a
liftAp' = liftAp

fmapReturn :: (Monad m) =>
               Ap (ApT m) a ->
               Ap (ApT m) (FreeT (Ap (ApT m)) m a)
fmapReturn = fmap return

free' :: Ap (ApT m) (FreeT (Ap (ApT m)) m a) ->
         FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a)
free' = Free

pure' :: a ->
         FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a)
pure' = Pure

return' :: (Monad m) =>
           FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a) ->
           m (FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a))
return' = return

freeT :: m (FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a)) ->
         FreeT (Ap (ApT m)) m a
freeT = FreeT

apT :: FreeT (Ap (ApT m)) m a ->
       ApT m a
apT = ApT

这让我们可以编写

-- Get rid of an Ap by stuffing it into an ApT.
stuffAp :: (Monad m) => Ap (ApT m) a -> ApT m a
stuffAp = ApT . FreeT . return . Free . fmap return

-- Stuff ApT into Free
stuffApTFree :: (Monad m) => ApT m a -> FreeF (Ap (ApT m)) a (FreeT (Ap (ApT m)) m a)
stuffApTFree = Free . fmap return . liftAp

-- Get rid of an m by stuffing it into an ApT
stuffM :: (Functor m, Monad m) => m (ApT m a) -> ApT m a
stuffM = ApT . FreeT . fmap stuffApTFree

-- Get rid of an m by stuffing it into an Ap
stuffMAp :: (Functor m, Monad m) => m (ApT m a) -> Ap (ApT m) a
stuffMAp = liftAp . stuffM

还有一些用于处理转换器堆栈的实用函数

mapFreeT :: (Functor f, Functor m, Monad m) => (m a -> m b) -> FreeT f m a -> FreeT f m b
mapFreeT f fa = do
    a <- fa
    FreeT . fmap Pure . f . return $ a

mapApT :: (Functor m, Monad m) => (m a -> m b) -> ApT m a -> ApT m b
mapApT f = ApT . mapFreeT f . unApT

mapApM :: (Functor m, Monad m) => (m a -> m b) -> Ap (ApT m) a -> Ap (ApT m) b
mapApM f = liftAp . mapApT f . stuffAp

我们希望开始编写示例图像处理器,但首先我们需要进行另一个分支以解决一个硬性要求。

硬性要求 - 输入共享

您的第一个示例展示了

--               timeShift(*2) --
--              /                 \
-- readImage --                    addImages -> out
--              \                 /
--                blur ----------

意味着readImage的结果应该在blurtimeShift(*2)之间共享。我理解为readImage的结果每次只需要计算一次。 Applicative不足以满足这个要求。我们将创建一个新的类型类来表示输出可以分成多个相同流的计算。
-- The class of things where input can be shared and divided among multiple parts
class Applicative f => Divisible f where
    (<\>) :: (f a -> f b) -> f a -> f b

我们将创建一个转换器,将这种能力添加到现有的Applicative中。
-- A transformer that adds input sharing
data LetT f a where
    NoLet :: f a -> LetT f a
    Let   :: LetT f b -> (LetT f b -> LetT f a) -> LetT f a

并为其提供一些实用函数和实例。
-- A transformer that adds input sharing
data LetT f a where
    NoLet :: f a -> LetT f a
    Let   :: LetT f b -> (LetT f b -> LetT f a) -> LetT f a

liftLetT :: f a -> LetT f a
liftLetT = NoLet

mapLetT :: (f a -> f b) -> LetT f a -> LetT f b
mapLetT f = go
    where
        go (NoLet a) = NoLet (f a)
        go (Let b g) = Let b (go . g)

instance (Applicative f) => Functor (LetT f) where
    fmap f = mapLetT (fmap f)

-- I haven't checked that these obey the Applicative laws.
instance (Applicative f) => Applicative (LetT f) where
    pure = NoLet . pure
    NoLet f <*> a = mapLetT (f <*>) a
    Let c h <*> a = Let c ((<*> a) . h)   

instance (Applicative f) => Divisible (LetT f) where
    (<\>) = flip Let

图片处理器

当我们所有的转换器都就位后,我们可以开始编写我们的图片处理器。在我们的堆栈底部,我们有一个来自早期部分的 ApT

Ap (ApT IO)

计算需要能够从环境中读取时间,因此我们将添加一个ReaderT
ReaderT Int (Ap (ApT IO))

最后,我们希望能够共享计算,因此我们会在顶部添加我们的 LetT 转换器,为我们的图像处理器提供整个类型 IP
type Image = String
type IP = LetT (ReaderT Int (Ap (ApT IO)))

我们将从IO中读取图片。使用getLine可以制作有趣的交互式示例。
readImage :: Int -> IP Image
readImage n = liftLetT $ ReaderT (\t -> liftAp . liftIO $ do
    putStrLn $  "[" ++ show n ++ "] reading image for time: " ++ show t
    --getLine
    return $ "|image [" ++ show n ++ "] for time: " ++ show t ++ "|"
    )

我们可以将输入的时间偏移。
timeShift :: (Int -> Int) -> IP a -> IP a
timeShift f = mapLetT shift
    where
        shift (ReaderT g) = ReaderT (g . f)  

将多张图片合并在一起
addImages :: Applicative f => [f Image] -> f Image
addImages = foldl (liftA2 (++)) (pure [])

我想模拟使用某个被困在IO中的库来翻转图片。我无法弄清如何使一个字符串模糊...

inIO :: (IO a -> IO b) -> IP a -> IP b
inIO = mapLetT . mapReaderT . mapApM        

flipImage :: IP [a] -> IP [a]
flipImage = inIO flip'
    where
        flip' ma = do
            a <- ma
            putStrLn "flipping"
            return . reverse $ a

解释 LetT

我们的用于共享结果的 LetT 位于变换器栈的顶部。我们需要解释它以获得其下面的计算。为了解释 LetT,我们需要一种在 IO 中共享结果的方法,这就是 memoize 提供的功能,并且需要一个解释器将 LetT 变换器从栈顶移除。

为了共享计算结果,我们需要将它们存储在某个地方,这个过程在 IO 中使用 memoize 来实现,确保即使在多个线程中也只会发生一次。

memoize :: (Ord k) => (k -> IO a) -> IO (k -> IO a)
memoize definition = do
    cache <- newMVar Map.empty
    let populateCache k map = do
        case Map.lookup k map of
            Just a -> return (map, a)
            Nothing -> do
                a <- definition k
                return (Map.insert k a map, a)        
    let fromCache k = do
        map <- readMVar cache
        case Map.lookup k map of
            Just a -> return a
            Nothing -> modifyMVar cache (populateCache k)
    return fromCache

为了解释一个`Let`,我们需要一个评估器来将底层的`ApT IO`合并到`Let`绑定的定义中。由于计算结果取决于从`ReaderT`读取的环境,因此我们将在此步骤中处理`ReaderT`。更复杂的方法会使用变换器类,但是关于`Applicative`的变换器类是另一个问题的主题。
compileIP :: (forall x. ApT IO x -> IO x) ->  IP a -> IO (Int -> ApT IO a)
compileIP eval (NoLet (ReaderT f)) = return (stuffAp . f)
compileIP eval (Let b lf) = do
            cb <- compileIP eval b
            mb <- memoize (eval . cb)
            compileIP eval . lf . NoLet $ ReaderT (liftAp . lift . mb)

解释ApT

我们的解释器使用以下状态,避免需要一直查看AsTFreeTFreeF的内部。

data State m a where
    InPure :: a -> State m a
    InAp :: State m b -> State m (b -> State m a) -> State m a
    InM :: m a -> State m a

instance Functor m => Functor (State m) where
    fmap f (InPure a) = InPure (f a)
    fmap f (InAp b sa) = InAp b (fmap (fmap (fmap f)) sa)
    fmap f (InM  m)  = InM  (fmap f m)

解释Ap比看起来要困难。目的是将数据从Ap.Pure放入InPure,将数据从Ap放入InApinterpretAp需要每次进入更深层次的Ap时使用更大的类型调用自身;该函数不断获取另一个参数。第一个参数t提供了一种简化这些不断扩大的类型的方法。

interpretAp :: (Functor m) => (a -> State m b) -> Ap m a -> State m b
interpretAp t (Ap.Pure a) = t a
interpretAp t (Ap mb ap) = InAp sb sf
    where
        sb = InM mb
        sf = interpretAp (InPure . (t .)) $ ap

interperetApTApTFreeTFreeF中获取数据并将其放入State m中。

interpretApT :: (Functor m, Monad m) => ApT m a -> m (State (ApT m) a)
interpretApT = (fmap inAp) . runApT
    where
        inAp (Pure a) = InPure a
        inAp (Free ap) = interpretAp (InM . ApT) $ ap

通过这些简单的解释,我们可以制定解释结果的策略。每个策略都是从解释器的“State”到一个新的“State”的函数,可能会在执行过程中产生副作用。策略选择执行副作用的顺序决定了副作用的顺序。我们将制定两个示例策略。
第一个策略仅对准备计算的所有内容执行一步,并在准备好时合并结果。这可能是您想要的策略。
stepFB :: (Functor m, Monad m) => State (ApT m) a -> m (State (ApT m) a)
stepFB (InM ma)   = interpretApT ma
stepFB (InPure a) = return (InPure a)
stepFB (InAp b f) = do
    sf <- stepFB f
    sb <- stepFB b
    case (sf, sb) of
        (InPure f, InPure b) -> return (f b)
        otherwise            -> return (InAp sb sf)

这种策略会在获取计算信息后立即执行所有计算,并在单次处理中完成所有计算。
allFB :: (Functor m, Monad m) => State (ApT m) a -> m (State (ApT m) a)
allFB (InM ma) = interpretApT ma
allFB (InPure a) = return (InPure a)
allFB (InAp b f) = do
    sf <- allFB f
    sb <- allFB b
    case (sf, sb) of
        (InPure f, InPure b) -> return (f b)
        otherwise            -> allFB (InAp sb sf)

许多其他的策略都是可能的。 我们可以运行一个策略,直到它产生一个单一的结果来评估它。
untilPure :: (Monad m) => ((State f a) -> m (State f a)) -> State f a -> m a
untilPure s = go
    where
        go state =
            case state of
                (InPure a) -> return a
                otherwise  -> s state >>= go

执行解释器

要执行解释器,我们需要一些示例数据。以下是一些有趣的示例。

example1  = (\i -> addImages [timeShift (*2) i, flipImage i]) <\> readImage 1
example1' = (\i -> addImages [timeShift (*2) i, flipImage i, flipImage . timeShift (*2) $ i]) <\> readImage 1
example1'' = (\i -> readImage 2) <\> readImage 1
example2 = addImages [timeShift (*2) . flipImage $ readImage 1, flipImage $ readImage 2]

“LetT”解释器需要知道用于绑定值的求值器,因此我们将仅定义一次求值器。单个“interpretApT”通过查找解释器的初始“State”来启动评估过程。
evaluator :: ApT IO x -> IO x
evaluator = (>>= untilPure stepFB) . interpretApT

我们将编译名为example2的程序,这实际上就是您的示例,并运行它5个时间单位。
main = do
    f <- compileIP evaluator example2
    a <- evaluator . f $ 5
    print a

几乎可以得到所需的结果,所有读取操作在任何翻转之前完成。
[2] reading image for time: 5
[1] reading image for time: 10
flipping
flipping
"|01 :emit rof ]1[ egami||5 :emit rof ]2[ egami|"

1
一个Monad无法重新排列组成img1img2的步骤。
addImage :: (Monad m) => m [i] -> m [i] -> m [i]
addImage img1 img2 = do
    i1 <- img1
    i2 <- img2
    return $ i1 ++ i2

如果存在任何一个依赖于副作用的结果 m [i]。任何 MonadIO m 都有一个依赖于副作用的 m [i],因此您不能重新排序 img1img2 的组件步骤。
上述内容可转换为:
addImage :: (Monad m) => m [i] -> m [i] -> m [i]
addImage img1 img2 =
    img1 >>=
        (\i1 ->
            img2 >>=
                (\i2 ->
                    return (i1 ++ i2)
                )
        )

让我们聚焦于第一个>>=(记住(>>=) :: forall a b. m a -> (a -> m b) -> m b)。对于我们的类型,这是(>>=) :: m [i] -> ([i] -> m [i]) -> m [i]。如果我们要实现它,我们需要编写类似以下的代码:

(img1 :: m [i]) >>= (f :: [i] -> m [i]) = ... 

为了使用 f,我们需要向其传递一个 [i]。唯一正确的 [i] 被困在 img1 :: m [i] 中。我们需要获得 img1 的结果才能使用 f。现在有两种可能性。我们可以或者不能够在执行副作用之前确定 img1 的结果。接下来我们将分别探讨这两种情况。

不能

当我们无法在执行副作用之前确定 img1 的结果时,我们只有一个选择 - 我们必须执行 img1 以及它的所有副作用。我们现在有了一个 [i],但是所有 img1 的副作用已经被执行了。因为 img1 的副作用已经发生了,所以在执行 img2 的任何副作用之前,我们无法执行 img1 的任何副作用。

可以

如果我们能够确定img1的结果而不执行其副作用,那么我们就很幸运。我们找到img1的结果并将其传递给f,得到一个新的m [i]来保存我们想要的结果。现在我们可以检查img1和新的m [i]的副作用并重新排序它们(尽管这里有一个关于>>=的结合律的巨大警告)。
手头的问题
对于任何MonadIO,存在以下内容,其结果无法确定而不执行其副作用,将我们牢固地置于不能的情况下,我们无法重新排序副作用。
counterExample :: (MonadIO m) => m String
counterExample = liftIO getLine

还有许多其他反例,比如任何像readImage1readImage2这样的东西,实际上必须从IO中读取图像。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接