如何使用Applicative进行并发编程?

6
这是我之前提出的问题的跟进。下面的示例来自Haxl
假设我从博客服务器获取数据以呈现博客页面,其中包含最近的文章、热门文章和文章主题。
我有以下数据获取API:
val getRecent  : Server => Seq[Post] = ...
val getPopular : Server => Seq[Post] = ...
val getTopics  : Server => Seq[Topic] = ...

现在我需要将它们组合起来以实现一个新的功能getPageData
val getPageData: Server => (Seq[Post],  Seq[Post], Seq[Topic])

Haxl 建议使用一个新的 monad Fetch 来使 API 具有可组合性。

val getRecent  : Fetch[Seq[Posts]] = ...
val getPopular : Fetch[Seq[Posts]] = ...
val getTopics  : Fetch[Seq[Topic]] = ...

现在我可以使用单子组合来定义我的getPageData: Fetch[A]
val getPageData = for {
  recent  <- getRecent
  popular <- getPopular
  topics  <- getTopics
} yield (recent, popular, topics)

但它不会同时运行getRecentgetPopulargetTopics

Haxl建议使用applicative组合<*>来组合“并发”函数(即可以同时运行的函数)。所以我的问题是:

  • 如何实现getPageData,假设Fetch[A]是一个Applicative
  • 如何将Fetch实现为Applicative而不是Monad
1个回答

4
如何实现getPageData假设Fetch[A]是一个Applicative?
我们需要放弃单子绑定运算符“>>=”,转而使用应用运算符“<*>”。因此,代码应该改为:
val getPageData = for {
  recent  <- getRecent
  popular <- getPopular
  topics  <- getTopics
} yield (recent, popular, topics)

我们可以这样编写代码(使用Haskell语法,抱歉,我头脑中无法做到Scala):

getPageData = makeTriple <$> getRecent <*> getPopular <*> getTopics
  where
    makeTriple x y z = (x, y, z)

但这是否能达到预期效果,取决于第二个问题!

如何实现Fetch作为应用程序,而不是Monad?

单子和应用序列之间的关键区别在于单子可以依赖于单子值内部的值,而应用符号 <*> 不行。请注意上面对于 getPageData 的单子表达式在到达 getTopics 之前绑定了名为 recentpopular 的名称。这些名称可以被用来改变表达式结构,例如,在 recent 为空的情况下获取其他数据源。但是,使用应用表达式时, getRecentgetPopular 的结果并不影响表达式本身的结构。这个属性允许我们同时启动应用表达式中的每一个项,因为我们静态地知道表达式的结构。

因此,根据上述观察以及Fetch数据类型的特定形状,我们可以提出一个合适的 <*> 定义。我认为以下概括了一般思路:

data Fetch a = Fetch { runFetch :: IO a }

fetchF <*> fetchX = Fetch $ do
  -- Fire off both IOs concurrently.
  resultF <- async $ runFetch fetchF
  resultX <- async $ runFetch fetchX
  -- Wait for both results to be ready.
  f <- wait resultF
  x <- wait resultX
  return $ f x

为了比较,假设我们尝试使用并发评估来进行单子绑定:

fetchF >>= fetchK = Fetch $ do
  resultF <- async $ runFetch fetchF
  -- Oh no, we need resultF in order to produce the next
  -- Fetch value! We just have to wait...
  f <- wait resultF
  fetchX <- async $ runFetch (fetchK f)
  x <- wait $ runFetch fetchX
  return $ f x

非常感谢。我不懂 Haskell,所以才会问这个问题……你是将 Fetch[A] 定义为一个记录,其中包含类型为 IO[A]runFetch 字段吗? - Michael
没错,Fetch[A] 有一个记录 runFetch 的 IO[A] 运行。因此,Fetch[A] 实际上只是 IO[A] 的一个新名称,允许我们给出新的 >>=<*> 定义。 - Alexander Vieth
再次感谢。我想我明白了。顺便说一下,HaxlFetch的定义有所不同,我会再读一遍,可能会在这里问更多问题。 - Michael
没问题!是的,我的Fetch定义只是为了说明在应用程序中获取并发性背后的一般思想,而在单子中不可能实现。我相信Haxl的fetch更加复杂,因为它处理缓存。 - Alexander Vieth
Haxl的“Fetch”只是一棵树;<*>只是添加一个新节点。他们还有一个单独的“fetch”函数,它_解释_“Fetch”树并实际获取数据。fetch处理所有缓存、批处理、异步等操作。 - Michael

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接