如何在ExecutorService的执行器之间进行同步

3
我有一个客户端套接字列表,通常大小约为2000。这些客户端是动态的,会进出。
我有一个带有32个线程的固定线程池的ExecutorService处理这些线程。此执行器服务负责解码和发送要发送到这2000个客户端的消息。
我想防止执行器服务的两个或多个线程同时处理同一个客户端。
一种方法是引入另一个记账线程(因此最终拥有32 + 1个线程),该线程负责在上一个相应于同一客户端的消息完成时调用ExecutorService.submit(message) 。但我不确定这是否会引入瓶颈,这意味着这个新引入的记账线程可能无法跟上提交消息的速度。
理想情况下,我不想预先分配线程给一组客户端,因为消息负载在客户端之间不均匀分布。也事先不知道。
有哪些方法可以解决这个问题?它们由java.util.concurrent功能提供吗?
更新
如评论所指出的,这是一个快速的摘要:
我不想每个客户端都有一个单独的线程,因为这样会有2000个线程。
理想情况下,我不想预先分配线程给一组客户端,因为消息速率在所有客户端之间不均匀分布,并且事先不知道。
消息顺序必须得到保留。
我认为,如果线程A正在等待线程B,因为B已经向同一客户端发送消息,那么这样做是不好的。换句话说,在任何时候,仅有一个线程正在处理一个客户端。

1
我想防止执行器服务的两个(或更多)线程同时处理同一个客户端。你不理解这个需求吗?有没有可能同时为同一个客户端处理2个工作?为什么这是个问题? - zapl
我认为他想要对来自同一来源的处理请求进行序列化。不知道为什么。 - Antoniossss
使用并发队列。如果有必要,队列也可以被排序。 - Antoniossss
使用队列,让所有线程充当消费者,并从中获取下一个对象(此时,如果没有客户端可用,则可以使线程等待数据可用)。这样,您就不需要为执行器提交任务并保持所有线程全时运行。 - Marcos Vasconcelos
@MarcosVasconcelos 那么我们如何保留消息的顺序呢?可能会发生线程AB都获取了消息12,但由于线程调度,消息2将先被发送。 - Isaac Vero
显示剩余13条评论
3个回答

