懒惰状态转换器在2D递归中急切地消耗懒惰列表

4
我正在使用状态转换器来在2D递归行走的每个点上随机采样数据集,该状态转换器输出一系列满足条件的2D样本网格列表。我想懒惰地从结果中获取数据,但我的方法会在我提取第一个结果之前在每个点耗尽整个数据集。
具体来说,考虑下面这个程序:
import Control.Monad ( sequence, liftM2 )
import Data.Functor.Identity
import Control.Monad.State.Lazy ( StateT(..), State(..), runState )

walk :: Int -> Int -> [State Int [Int]]
walk _ 0 = [return [0]]
walk 0 _ = [return [0]]
walk x y =
  let st :: [State Int Int]
      st = [StateT (\s -> Identity (s, s + 1)), undefined]
      unst :: [State Int Int] -- degenerate state tf
      unst = [return 1, undefined]
  in map (\m_z -> do
      z <- m_z
      fmap concat $ sequence [
          liftM2 (zipWith (\x y -> x + y + z)) a b -- for 1D: map (+z) <$> a
          | a <- walk x (y - 1) -- depth
          , b <- walk (x - 1) y -- breadth -- comment out for 1D
        ]
    ) st -- vs. unst

main :: IO ()
main = do
  std <- getStdGen
  putStrLn $ show $ head $ fst $ (`runState` 0) $ head $ walk 2 2

该程序沿着从(x, y)(0, 0)的矩形网格行走并求和所有结果,包括其中一个State monads列表的值:要么是读取并推进其状态的非平凡变压器st,要么是平凡变压器unst。有趣的是算法是否探索了stunst的头部之后。
在所提供的代码中,它会抛出undefined。我认为这是我的变换链接顺序设计不良的结果,特别是与状态处理有关,因为使用unst(即将结果与状态转换解耦)确实会产生结果。然而,我还发现1D递归即使使用状态变换也保留了惰性(删除广度步骤b < - walk...并将liftM2块替换为fmap)。
如果我们trace(show(x,y)),我们还可以看到它在触发前走完整个网格:
$ cabal run
Build profile: -w ghc-8.6.5 -O1
...
(2,2)
(2,1)
(1,2)
(1,1)
(1,1)
sandbox: Prelude.undefined

我怀疑我的使用sequence是有问题的,但选择单子和漫步的维度会影响其成功性,所以我不能广泛地说sequence转换本身就是严格性的原因。

在这里导致1D和2D递归严格性差异的是什么,并且我该如何实现我想要的惰性?

2个回答

2
考虑以下简化的例子:
import Control.Monad.State.Lazy

st :: [State Int Int]
st = [state (\s -> (s, s + 1)), undefined]

action1d = do
  a <- sequence st
  return $ map (2*) a

action2d = do
  a <- sequence st
  b <- sequence st
  return $ zipWith (+) a b

main :: IO ()
main = do
  print $ head $ evalState action1d 0
  print $ head $ evalState action2d 0

在1D和2D的计算中,结果的头部明确地仅取决于输入的头部(对于1D操作只有head a,对于2D操作既有head a也有head b)。然而,在2D计算中,b(即使只是它的头部)存在一种隐式依赖于当前状态,并且该状态取决于对a整体的评估,而不仅仅是它的头部。
在您的示例中也存在类似的依赖关系,尽管使用了状态操作列表来掩盖它。
假设我们想要手动运行操作walk22_head = head $ walk 2 2并检查结果列表中的第一个整数:
main = print $ head $ evalState walk22_head

明确编写状态操作列表st的元素:

st1, st2 :: State Int Int
st1 = state (\s -> (s, s+1))
st2 = undefined

我们可以将walk22_head写成:

walk22_head = do
  z <- st1
  a <- walk21_head
  b <- walk12_head
  return $ zipWith (\x y -> x + y + z) a b

请注意,这仅取决于定义的状态动作st1以及walk 2 1walk 1 2的头部。这些头部可以写成:
walk21_head = do
  z <- st1
  a <- return [0] -- walk20_head
  b <- walk11_head
  return $ zipWith (\x y -> x + y + z) a b

walk12_head = do
  z <- st1
  a <- walk11_head
  b <- return [0] -- walk02_head
  return $ zipWith (\x y -> x + y + z) a b

这些仅取决于已定义的状态动作st1walk 1 1的头部。

现在,让我们尝试写出walk11_head的定义:

walk11_head = do
  z <- st1
  a <- return [0]
  b <- return [0]
  return $ zipWith (\x y -> x + y + z) a b

这仅取决于定义的状态操作st1,因此如果我们按照这些定义运行main,我们将得到一个明确的答案:

> main
10

但是这些定义并不准确!在walk 1 2walk 2 1中,头部动作是一个序列的动作,从调用walk11_head开始,但继续执行基于walk11_tail的动作。因此,更准确的定义应该是:

walk21_head = do
  z <- st1
  a <- return [0] -- walk20_head
  b <- walk11_head
  _ <- walk11_tail  -- side effect of the sequennce
  return $ zipWith (\x y -> x + y + z) a b

walk12_head = do
  z <- st1
  a <- walk11_head
  b <- return [0] -- walk02_head
  _ <- walk11_tail  -- side effect of the sequence
  return $ zipWith (\x y -> x + y + z) a b

使用:

walk11_tail = do
  z <- undefined
  a <- return [0]
  b <- return [0]
  return [zipWith (\x y -> x + y + z) a b]

有了这些定义,单独运行walk12_headwalk21_head没有问题:

> head $ evalState walk12_head 0
1
> head $ evalState walk21_head 0
1

这里的状态副作用在计算答案时是不需要的,因此从未被调用。但是,无法按顺序运行它们两个:
> head $ evalState (walk12_head >> walk21_head) 0
*** Exception: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:78:14 in base:GHC.Err
  undefined, called at Lazy2D_2.hs:41:8 in main:Main

