过去,我编写了一些使用两个线程的Java程序。第一个线程(生产者)从API(C库)读取数据,创建一个Java对象,发送该对象到另一个线程。C API提供了一个事件流(无限)。这两个线程使用LinkedBlockingQueue作为管道交换对象(put,poll)。第二个线程(消费者)处理对象。(我还发现,在线程内部编写的代码更易读。第一个线程处理C API内容并生成合适的Java对象,第二个线程不需要处理C API并处理数据)。
现在,我对如何使用Java 8中的新流API实现上述情况感兴趣。但假设我想保留两个线程(生产者/消费者)。第一个线程将写入流。第二个线程从流中读取。我也希望我可以使用这种技术来处理更好的显式并行性(生产者/消费者),并且在流中可以使用一些隐式并行性(例如stream.parallel())。
我没有太多使用新流API的经验。因此,我尝试使用以下代码来解决上述想法。
- 我使用“generate”访问C API并将其馈送到Java流。 - 在消费者线程中,我使用了.parallel()来测试和处理隐式并行性。看起来还不错。但请参见下文。
问题:
1.在此方案中,生成是否是最佳方式? 2.我不明白如何在生产者中终止/关闭流,如果API存在一些错误并且我希望关闭整个管道。我使用stream.close还是抛出异常?
- 2.1 我使用了stream.close()。但是,在关闭后,“generate”仍在运行,我只能抛出异常来终止生成部分。此异常进入流中,消费者正在接收异常(这对我来说很好,消费者可以识别它并终止)。但在这种情况下,在异常到达时,生产者已经生产了比消费者处理的更多的项。 - 2.2 如果消费者使用隐式并行性stream.parallel()。那么生产者将处理更多的项目。因此,我无法解决此问题。(访问C API,检查错误,做决定) - 2.3 在生产者中抛出异常会到达消费者流,但并非所有插入的对象都被处理。
再次强调:想法是具有显式的线程并行性。但在内部,我可以处理新功能并在可能的情况下使用并行处理。
谢谢您考虑这个问题。
现在,我对如何使用Java 8中的新流API实现上述情况感兴趣。但假设我想保留两个线程(生产者/消费者)。第一个线程将写入流。第二个线程从流中读取。我也希望我可以使用这种技术来处理更好的显式并行性(生产者/消费者),并且在流中可以使用一些隐式并行性(例如stream.parallel())。
我没有太多使用新流API的经验。因此,我尝试使用以下代码来解决上述想法。
- 我使用“generate”访问C API并将其馈送到Java流。 - 在消费者线程中,我使用了.parallel()来测试和处理隐式并行性。看起来还不错。但请参见下文。
问题:
1.在此方案中,生成是否是最佳方式? 2.我不明白如何在生产者中终止/关闭流,如果API存在一些错误并且我希望关闭整个管道。我使用stream.close还是抛出异常?
- 2.1 我使用了stream.close()。但是,在关闭后,“generate”仍在运行,我只能抛出异常来终止生成部分。此异常进入流中,消费者正在接收异常(这对我来说很好,消费者可以识别它并终止)。但在这种情况下,在异常到达时,生产者已经生产了比消费者处理的更多的项。 - 2.2 如果消费者使用隐式并行性stream.parallel()。那么生产者将处理更多的项目。因此,我无法解决此问题。(访问C API,检查错误,做决定) - 2.3 在生产者中抛出异常会到达消费者流,但并非所有插入的对象都被处理。
再次强调:想法是具有显式的线程并行性。但在内部,我可以处理新功能并在可能的情况下使用并行处理。
谢谢您考虑这个问题。
package sandbox.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;
public class MyStream {
private volatile LongStream stream = null;
private AtomicInteger producerCount = new AtomicInteger(0);
private AtomicInteger consumerCount = new AtomicInteger(0);
private AtomicInteger apiError = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
MyStream appl = new MyStream();
appl.create();
}
private static void sleep(long sleep) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void apiError(final String pos, final int iteration) {
RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
System.out.println(apiException.getMessage());
throw apiException;
}
final private int simulateErrorAfter = 10;
private Thread produce() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Producer started");
stream = LongStream.generate(() -> {
int localCount;
// Detect error, while using stream.parallel() processing
int error = apiError.get();
if ( error > 0 )
apiError("1", error);
// ----- Accessing the C API here -----
localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
// ----- Accessing the C API here -----
// Checking error code from C API
if ( localCount > simulateErrorAfter ) { // Simulate an API error
producerCount.decrementAndGet();
stream.close();
apiError("2", apiError.incrementAndGet());
}
System.out.println("P: " + localCount);
sleep(200L);
return localCount;
});
System.out.println("Producer terminated");
}
});
thread.start();
return thread;
}
private Thread consume() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
stream.onClose(new Runnable() {
@Override
public void run() {
System.out.println("Close detected");
}
}).parallel().forEach(l -> {
sleep(1000);
System.out.println("C: " + l);
consumerCount.incrementAndGet();
});
} catch (Exception e) {
// Capturing the stream end
System.out.println(e);
}
System.out.println("Consumer terminated");
}
});
thread.start();
return thread;
}
private void create() throws InterruptedException {
Thread producer = produce();
while ( stream == null )
sleep(10);
Thread consumer = consume();
producer.join();
consumer.join();
System.out.println("Produced: " + producerCount);
System.out.println("Consumed: " + consumerCount);
}
}