Java阻塞队列与批处理?

21

我对与Java BlockingQueue完全相同的数据结构感兴趣,但是它必须能够将队列中的对象分批处理。换句话说,我希望生产者能够将对象放入队列中,但是在消费者获取元素之前,必须等待一定数量(即批量大小)的对象进入到队列中。

一旦队列达到批量大小后,生产者必须等待消费者消费完队列中的所有元素,然后开始生产新的元素,消费者再次等待达到批量大小。

是否存在类似的数据结构?或者我应该自己写一个(我不介意),只是不想浪费时间去重复造轮子。


更新

可能需要澄清一下:

情况始终如下。可以有多个生产者向队列添加物品,但是从队列中取出物品的消费者永远只有一个。

现在的问题在于,这些设置会并行和串行地进行。换句话说,生产者为多个队列生产物品,而消费者本身也可以是生产者。这可以更容易地考虑为生产者、消费者-生产者以及最终消费者的有向图。

生产者应等待队列为空 (@Peter Lawrey),因为每个设置都将在一个线程中运行。如果让它们仅在空间可用时生产,您最终会面临太多的线程试图同时处理太多的事情的情况。

也许将其与执行服务结合使用可以解决问题?

5个回答

16
我建议您使用BlockingQueue.drainTo(Collection, int)。您可以与take()一起使用,以确保获取最少数量的元素。
使用此方法的优点是,批处理大小会随着工作量的增加而动态增长,并且当消费者忙时,生产者不必阻塞。也就是说,它自我优化延迟和吞吐量。
为了按照要求实现(我认为这是个不好的主意),您可以使用一个带有忙碌消费线程的SynchronousQueue。也就是说,消费线程执行以下操作:
 list.clear();
 while(list.size() < required) list.add(queue.take());
 // process list.

生产者在消费者忙碌时将会被阻塞。

我希望在消费者忙碌时,生产者能够阻塞。 - Nico Huysamen
1
有趣的是,大多数系统都会尽力避免这种情况。 ;) 第二个建议将完全做到这一点。如果您希望生产者阻塞,为什么要使用多个线程?如果您不希望它们同时运行,那么让“生产者”成为处理器/消费者会更简单。 - Peter Lawrey
请查看我的更新。该设计需要生产者阻塞以保持执行线程数量较低。此外,它解决了生产者和消费者之间的依赖问题。 - Nico Huysamen
在这种情况下,上面的建议就可以做到这一点。它适用于多个生产者。我发现最好让线程调度程序为您工作,而不是尝试发明自己的方法。 ;) - Peter Lawrey

2
这里是一个快速的(=简单但未完全测试)实现,我认为可能适合您的请求 - 如果需要,您应该能够扩展它以支持完整的队列接口。为了提高性能,您可以切换到ReentrantLock而不是使用"synchronized"关键字。
public class BatchBlockingQueue<T> {

    private ArrayList<T> queue;
    private Semaphore readerLock;
    private Semaphore writerLock;
    private int batchSize;

    public BatchBlockingQueue(int batchSize) {
        this.queue = new ArrayList<>(batchSize);
        this.readerLock = new Semaphore(0);
        this.writerLock = new Semaphore(batchSize);
        this.batchSize = batchSize;
    }

    public synchronized void put(T e) throws InterruptedException {
        writerLock.acquire();
        queue.add(e);
        if (queue.size() == batchSize) {
            readerLock.release(batchSize);
        }
    }

    public synchronized T poll() throws InterruptedException {
        readerLock.acquire();
        T ret = queue.remove(0);
        if (queue.isEmpty()) {
            writerLock.release(batchSize);
        }
        return ret;
    }

}

希望这对你有用。

2

我最近开发了一种实用工具,可以批量处理BlockingQueue元素,并在队列元素未达到批处理大小时使用刷新超时。它还支持使用多个实例来详细说明相同数据集的fanOut模式:

// Instantiate the registry
FQueueRegistry registry = new FQueueRegistry();

// Build FQueue consumer
registry.buildFQueue(String.class)
                .batch()
                .withChunkSize(5)
                .withFlushTimeout(1)
                .withFlushTimeUnit(TimeUnit.SECONDS)
                .done()
                .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));

// Push data into queue
for(int i = 0; i < 10; i++){
        registry.sendBroadcast("Sample"+i);
}

更多信息请点击这里!

https://github.com/fulmicotone/io.fulmicotone.fqueue


1

我不知道有没有现成的解决方案。如果我理解正确,您要求生产者在填充队列时工作(而消费者被阻塞),或者消费者在清空队列时工作(而生产者被阻塞)。如果是这种情况,我建议您不需要一个数据结构,而是需要一种机制来阻塞一方,而另一方以互斥方式工作。您可以在对象上加锁,并在内部进行逻辑处理,以确定是否为满或为空,然后释放锁并将其传递给另一方。因此,简而言之,你应该自己编写代码 :)


1

这听起来像是LMAX Disruptor模式中RingBuffer的工作原理。请参见http://code.google.com/p/disruptor/了解更多信息。

一个非常简单的解释是,您的主要数据结构是RingBuffer。生产者按顺序将数据放入环形缓冲区中,消费者可以取出生产者放入缓冲区的所有数据(因此基本上是批处理)。如果缓冲区已满,则生产者会阻塞,直到消费者完成并在缓冲区中释放出插槽。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接