Java中的非阻塞缓冲区

5
在一个高并发多线程的Java项目中,我需要实现一个非阻塞缓冲区。
在我的场景中,我有一个Web层,每秒接收大约20,000个请求。我需要将其中一些请求累积到某些数据结构(也就是所需的缓冲区)中,当它满了时(假设它包含1000个对象),这些对象应该被序列化到一个文件中,然后发送到另一个服务器进行进一步处理。
实现应该是非阻塞的。 我检查了ConcurrentLinkedQueue,但我不确定它是否适合这项工作。
我认为我需要以某种方式使用2个队列,一旦第一个队列被填满,它就会被新队列替换,并且完整的队列(“第一个”)将被传递以进行进一步处理。这是我目前正在考虑的基本想法,但我仍然不知道它是否可行,因为我不确定我能否在Java中切换指针(以切换完整的队列)。
有什么建议吗?
谢谢

你看过Java NIO设施吗?你能展示一下当前请求是如何进来并被累积的吗? - The Coordinator
对于数据结构,您可以尝试类似于这个的东西。 - Vrushank
5个回答

4
我通常会在应用程序启动时创建一个缓冲池,并将引用存储在BlockingQueue中。生成线程弹出缓冲区,填充它们,然后将引用推送到另一个队列上,在此队列上等待消费者。当消费者完成(在您的情况下写入数据)时,引用被推回池队列以供重复使用。这提供了大量的缓冲存储,无需在锁内进行昂贵的批量复制,消除了GC操作,提供了流量控制(如果池为空,则生产者被迫等待直到返回一些缓冲区),并防止内存耗尽。
我在其他语言中也使用了这样的设计(C++,Delphi)多年,效果很好。我有一个“ObjectPool”类,其中包含BlockingQueue和“PooledObject”类,以从中派生缓冲区。PooledObject具有对其池的内部私有引用(它在池创建时初始化),因此允许无参数的release()方法。这意味着,在具有多个池的复杂设计中,缓冲区始终释放到正确的池中,减少了错误潜力。
我的大多数应用程序都有GUI,因此我通常会将池级别转储到状态栏上的定时器上,例如每秒钟。然后我可以大致看出是否有多少负载,是否有缓冲区泄漏(数字持续下降,然后应用程序最终在空池上死锁),或者我是双重释放(数字持续上升,应用程序最终崩溃)。
还可以相对容易地在运行时更改缓冲区的数量,方法是创建更多缓冲区并将其推送到池中,或者等待池中的缓冲区,删除缓冲区并让GC销毁它们。

太棒了!终于找到了一个比连接池更不平凡的对象池用例。 - Andrey Chaschev
不错,这就是我想要的那种东西,但我没有表达清楚。 - Kayaman
我很快会检查这个解决方案。谢谢。 - forhas

3
我认为你提出的解决方案很有道理。你需要两个队列,processingQueue代表你想要的缓冲区大小(例如,你的示例中是1000),而waitingQueue则要大得多。每当processingQueue满时,它会将其内容放入指定的文件中,然后从waitingQueue中获取前1000个(如果等待队列少于1000个,则获取少于1000个)。
我唯一担心的是,你提到了每秒20000个和缓冲区大小为1000。我知道1000只是一个示例,但如果你不把它变得更大,那么问题可能只是转移到了waitingQueue,而不是解决它,因为waitingQueue会更快地接收到新的请求,而processingQueue不能处理它们,从而导致waitingQueue的缓冲区溢出。

1
我可能理解有误,但是您可以使用 ArrayList 来实现此操作,因为您不需要针对队列中的每个元素进行轮询。当数组大小达到限制并且您需要发送它时,在同步部分中刷新(创建副本并清除)数组即可。添加到此列表中的内容也应与此刷新操作同步。
交换数组可能不安全 - 如果您的发送速度比生成速度慢,则缓冲区可能很快开始互相覆盖。每秒分配 20000 个元素数组对于 GC 来说几乎没有任何负担。
Object lock  = new Object();

List list = ...;

synchronized(lock){
    list.add();
}

...

// this check outside is a quick dirty check for performance, 
// it's not valid out of the sync block
// this first check is less than nano-second and will filter out 99.9%
// `synchronized(lock)` sections
if(list.size() > 1000){
  synchronized(lock){  // this should be less than a microsecond
     if(list.size() > 1000){  // this one is valid
       // make sure this is async (i.e. saved in a separate thread) or <1ms
       // new array allocation must be the slowest part here
       sendAsyncInASeparateThread(new ArrayList(list)); 
       list.clear();
     }
  }
}

更新

考虑到发送是异步的,这里最慢的部分是new ArrayList(list),对于1000个元素应该在1微秒左右,每秒20微秒。我没有测量过,但是从分配1百万个元素需要约1毫秒的比例中解决了这个问题。

如果您仍然需要一个超快速同步队列,您可能想要查看MentaQueue


这很好,但这是一个同步序列,我担心会显著降低我的性能(这是一个实时系统)。 - forhas
好的,最受欢迎的答案可能更好,因为它涉及到更一般的模式,但是对于您的要求,这不会有性能问题。考虑到发送是异步的,在这里最慢的部分是 new ArrayList(list),对于1000个元素应该在1微秒左右,每秒20微秒。我没有测量过,但是从分配了大约1毫秒的1百万个元素的比例中解决了这个问题。 - Andrey Chaschev

1

不要将每个请求对象放入队列中,而是分配一个大小为1000的数组,在填满时将该数组放入队列中发送线程,该线程将对整个数组进行序列化和发送。然后再分配另一个数组。

当发送方无法快速工作并且其队列已溢出时,您将如何处理情况?为避免内存错误,请使用有限大小的队列。


谢谢。我想到了类似的东西,问题是一旦我的数组满了,我需要发送它并分配一个新的数组,而且整个过程必须同步,不是吗?否则,我可能会填充未初始化的数组(例如)。但是同步意味着我正在阻塞。你怎么看? - forhas
1
同步,因此阻塞,在并发系统中是不可避免的。不同的部分以不同的速度工作,因此最快的部分必须减速。你所能做的就是尽量减少开销。批处理(例如将1000个请求收集在一个批次中)是一种实现最小化的方法。 - Alexei Kaigorodov

0
你所说的“switch pointers”是什么意思?在Java中没有指针(除非你在谈论引用)。
无论如何,正如你可能从Javadoc中看到的那样,ConcurrentLinkedQueue的size()方法存在一个“问题”。不过,你可以使用最初的想法,即使用2个(或更多)缓冲区进行切换。可能会出现一些磁盘I/O的瓶颈。也许size()的非常数时间在这里也不是问题。
当然,如果你希望它是非阻塞的,你需要有大量的内存和快速的磁盘(以及更大/更大的缓冲区)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接