因此,试图运行main的原因相同,会失败:
> main
*** Exception: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:78:14 in base:GHC.Err
  undefined, called at Lazy2D_2.hs:41:8 in main:Main

因为在计算walk22_head时,甚至walk21_head的计算开始之初都依赖于状态副作用walk11_tail,而这个副作用是由walk12_head发起的。

你原始的walk定义的行为与这些模拟相同:

> head $ evalState (head $ walk 1 2) 0
1
> head $ evalState (head $ walk 2 1) 0
1
> head $ evalState (head (walk 1 2) >> head (walk 2 1)) 0
*** Exception: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:78:14 in base:GHC.Err
  undefined, called at Lazy2D_0.hs:15:49 in main:Main
> head $ evalState (head (walk 2 2)) 0
*** Exception: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:78:14 in base:GHC.Err
  undefined, called at Lazy2D_0.hs:15:49 in main:Main

很难说如何解决这个问题。你的玩具示例非常好,用于说明问题,但是在你的“真实”问题中如何使用状态并不清楚,以及head $ walk 2 1是否真正依赖于head $ walk 1 2引发的walk 1 1操作序列的状态依赖性。


非常好的观点。如果您感兴趣,我已经添加了一个答案来解释我的“真实”问题中的依赖关系。谢谢您为此带来的清晰度! - concat

1

K.A. Buhr的被接受的答案是正确的:在每个方向上获取一步的头部是可以的(尝试walk,其中x < 2y < 2),但liftM2中的隐式>>=,值a中的sequence以及值b中的状态依赖性使b取决于a的所有副作用。正如他所指出的那样,一个可行的解决方案取决于实际想要的依赖关系。

我将分享我的特定情况的解决方案:每个walk调用至少依赖于调用者的状态,也可能基于网格的前序遍历和st中的替代方案的一些其他状态。此外,正如问题所暗示的那样,我希望在测试任何不需要的st替代方案之前尝试生成完整结果。这有点难以在视觉上解释,但是以下是我所能做到的最好的:左侧显示每个坐标处st可选项的变量数量(这是我的实际用例),右侧显示状态所需的依赖顺序的[相当混乱的]地图:我们看到它首先在3D DFS中遍历x-y,其中“x”为深度(最快轴),“y”为广度(中间轴),然后最后替代方案作为最慢的轴(用带有开放圆圈的虚线表示)。

enter image description here

在原始实现中,核心问题来自于对状态转换列表的排序,以适应非递归返回类型。让我们完全用递归的单子参数类型替换列表类型,这样调用者可以更好地控制依赖顺序。
data ML m a = MCons a (MML m a) | MNil -- recursive monadic list
newtype MML m a = MML (m (ML m a)) -- base case wrapper

一个关于[1, 2]的例子:
MCons 1 (MML (return (MCons 2 (MML (return MNil)))))

Functor和Monoid行为经常被使用,因此这里提供相关的实现:

instance Functor m => Functor (ML m) where
  fmap f (MCons a m) = MCons (f a) (MML $ (fmap f) <$> coerce m)
  fmap _ MNil = MNil

instance Monad m => Semigroup (MML m a) where
  (MML l) <> (MML r) = MML $ l >>= mapper where
    mapper (MCons la lm) = return $ MCons la (lm <> (MML r))
    mapper MNil = r

instance Monad m => Monoid (MML m a) where
  mempty = MML (pure MNil)

有两个关键操作:合并两个不同轴上的步骤,以及在相同坐标处合并来自不同替代方案的列表。分别是:

  1. Based on the diagram, we want to get a single full result from the x step first, then a full result from the y step. Each step returns a list of results from all combinations of viable alternatives from inner coordinates, so we take a Cartesian product over both lists, also biased in one direction (in this case y fastest). First we define a "concatenation" that applies a base case wrapper MML at the end of a bare list ML:

    nest :: Functor m => MML m a -> ML m a -> ML m a
    nest ma (MCons a mb) = MCons a (MML $ nest ma <$> coerce mb)
    

    then a Cartesian product:

    prodML :: Monad m => (a -> a -> a) -> ML m a -> ML m a -> ML m a
    prodML f x (MCons ya ym) = (MML $ prodML f x <$> coerce ym) `nest` ((f ya) <$> x)
    prodML _ MNil _ = MNil
    
  2. We want to smash the lists from different alternatives into one list and we don't care that this introduces dependencies between alternatives. This is where we use mconcat from the Monoid instance.

总的来说,它看起来像这样:

walk :: Int -> Int -> MML (State Int) Int
-- base cases
walk _ 0 = MML $ return $ MCons 1 (MML $ return MNil)
walk 0 _ = walk 0 0

walk x y =
  let st :: [State Int Int]
      st = [StateT (\s -> Identity (s, s + 1)), undefined]
      xstep = coerce $ walk (x-1) y
      ystep = coerce $ walk x (y-1)
     -- point 2: smash lists with mconcat
  in mconcat $ map (\mz -> MML $ do
      z <- mz
                              -- point 1: product over results
      liftM2 ((fmap (z+) .) . prodML (+)) xstep ystep
    ) st

headML (MCons a _) = a
headML _ = undefined

main :: IO ()
main = putStrLn $ show $ headML $ fst $ (`runState` 0) $ (\(MML m) -> m) $ walk 2 2

注意结果已经随着语义的变化而改变。对我来说并不重要,因为我的目标只是从状态中提取随机数,并且任何需要的依赖关系顺序可以通过正确地将列表元素引导到最终结果来控制。
(我还要警告,如果没有备忘录或者注意严格性,这个实现对于大的x和y非常低效。)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接