我希望处理对等体计时器的类可以接收并将消息放入并发队列中,以便在对等体离线时将消息放入队列。然后,“主”线程可以以事件驱动的方式轮询队列,并接收和处理消息。
请注意,这个“主”线程不应该是GUI框架的事件分配线程。如果在主线程接收到消息时需要更新GUI中的内容,则可以在事件分配线程上调用另一段代码。
队列的两个好选择是ConcurrentLinkedQueue(如果队列应该是无界的,计时器线程可以在主线程拾取它们之前将任意数量的消息放入队列中)或LinkedBlockingQueue(如果队列大小应该有限制,且如果队列过大,计时器线程必须等待才能再次放置消息。这称为反压缩,在分布式并发系统中可能非常重要,但在您的情况下可能不相关)。
这里的想法是实现Actor模型的一个版本(另请参见),其中没有任何线程之间共享的内容(角色),并且需要发送的任何数据(应为不可变)都会在它们之间传递。每个角色都有一个收件箱,可以在其中收到消息并对其进行操作。只要您的计时器线程在启动后就不需要从主线程接收任何消息,并且将所有数据作为构造函数的参数获取,它们可能不需要收件箱。
public record PeerDownMessage(String peerName, int errorCode) {
}
public class PeerWatcher {
private final Peer peer;
private final BlockingQueue<PeerDownMessage> queue;
public PeerWatcher(Peer peer, BlockingQueue<PeerDownMessage> queue) {
this.peer = Objects.requireNonNull(peer);
this.queue = Objects.requireNonNull(queue);
}
public void startTimer() {
queue.put(new PeerDownMessage(peer.getName(), error));
}
}
public class Main {
public void eventLoop(List<Peer> peers) {
LinkedBlockingQueue<PeerDownMessage> inbox =
new LinkedBlockingQueue<>();
for (Peer peer : peers) {
PeerWatcher watcher = new PeerWatcher(peer, inbox);
watcher.startTimer();
}
while (true) {
PeerDownMessage message = inbox.take();
SwingWorker.invokeLater(() {
JLabel label = labels.get(message.peerName());
label.setText(message.peerName() +
" failed with error " + message.errorCode());
});
}
}
}
请注意,为了更新GUI,我们会在另一个线程“Swing Event Dispatch Thread”上执行该操作,这个线程必须与我们的主线程不同。
有一些复杂的框架可用于实现Actor模型,但其中核心是:线程之间没有共享内容,因此您永远不需要同步或使任何东西易失性。所有Actor所需的内容都作为参数传递给其构造函数或通过其收件箱进行传递(在本例中,仅主线程具有收件箱,因为一旦启动工作线程,它们就不需要接收任何内容),并且最好使所有内容都是不可变的。我使用了record而不是class来表示消息,但您也可以使用常规的class。只需将字段设置为final,将其设置在构造函数中,并确保它们不能为null,如PeerWatcher类中所示。
我说过主线程可以轮询“队列”,这意味着可能会有多个队列,但在这种情况下,它们都发送相同类型的消息,并在消息正文中标识消息所属的对等方。因此,我为每个监视器提供了对主线程相同的收件箱的引用,这可能是最好的选择。Actor应该只有一个收件箱;如果需要执行多个操作,则可能需要多个Actor(这是Erlang的方法,也是我从中汲取灵感的地方)。
但是,如果您确实需要多个队列,则可以像下面这样轮询它们:
while (true) {
for (LinkedBlockingQueue<PeerDownMessage> queue : queues) {
if (queue.peek() != null) {
PeerDownMessage message = queue.take();
handleMessageHowever(message);
}
}
}
但是这些都是你不需要的额外东西。每个演员只使用一个收件箱队列,然后轮询收件箱以获取要处理的消息就很简单了。
我最初编写代码时使用了ConcurrentLinkedQueue,但使用了BlockingQueue中的put
和take
方法。我只是将其更改为使用LinkedBlockingQueue,但如果您更喜欢ConcurrentLinkedQueue,则可以使用add
和poll
,但是经过进一步考虑,我确实建议使用BlockingQueue,因为它的take()
方法非常简单; 它允许您在等待下一个可用项时轻松阻塞而不是繁忙等待。