单个InputStream的并发处理与独立消费者

7
我需要生成N个消费者线程,同时处理相同的InputStream,例如-某种方式进行转换、计算校验和或数字签名等。这些消费者之间互不依赖,并且它们都使用第三方库,该库接受InputStream作为数据源。
因此,我可以创建一个InputStream的某些实现,其中会:
1. 从“父”流中读取数据块 2. 解除消费者的阻塞状态 3. 等待每个消费者读取整个数据块 4. 读取下一个块
虽然看起来很简单,但可能会引起各种问题,比如当某个消费者死亡时造成死锁,需要实现所有的InputStream方法,使用屏障/门闩控制消费者自身的分叉/加入等操作。
我的朋友告诉我这只需半小时就能完成,真是让我愉快的夜晚。
我希望使用足够成熟的解决方案(搜索没有结果,因此我的google-fu不够好?),或者不要麻烦,将整个“源”流复制到临时文件中,并将其用作数据源。后一种解决方案似乎更可靠,但可能导致创建大容量文件(例如在处理流媒体音频时)。

你能将数据写入文件并生成N个FileInputStream吗? - Jon Lin
@JonLin 正如他在问题末尾所说的那样,他可以。 - Marko Topolnik
3个回答

3
在我看来,你至少应该有某种缓冲,这样不同的消费者可以以不同的速度通过流,而不会被当前最慢的消费者不断拖累。这基本上确保了最坏情况下的性能和并发几乎没有任何好处。
你可以用每个已使用过块的使用者标记每个块,然后删除那些完全使用完毕的块。也许这可以通过每个使用者持有对其尚未使用的每个块的引用来实现,这将使GC自动处理已使用的块。生产者可能会保留对块的WeakReference列表,以便掌握尚未使用的块数,并根据此基础进行限制。
我还考虑每个线程都有一个单独的InputStream实例,它内部与生产者InputStream通信。这样,你就有了一个易于解决活锁危险的简单解决方案:try ... finally { is.close(); }--垂死的消费者关闭自己的输入流。这会传达给生产者。
我有一些使用每个消费者的ArrayBlockingQueue的想法。要确保所有消费者都得到适当的供应,而不使生产者阻塞或忙等,可能会有一些困难。

我不会说这没有什么好处 - 如果有5个消费者工作1秒钟,而一个消费者工作2秒钟,同时调用将给出2秒钟,而顺序调用将给出7秒钟。或者我在这里漏掉了什么?使用标记块和缓冲区,我会增加内存消耗,这是我想避免的。 - jdevelop
是的,你说的是不可避免的。然而,如果您的消费者平均平衡,但他们的性能差异很大,如果您总是等待每个落后的消费者,您将失去竞争的机会。缓冲会有所帮助。如果你引入线程优先级平衡,你实际上可以实现这样的情况。 - Marko Topolnik

0

你考虑过使用管道流吗?你的生产者可以有一个或多个PipedOuputStream,它会将从文件中读取的任何内容传输到管道中。在管道的另一端,你可以有不同的消费者线程读取相应的PipedInputstream(这是一个InputStream,你可以与你的库共享)。

你的生产者线程可以通过这些管道决定数据应该发送到哪个管道,从而为在管道另一端读取的给定消费者线程提供要处理的数据。

如果你需要从你的消费者线程获取数据,那么你可以创建另一个管道,以相反的方向将数据发送回你。


1
一个 PipedOutputStream 会在任何消费者落后时阻塞生产者,使所有其他消费者饥饿。 - Marko Topolnik

0

您可以尝试使用一些Java消息服务(JMS)实现,例如Apache ActiveMQ

在您的情况下,您需要创建所谓的主题(请参见主题 vs. 队列)。生产者创建主题,并向N个消费者发布主题,这些消费者可以并发运行,每个消费者都会接收完全相同的数据。

由于您想要使用InputStream,因此有一章关于如何将消息作为流发送

我想,通常情况下,生产者和消费者将是单独的进程,可能在网络上的不同计算机上运行。不过,我认为您可以将其配置为完全在单个JVM中运行。这将取决于JMS的实现。这些也非常出名:HornetQ by JBossRabbitMQ,还有很多其他的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接