使用Java 8流混合显式和隐式并行性

3
过去,我编写了一些使用两个线程的Java程序。第一个线程(生产者)从API(C库)读取数据,创建一个Java对象,发送该对象到另一个线程。C API提供了一个事件流(无限)。这两个线程使用LinkedBlockingQueue作为管道交换对象(put,poll)。第二个线程(消费者)处理对象。(我还发现,在线程内部编写的代码更易读。第一个线程处理C API内容并生成合适的Java对象,第二个线程不需要处理C API并处理数据)。
现在,我对如何使用Java 8中的新流API实现上述情况感兴趣。但假设我想保留两个线程(生产者/消费者)。第一个线程将写入流。第二个线程从流中读取。我也希望我可以使用这种技术来处理更好的显式并行性(生产者/消费者),并且在流中可以使用一些隐式并行性(例如stream.parallel())。
我没有太多使用新流API的经验。因此,我尝试使用以下代码来解决上述想法。
- 我使用“generate”访问C API并将其馈送到Java流。 - 在消费者线程中,我使用了.parallel()来测试和处理隐式并行性。看起来还不错。但请参见下文。
问题:
1.在此方案中,生成是否是最佳方式? 2.我不明白如何在生产者中终止/关闭流,如果API存在一些错误并且我希望关闭整个管道。我使用stream.close还是抛出异常?
- 2.1 我使用了stream.close()。但是,在关闭后,“generate”仍在运行,我只能抛出异常来终止生成部分。此异常进入流中,消费者正在接收异常(这对我来说很好,消费者可以识别它并终止)。但在这种情况下,在异常到达时,生产者已经生产了比消费者处理的更多的项。 - 2.2 如果消费者使用隐式并行性stream.parallel()。那么生产者将处理更多的项目。因此,我无法解决此问题。(访问C API,检查错误,做决定) - 2.3 在生产者中抛出异常会到达消费者流,但并非所有插入的对象都被处理。
再次强调:想法是具有显式的线程并行性。但在内部,我可以处理新功能并在可能的情况下使用并行处理。
谢谢您考虑这个问题。
package sandbox.test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class MyStream {
    private volatile LongStream stream = null;
    private AtomicInteger producerCount = new AtomicInteger(0);
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private AtomicInteger apiError = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
    MyStream appl = new MyStream();
    appl.create();
    }

    private static void sleep(long sleep) {
    try {
        Thread.sleep(sleep);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    }

    private static void apiError(final String pos, final int iteration) {
    RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
    System.out.println(apiException.getMessage());
    throw apiException;
    }

    final private int simulateErrorAfter = 10;

    private Thread produce() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        System.out.println("Producer started");
        stream = LongStream.generate(() -> {
            int localCount;
            // Detect error, while using stream.parallel() processing
            int error = apiError.get();
            if ( error > 0 )
                apiError("1", error);
            // ----- Accessing the C API here -----
            localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
            // ----- Accessing the C API here -----

            // Checking error code from C API
            if ( localCount > simulateErrorAfter ) { // Simulate an API error
                producerCount.decrementAndGet();
                stream.close();
                apiError("2", apiError.incrementAndGet());
            }
            System.out.println("P: " + localCount);
            sleep(200L);
            return localCount;
            });
        System.out.println("Producer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private Thread consume() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        try {
            stream.onClose(new Runnable() {
            @Override
            public void run() {
                System.out.println("Close detected");
            }
            }).parallel().forEach(l -> {
            sleep(1000);
            System.out.println("C: " + l);
            consumerCount.incrementAndGet();
            });
        } catch (Exception e) {
            // Capturing the stream end
            System.out.println(e);
        }
        System.out.println("Consumer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private void create() throws InterruptedException {
    Thread producer = produce();
    while ( stream == null )
        sleep(10);
    Thread consumer = consume();
    producer.join();
    consumer.join();
    System.out.println("Produced: " + producerCount);
    System.out.println("Consumed: " + consumerCount);

    }
}

2
我认为你正在寻找一种响应式框架,比如RxJava。虽然你可以用足够大的杠杆来强制Streams做你想要的事情,但最好使用专门为此工作设计的工具。 - Brian Goetz
1个回答

3

您需要了解有关Stream API的一些基本知识:

  • 应用于流上的所有操作都是惰性的,在应用终端操作之前不会执行任何操作。使用“生产者”线程创建流没有意义,因为此线程不会执行任何操作。所有操作都在您的“消费者”线程中执行,以及由Stream实现本身启动的后台线程。创建Stream实例的线程是完全无关紧要的。

  • 关闭流对于流操作本身没有影响,即不会关闭线程。它旨在释放额外资源,例如关闭与使用Files.lines(…)返回的流关联的文件。您可以使用onClose安排这样的清理操作,Stream将在调用close时调用它们,但仅此而已。对于Stream类本身而言,它没有任何意义。

  • Stream并未模拟“一个线程正在写入,另一个线程正在读取”的场景。它们的模型是“一个线程调用您的Supplier,然后调用您的Consumer,另一个线程也是如此,以及其他x个线程...”

    如果您想使用具有不同生产者和消费者线程的生产者/消费者方案,则最好使用Thread或使用线程安全队列的ExecutorService

但是,您仍然可以使用Java 8功能。例如,无需使用内部类实现Runnable;您可以为它们使用lambda表达式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接