1
当线程(A)开始处理消息(#1)时,它需要向共享管理对象注册客户端ID。对于每个注册的客户端,都有一个队列。
当另一个线程(B)开始为同一客户端处理消息(#2)时,注册将检测到线程A已经在处理,并将消息#2添加到客户端的队列中。然后,线程B将停止并处理下一条消息。
当线程A完成消息#1时,它将尝试注销,但由于消息#2是队列,因此线程A将开始处理该消息。之后,当它再次尝试注销时,没有排队的消息,线程将停止并处理下一条消息。
正确同步访问取决于管理器代码,因此第二个消息要么由线程B处理,要么移交给线程A,而不会丢失。
上述逻辑确保线程B不会等待线程A,即没有空闲时间,并且消息#2尽可能快地处理,即最小延迟,而不会同时处理两个相同客户端的消息。 对于每个客户端,消息顺序保留。全局而言,消息顺序当然不被保留,因为消息#2的处理被延迟了。
注意,每个线程只有一个队列,因此只有32个队列,并且只有“重复”的消息才会进入队列,因此所有队列通常都是空的。

更新

示例:为了在此进行标识,消息被命名为 clientId.messageId,其中messageId是全局的。

消息按以下顺序提交到执行器(3个线程):

1.1、2.2、1.3、2.4、3.5、1.6

  1. 线程A获取了1.1并开始处理。

  2. 线程B获取了2.2并开始处理。

  3. 线程C获取了1.3,将其添加到线程A的队列中,然后返回。

  4. 线程C获取了2.4,将其添加到线程B的队列中,然后返回。

  5. 线程C获取了3.5并开始处理。

  6. 线程A完成了消息1.1并开始处理1.3

  7. 线程C完成了消息3.5并返回。

  8. 线程C获取了1.6,将其添加到线程A的队列中,然后返回。
    线程C现在处于空闲状态。

  9. 线程B完成了消息2.2并开始处理2.4

  10. 线程A完成了消息1.3并开始处理1.6

  11. 线程B完成了消息2.4并返回。
    线程B现在处于空闲状态。

  12. 线程A完成了消息1.6并返回。
    线程A现在处于空闲状态。


我认为这种方法有一个缺点:如果消息速率很高,我们可能会陷入这样一种情况,即32个线程一遍又一遍地优先处理同样的32个客户端(因为每个消息之后都有另一个消息准备好发送),这意味着其他2000-32个客户端实际上处于饥饿状态,如果这有意义的话。 - Isaac Vero
@IsaacVero 这就是应该的方式,因为原始的全局消息顺序表明这些消息应该已经被处理了,所以它们具有优先权。如果客户端无法跟上(即客户端的注册队列中有太多的消息),则可以添加额外的逻辑来开始丢弃消息,但如果需要,OP需要定义规则。 - Andreas
让我重新表述一下,也许我误解了。线程 A 正在向客户端 1 发送消息。在此期间,添加了两条消息:一条是给客户端 2 的消息,另一条是给客户端 1 的消息。如果我理解正确的话,你的方法是线程 A 在处理完第一条消息后立即处理第二条消息,而不是为客户端 2 处理消息。现在假设有高消息速率。我认为这意味着我们一遍又一遍地优先考虑相同的客户端,并使其他客户端饥饿? - Isaac Vero
此外,我了解到有32个线程和32个队列。那么每个线程都在执行 while(true) { process(queue.take()); } 吗?那么是谁决定将新消息添加到哪个队列中呢? - Isaac Vero
好的。谢谢您的解释,引入队列来处理重复消息的想法很聪明。 - Isaac Vero
显示剩余5条评论

0

让每个线程服务自己的队列。给套接字编号。将每个请求放在queue[socket num % num of threads]上。

这将确保来自特定套接字的请求按系列和顺序处理。

不幸的是,您无法通过这种方式获得负载平衡。

或者,使用ConcurrentHashMap存储正在服务的套接字。如果线程服务当前正在处理的套接字的请求,则将请求放回队列。


为什么要点踩,您能解释一下为什么这个答案是不正确的吗? - user1373164
没有为每个客户端创建线程,消息顺序保持不变,也没有显式的阻塞。 - user1373164
我没有给你的问题点踩(我没有足够的声望),但是我很难理解你在最后一句话中的意思。也许你可以详细说明一下? - Isaac Vero
1
@IsaacVero 我想象中的样子可能是这样的:https://gist.github.com/zapl/62d6af5e7dde5cc209c058313ca7ed16 - zapl
2
@IsaacVero 不这么认为。Downvote 最有可能是因为回答较短,外观不太好看(最初)。如果只有一个客户端处于活动状态,由于您的排序要求,不能有多个线程处于活动状态。无论哪种解决方案都会发生。另一种解决方案在处理仅有少数客户端之间的不均负载时更加灵活,但我怀疑这实际上不会是一个真实的问题。2k 客户端之间的分布应该在 32 个线程中相当平均。这也是 netty 所做的,即线程和客户端之间的固定绑定,它们是 Java 的 #1 网络框架。 - zapl
显示剩余2条评论

0

你想要按顺序处理每个客户端的消息,同时又不想为每个客户端分配单独的线程。这正是使用Actor模型的确切用例。Actor就像轻量级线程。它们不像通常的线程那样强大,但非常适合像你这样的可重复任务。

如果你发现通过Google找到的java actor libraries太过笨重,你可以使用我在Github存储库中提供的最紧凑的Actor实现,或者查看我异步库df4j中包含的扩展Actor实现。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接