生产者/消费者多线程

20

背景

由于缺乏学费,我在收费站上夜班工作,并利用互联网自学编程技能,希望明天能有更好的工作或者我制作的应用程序能在线销售。长夜漫漫,客人寥寥。

我正在尝试解决多线程问题,因为我在文献中(例如Android SDK)遇到了很多使用它的代码,但我仍然觉得它很模糊。

精神

我现在的方法是:尝试编写我能想到的最基本的多线程示例,稍微思考一下,看看能否使我的大脑适应一些新颖的思考方式。我暴露自己的极限,希望能够超越它们。请随意进行批评,甚至挑剔到点滴细节,指出更好的做法。

目标

  • 根据我迄今为止的努力(提供代码),获得如何完成上述任务的建议

练习

这是我定义的范围:

定义

创建两个类,它们协同工作以生产数据对象并消耗这些对象。一个线程创建对象并将它们传递到共享空间中供另一个线程获取和消耗。我们称生产线程为Producer,消费线程为Consumer,共享空间为SharedSpace。为了通过类比来消耗其他对象的生产行为,可以将其 assimilated 到这种情况下:
`Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)

为了简化该练习,我决定让母亲在孩子吃蛋糕时做饭。她只需等待孩子完成蛋糕,立即再做一个,直到一定的限制,以进行良好的育儿。这个练习的要点是练习线程的信号传递,而不是实现并发。相反,我专注于完美的串行化,没有轮询或“我可以走了吗?”的检查。我想我将不得不编写后续的练习,在其中母亲和孩子将“并行工作”。

方法

  • 让我的类实现Runnable接口,以便它们有自己的代码入口点

  • 使用我的类作为构造函数参数传递给Thread对象,这些对象从程序的main入口点实例化并启动

  • 通过Thread.join()确保main程序在Thread之前不会终止

  • 设置Producer创建数据供Consumer使用的次数限制

  • 确定Produce将用于信号结束数据生产的sentinel

  • 记录共享资源上锁和数据生产/消费事件的获取,包括工作线程的最终签署

  • 从程序的main创建单个SharedSpace对象,并在启动之前将其传递给每个工作线程

  • 在每个工作线程内部存储对SharedSpace对象的private引用

  • 提供防范措施和消息,描述Consumer在任何数据被生产之前准备好消费的条件

  • 在给定迭代次数后停止Producer

  • 在读取到sentinel值后停止Consumer

代码


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}

执行日志


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.

问题

  • 以上内容是否正确?(例如,使用了正确的语言工具、正确的方法,是否包含任何愚蠢的代码等)

但它“看起来正确”吗?

即使输出“看起来不错”,我也会询问正确性,因为您无法想象在我的测试中有多少次事情出了差错,其中一次没有另一次(例如,当消费者首先启动时,当生产者在生成哨兵后从未退出等)。我已经学会了不从“成功运行”中宣称正确性。相反,我对伪并行代码非常怀疑!(按定义,此代码甚至不是并行的!)

详细答案

一个好的问题专注于仅一个请求的建议(上面的建议),但如果您愿意,请随意提到以下其他主题的见解:

  • 我如何在编写下一次尝试的同时测试并行代码?

  • 哪些工具可以帮助我进行开发和调试? 考虑我使用Eclipse

  • 如果我允许Producer继续生产,每次生产需要一些可变时间,而Consumer消费任何可用的东西,这种方法是否会改变?锁定是否必须移动到其他位置?信号传递是否需要从此等待/通知范例更改?

  • 这种做事情的方法是否已经过时了,我应该学习其他东西吗?从这个收费站来看,我不知道在Java的“真实世界”中会发生什么

下一步

  • 我接下来该去哪里?我已经看到了某处提到了"futures"的概念,但我可以使用一个按顺序排序的数字列表来逐个解决问题,并链接到相关的学习资源

Tino Sino


6
天啊,对于一个没有上过专业计算机科学学校的人来说,你的思维方式比我认识的许多计算机科学毕业生都更有条理。恭喜你。 :) - Tudor
2
对于你的问题:从你期望的角度来看,它是“正确”的,对吗?使用wait和notify是正确的,但不被广泛推荐。我还建议您使用“并发”API中的某些东西。可以是一些专门的数据结构,如BlockingQueue或同步工具...我认为,你会喜欢它的。我建议你做的另一件事是不要等待太久。尝试“快速生产者,慢消费者”以及反之亦然,并探索这种情况带来的结果。 - Fildor
哦,他们有时会这样告诉我。因此有了“机器人”Tino Sino的绰号。我想这是带有贬义的。:] 另外,我开始学会如何通过提问而不惹恼任何人来利用集体互联网智慧 :] - Robottinosino
1
@Tudor:幸好你在大学而不是在高速公路收费站! - Robottinosino
这个问题表现出了你的热情,加一分。 - guido
显示剩余3条评论
4个回答

6
上面的内容正确吗? 我唯一看到的问题是由@Tudor和@Bhaskar提到的。每当您在等待条件时测试条件时,您必须使用while循环。但是,这更多涉及到多个生产者和消费者之间的竞争条件。虚假唤醒可能会发生,但竞争条件更有可能发生。请参见我的页面。是的,您只有一个生产者和一个消费者,但您可能会尝试将代码扩展到多个消费者或将您的代码复制到另一种情况中。 我已经学会不从“成功运行”中声称正确性。相反,我对伪并行代码非常怀疑! 好直觉。 如何在编写下一个尝试的同时测试并行代码?
这很难。扩展是一种方法。添加多个生产者和消费者,看是否存在问题。在具有不同数量/类型处理器的多个架构上运行。您最好的防御将是代码正确性。紧密同步,良好使用BlockingQueueExecutorService等类使您的代码更简单/更清洁。
没有简单的答案。测试多线程代码非常困难。
“哪些工具可以在开发和调试中帮助我?”
就一般情况而言,我会寻找覆盖率工具,例如Emma,以便您可以确保您的单元测试覆盖了所有代码。
在多线程代码测试方面,请了解如何读取kill -QUIT线程转储并查看Jconsole中的运行线程。Java分析器,例如YourKit也可能有所帮助。
“如果我允许生产者继续生产,并且每次生产需要一些可变的时间,那么方法会改变吗?”
我不这么认为。消费者将永远等待生产者。也许我没有理解这个问题?

这种做事的方法已经过时了,我应该学习其他东西吗?从这个收费站来看,我对Java的“真实世界”发生的事情一无所知。

接下来要学习ExecutorService,它们处理大量使用new Thread()方式编写的代码,特别是当你需要使用线程执行多个异步任务时。这里有一个教程

我接下来该去哪里?

再次提到,ExecutorService。我假设您已经阅读了这份入门文档。正如@Bhaskar所提到的那样,Java并发编程实践是一本好的圣经。


以下是关于您的代码的一些普遍评论:

  • The SharedSpace and Threaded classes seems like a contrived way to do this. If you are playing around with base classes and the like then fine. But in general, I never use a pattern like this. A producer and consumer are usually working with a BlockingQueue like LinkedBlockingQueue in which case the synchronization and volatile payloads are taken care of for you. Also, I tend to inject shared information into an object constructor as opposed to getting it from a base class.

  • Typically if I am using synchronized it is on a private final field. Often I create a private final Object lockObject = new Object(); for locking unless I am working with an object already.

  • Be careful of huge synchronized blocks and putting log messages inside of synchronized sections. Logs usually do synchronized IO to the file-system which can be very expensive. You should have small, very tight, synchronized blocks if possible.

  • You define consumedData outside of the loop. I would define it at the point of the assignment and then use a break to bail from the loop if it is == -1. Make sure to limit your local variables scope if at all possible.

  • Your logging messages are going to dominate your code performance. This means that when you remove them, your code is going to perform completely differently. This is very important to realize when you go to debug problems with it. The performance will also (most likely) change when you move to a different architecture with different CPUs/cores.

  • You probably know this but when you call sharedSpace.notify();, that only means that another thread is notified if it is currently in sharedSpace.wait();. If it is doesn't something else then it will miss the notification. Just FYI.

  • It's a little strange to do a if (nIterations <= N_ITERATIONS), and then 3 lines below the else do it again. Duplicating the notify() would be better to simplify the branching.

  • You have int nIterations = 0; then a while then inside a ++. That's a recipe for a for loop:

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    

这是你的代码更加紧凑的版本。这只是一个我写作风格的例子。除了缺少的 while 语句,你的版本似乎没有问题。
public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       while (true) {
          int consumedData = queue.take();
          if (consumedData ==  Producer.FINAL_VALUE) {
              logger.info("Consumed: END (end of data production token).");
              break;
          }
          logger.info("Consumed: {}.", consumedData);
       }
       logger.info("Signing off.");
    }
}

public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
          logger.info("Produced: {}", nIterations);
          queue.put(nIterations);
       }
       queue.put(FINAL_VALUE);
       logger.info("Produced: END (end of data production token).");
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       // start and join go here
    }
}

4
你做得相当不错,实际上没有太多可以挑剔的。我想推荐的一个事情是,你应该避免在缓冲对象本身上进行同步。在这种情况下,这样做没问题,但是假设您切换到数据结构缓冲区,根据类别,它可能会在内部同步(例如Vector,尽管现在已过时),因此从外部获取锁可能会弄乱它。
编辑:Bhaskar提出了使用while将调用wait包装起来的好方法。这是因为可以发生臭名昭著的虚假唤醒,迫使线程过早地退出等待,因此您需要确保它重新进入。
接下来可以做的是实现有限缓冲区生产者消费者:有一些共享数据结构,例如链表,并设置最大大小(例如10个项目)。然后让生产者继续生产,只有在队列中有10个项目时才暂停它。当缓冲区为空时,将暂停消费者。
您可以采取的下一步是学习如何自动执行手动实现的过程。查看提供具有阻塞行为的缓冲区的BlockingQueue(即,如果缓冲区为空,则消费者将自动阻塞,如果满则生产者将阻塞)。
此外,根据情况,执行器(查看ExecutorService)可能是一个值得替换的选择,因为它们封装了任务队列和一个或多个工作程序(消费者),因此您只需要生产者。

非常感谢你的回答,@Tudor。你能否插入一两个代码片段来将你的建议转化为Java代码?这样会不会太长?(我不知道你有什么想法) - Robottinosino
@Robottinosino:你是指BlockingQueueExecutorService这部分吗? - Tudor
啊!这是我最讨厌的事情。while循环不仅仅是关于虚假唤醒,它们还涉及到关键的多生产者/消费者竞争条件。请参见此处:http://256.com/gray/docs/misc/producer_consumer_race_conditions/ - Gray
@Gray:但这不是一个多生产者-消费者的错误吗?在这种情况下(单个生产者-消费者),只有虚假唤醒可能会使程序出错。 - Tudor
当然,出于这两个原因,这只是好的编程实践。99.99% 的人永远不会看到虚假唤醒。更大比例的人会将其从1个消费者扩展到2个消费者(或者复制它),并不理解为什么它会崩溃。我讨厌虚假唤醒的答案,因为这只是0.001% 的原因。 - Gray

0

生产者消费者可以是实现Runnable接口的简单类(不需要extends Threaded),这样它们就不那么脆弱了。客户端可以自己创建Thread并附加实例,因此不需要类层次结构的开销。

在执行wait()之前,您的条件应该是while()而不是if

编辑:来自JCIP第301页:

void stateDependentMethod() throws InterruptedException {
      // condition predicate must be guarded by lock
      synchronized(lock) {
          while (!conditionPredicate())
            lock.wait();
          // object is now in desired state
       }
  }

您已静态地内置了停止条件。通常,生产者和消费者应该更加灵活-他们应该能够响应外部信号以停止。

首先,为了实现外部停止信号,您需要一个标志:

class Producer implements Runnable { 
     private volatile boolean stopRequested ;

     public void run() {
        while(true){
           if(stopRequested )
                // get out of the loop
         }
     }

     public void stop(){
        stopRequested  = true;
        // arrange to  interrupt the Producer thread here.
     }
 }

当你尝试实现以上内容时,你可能会发现其他的复杂情况出现了 - 例如 - 你的生产者首先发布然后再wait(),但这可能会导致问题。

如果你对进一步阅读感兴趣,我建议阅读书籍 - Java并发编程实践。这本书将提供比我在此处添加的更多建议。


您能提供一个代码片段来说明您的观点吗?我“认为”我能够理解它们的意思,但是... - Robottinosino

0
太棒了,你的野心值得赞扬!你在近八年前就提出了这个问题。我希望你的努力为你提供了(并且将继续为你提供)所需的教育。
目前,在Java中实现多线程强烈不建议使用wait()、notify() 和 join()。当你尝试在这个低级别控制并发时,容易犯错(事实上,Java的设计师承认许多Thread的方法和语义实际上是设计错误,但由于向后兼容性而必须保留 - 许多方法将随着新的“虚拟线程”(Project Loom)的推出而消失,但那是另一个话题)。
如今手动启动和控制线程的首选方式是通过ExecutorService.submit(Callable),返回Future。然后,你可以通过调用Future.get()等待线程退出(并获取返回值),该返回值是由Callable返回的类型V(如果Callable抛出未捕获的异常,则会抛出ExecutionException)。
下面的类是如何实现这样一个功能的示例。这将通过单个有界阻塞队列连接任意数量的生产者和消费者。(线程返回值被忽略,因此调用ExecutorService.submit(Runnable),返回Future<?>而不是ExecutorService.submit(Callable<V>))。
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public abstract class ProducerConsumer<E> {

    private final BlockingQueue<Optional<E>> queue;

    public ProducerConsumer(
            int numProducerThreads, int numConsumerThreads, int queueCapacity) {
        if (numProducerThreads < 1 || numConsumerThreads < 1 || queueCapacity < 1) {
            throw new IllegalArgumentException();
        }
        queue = new ArrayBlockingQueue<Optional<E>>(queueCapacity);
        final ExecutorService executor = 
                Executors.newFixedThreadPool(numProducerThreads + numConsumerThreads);
        try {
            // Start producer threads
            final List<Future<?>> producerFutures = new ArrayList<>();
            final AtomicInteger numLiveProducers = new AtomicInteger();
            for (int i = 0; i < numProducerThreads; i++) {
                producerFutures.add(executor.submit(() -> {
                    numLiveProducers.incrementAndGet();
                    // Run producer
                    producer();
                    // When last producer finishes, deliver poison pills to consumers
                    if (numLiveProducers.decrementAndGet() == 0) {
                        for (int j = 0; j < numConsumerThreads; j++) {
                            queue.put(Optional.empty());
                        }
                    }
                    return null;
                }));
            }
            // Start consumer threads
            final List<Future<?>> consumerFutures = new ArrayList<>();
            for (int i = 0; i < numConsumerThreads; i++) {
                consumerFutures.add(executor.submit(() -> {
                    // Run Consumer
                    consumer();
                    return null;
                }));
            }
            // Wait for all producers to complete
            completionBarrier(producerFutures, false);
            // Shut down any consumers that are still running after producers complete
            completionBarrier(consumerFutures, false);
        } finally {
            executor.shutdownNow();
        }
    }

    private static void completionBarrier(List<Future<?>> futures, boolean cancel) {
        for (Future<?> future : futures) {
            try {
                if (cancel) {
                    future.cancel(true);
                }
                future.get();
            } catch (CancellationException | InterruptedException e) {
                // Ignore
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected void produce(E val) {
        try {
            queue.put(Optional.of(val));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected Optional<E> consume() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /** Producer loop. Call {@link #produce(E)} for each element. */
    public abstract void producer();

    /**
     * Consumer thread. Call {@link #consume()} to get each successive element,
     * until an empty {@link Optional} is returned.
     */
    public abstract void consumer();
}

使用方法如下:

new ProducerConsumer<Integer>(/* numProducerThreads = */ 1, /* numConsumerThreads = */ 4,
        /* queueCapacity = */ 10) {
    @Override
    public void producer() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Producing " + i);
            produce(i);
        }
    }

    @Override
    public void consumer() {
        for (Optional<Integer> opt; (opt = consume()).isPresent; ) {
            int i = opt.get();
            System.out.println("Got " + i);
        }
    }
};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接