如何将非并行流与并行流(一个生产者多个消费者)连接起来?

3
我正在尝试使用Java8中的流创建一个生产者多个消费者模型。我正在从数据库资源中读取和处理数据,并希望以流式方式处理它们(不能将整个资源读入内存)。
源的读取必须是单线程的(游标不是线程安全的),而读取速度很快,因此每个数据块的处理操作可以并行运行。
我还没有找到如何连接非并行流和并行流处理的方法。是否有任何方法可以使用Java8流API实现?
代码示例:
这个迭代器必须在单线程中运行,因为游标不是线程安全的。
class SimpleIterator<Data> implements Iterator<Data>{

    private volatile Cursor cursor;

    public SimpleIterator(Cursor cursor){
        this.cursor = cursor;
    }

   @Override
    public boolean hasNext() {
        return cursor.hasNext();
    }    

    @Override
    public Data next() {
     return cursor.next();

    }
}

//创建非并行流

SimpleIterator<Data> iterator = new SimpleIterator<>(queryCursor);
Iterable<Data> iterable = () -> iterator;
Stream<Data> resultStream = StreamSupport.stream(iterable.spliterator(), false); // prallel set as false

//每个数据的处理都应该并行运行

resultStream.parallel().forEach(data->processData(data)); 
public processData(Data data){
//heavy operation
}

但是如果在调用forEach之前将流设置为并行,则整个流都是并行的,并且迭代器会在多个线程中调用。 在Java8中,是否有任何方法可以将这两个流连接起来,或者我必须创建某个队列,以从单线程生产者流提供数据到并行流中。


2
我认为你在这里的假设是迭代器将被多个线程并行调用。这并不完全正确。它将被多个线程调用,但不是并行的。你的代码所做的是创建一个未知大小的IteratorSpliterator,它包装了你的Iterator。Spliterator和Iterator都不需要是线程安全的(Fork/Join会处理这个问题)。可以这样想:ArrayList是线程安全的吗?不是。你能对它进行并行流处理吗?可以。 - Stefan Zobel
你说得对,我的假设正如你所描述的那样。谢谢你给我解释清楚。尽管它们并没有同时运行,但我仍然认为如果多个线程调用我的迭代器可能会出现一些线程问题。例如,如果游标不被设置为volatile,其他线程可能会看到一些缓存对象而不是原始对象(例如)。 - HPCS
2
然后确保您的光标已经安全发布。IteratorSpliterator将按递增批次(每个批次为1024的倍数,例如:3072、7168、11264等)运行Iterator,并将元素从Iterator复制到具有当前批次大小的数组中。从该数组创建一个ArraySpliterator(可以进一步分割)。只有在复制完一个批次后,您才会在Iterator中看到线程切换。 - Stefan Zobel
谢谢您的澄清。您帮了我很多。 - HPCS
1个回答

0

我正在解决一个问题,需要在两个流上执行完全外连接。这些问题似乎很相似。我插入了两个阻塞队列来缓冲我的输入。我认为你可以使用一个阻塞队列来将单个流拆分成多个流,而不需要并行化源流。

我提出的解决方案如下所示。我尚未测试过连接两个流的解决方案,因此我不确定是否有效。AbstractSpliterator类具有trySplit的实现;trySplit的注释很有启发性。该类的最终方法从spliterator实现构造可并行化的流。

import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class StreamSplitter<T> extends Spliterators.AbstractSpliterator<T> {
    final T EOS = null; // Just a stub -- can't put a null in BlockingQueue

    private final BlockingQueue<T> queue;
    private final Thread thread;

    // An implementation of Runnable that fills a queue from a stream
    private class Filler implements Runnable {
        private final Stream<T> stream;
        private final BlockingQueue<T> queue;

        private Filler(Stream<T> stream, BlockingQueue<T> queue) {
            this.stream = stream;
            this.queue = queue;
        }

        @Override
        public void run() {
            stream.forEach(x -> {
                try {
                    // Blocks if the queue is full
                    queue.put(x);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            // Stream is drained put end of stream marker.
            try {
                queue.put(EOS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private StreamSplitter(long estSize, int characteristics, Stream<T> srcStream) {
        super(estSize, characteristics);
        queue = new ArrayBlockingQueue<T>(1024);
        // Fill the queue from a separate thread (may want to externalize this).
        thread = new Thread(new Filler(srcStream, queue));
        thread.start();
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            T value = queue.take(); // waits (blocks) for entries in queue

            // If end of stream marker is found, return false signifying
            // that the stream is finished.
            if (value == EOS) {
                return false;
            }
            // Accept the next value.
            action.accept(value);
        } catch (InterruptedException e) {
            return false;
        }
        return true;
    }

    public static <T> Stream<T> splitStream(long estSize, int characteristics, Stream<T> srcStream) {
        Spliterator<T> spliterator = new StreamSplitter<T>(estSize, characteristics, srcStream);
        return StreamSupport.stream(spliterator, true);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接