跨线程存储任意函数调用

4
我将尝试编写一个库,旨在复制Qt的线程语义:信号可以连接到槽,并且所有槽都在已知线程中执行,因此绑定到同一线程的槽与彼此相关是线程安全的。
我有以下API:
data Signal a = Signal Unique a
data Slot a = Slot Unique ThreadId (a -> IO ())

mkSignal :: IO (Signal a)
mkSlot   :: ThreadId -> (Slot a -> a -> IO ()) -> IO (Slot a)

connect :: Signal a -> Slot a -> IO ()

-- callable from any thread
emit :: Signal a -> a -> IO ()

-- runs in Slot's thread as a result of `emit`
execute :: Slot a -> a -> IO ()
execute (Slot _ _ f) arg = f arg

问题在于如何从emitexecute。需要以某种方式在运行时存储参数,然后执行IO操作,但是似乎无法通过类型检查器。
我需要的东西:
1.类型安全:信号不应连接到期望不同类型的插槽。 2.类型独立性:对于任何给定类型,可以有多个插槽(也许可以通过newtype和/或TH放宽这一点)。 3.易于使用:由于这是一个库,因此应易于创建信号和插槽。
我尝试过的方法:
1.Data.Dynamic:使整个过程非常脆弱,并且我没有找到一种在Dynamic上执行正确类型的IO操作的方法。有dynApply,但它是纯的。 2.存在类型:我需要执行传递给mkSlot的函数,而不是基于类型的任意函数。 3.Data.HList:我不够聪明,无法理解它。
我错过了什么?
1个回答

3
首先,你确定 Slots 真的想要在特定线程中执行吗?在 Haskell 中编写线程安全代码很容易,在 GHC 中线程非常轻量级,因此将所有事件处理程序执行与特定的 Haskell 线程绑定并没有太大的收益。
另外,mkSlot 的回调函数不需要给出 Slot 本身:你可以使用 递归 do-notation 在回调函数中绑定 slot,而不必添加将 knot 绑定到 mkSlot 的关注点。
无论如何,您不需要像那些解决方案一样复杂。我想当您谈论存在类型时,您正在考虑发送诸如(a -> IO (), a)这样的内容通过TChan(您在评论中提到过)并在另一端应用它,但您希望TChan接受此类型的值,而不仅仅是一个特定的a。这里的关键洞见是,如果您有(a -> IO (), a)并且不知道a是什么,您唯一能做的就是将函数应用于该值,从而为您提供一个IO() - 因此我们只需将这些内容通过通道发送即可!

以下是示例:

import Data.Unique
import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM

newtype SlotGroup = SlotGroup (IO () -> IO ())

data Signal a = Signal Unique (TVar [Slot a])
data Slot a = Slot Unique SlotGroup (a -> IO ())

-- When executed, this produces a function taking an IO action and returning
-- an IO action that writes that action to the internal TChan. The advantage
-- of this approach is that it's impossible for clients of newSlotGroup to
-- misuse the internals by reading the TChan or similar, and the interface is
-- kept abstract.
newSlotGroup :: IO SlotGroup
newSlotGroup = do
  chan <- newTChanIO
  _ <- forkIO . forever . join . atomically . readTChan $ chan
  return $ SlotGroup (atomically . writeTChan chan)

mkSignal :: IO (Signal a)
mkSignal = Signal <$> newUnique <*> newTVarIO []

mkSlot :: SlotGroup -> (a -> IO ()) -> IO (Slot a)
mkSlot group f = Slot <$> newUnique <*> pure group <*> pure f

connect :: Signal a -> Slot a -> IO ()
connect (Signal _ v) slot = atomically $ do
  slots <- readTVar v
  writeTVar v (slot:slots)

emit :: Signal a -> a -> IO ()
emit (Signal _ v) a = atomically (readTVar v) >>= mapM_ (`execute` a)

execute :: Slot a -> a -> IO ()
execute (Slot _ (SlotGroup send) f) a = send (f a)

这里使用了一个 TChan 来向每个槽位绑定的工作线程发送操作。
请注意,我对 Qt 不是非常熟悉,因此可能会忽略模型的一些微妙之处。您还可以使用以下方法断开槽位的连接:
disconnect :: Signal a -> Slot a -> IO ()
disconnect (Signal _ v) (Slot u _ _) = atomically $ do
  slots <- readTVar v
  writeTVar v $ filter keep slots
  where keep (Slot u' _) = u' /= u

如果这可能成为瓶颈,您可能需要像 Map Unique (Slot a) 这样的东西,而不是 [Slot a]
所以,解决方案是:(a)认识到您有一些基于可变状态的东西,并使用可变变量来构建它; (b)意识到函数和IO操作与其他所有内容一样是头等的,因此您无需在运行时进行任何特殊操作 :)
顺便说一下,我建议通过不从定义它们的模块导出它们的构造函数来保持对 Signal 和 Slot 的实现抽象化;毕竟,有许多方法可以采用这种方法而不更改API。

主要思想是在同一线程中执行的插槽在彼此之间是线程安全的。这与线程的类型无关。是的,我有自己的事件循环,它仅仅监听TChan并执行任何传入的内容。 - György Andrasek
虽然这并没有处理将现有插槽从一个组移动到另一个组的情况,但您可以通过在TVar中包装SlotGroup来轻松构建它。 - ehird
@GyörgyAndrasek,如果你希望在同一线程/组中的插槽彼此之间以原子方式运行,为什么不直接在STM中运行它们,或者在插槽组上使用MVar锁呢? - bdonlan
1
做得很好,将问题重新表述为关键思想,并勾勒出实现方案。:] - C. A. McCann
3
假定他们希望在插槽代码中引发副作用,就像事件处理程序经常这样做一样; GHC尚未实现所需的时间机器语义,以允许 instance MonadIO STM :) 对插槽组施加锁可能起作用,但可能不比专用线程读取 TChan 更简单。@C.A.McCann:谢谢! :) - ehird
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接