使用Java在线程之间传输数据

7
我正在编写一个多线程应用程序,模拟电影院。每个参与者都是自己的线程,必须完全通过信号量进行并发控制。唯一的问题是如何基本地链接线程,以便它们可以通过管道进行通信。
例如:顾客[1]是一个线程,获得了一个允许他走向售票处的信号量。现在,顾客[1]必须告诉售票员,他想看电影“X”。然后,售票员[1]也是一个线程,必须检查电影是否已满,并出售一张票或告诉顾客[1]选择另一个电影。
如何在仍然保持信号量并发性的同时来回传递数据?
另外,我只能使用java.util.concurrent中的Semaphore类。

1
我给你的主要提示是不要被“管道”这个词卡住思路。更多地考虑一个“盒子”,里面有信息,你可以在盒子上贴一张便笺告诉另一个线程,当有有趣的东西出现时让它去查看。 - Neil Coffey
2个回答

8
一种在线程之间传递数据的简单方法是使用位于包java.util.concurrent中的接口BlockingQueue<E>的实现。

这个接口有多个添加元素到集合的方法,具有不同的行为:

  • add(E):如果可能,添加元素,否则抛出异常
  • boolean offer(E):如果元素已被添加,则返回true,否则返回false
  • boolean offer(E, long, TimeUnit):尝试添加元素,并等待指定的时间
  • put(E):阻塞调用线程,直到元素被添加

它还定义了类似行为的元素检索方法:

  • take():阻塞,直到有元素可用
  • poll(long, TimeUnit):检索元素或返回null

我最常用的实现是:ArrayBlockingQueueLinkedBlockingQueueSynchronousQueue

第一个ArrayBlockingQueue由传递给其构造函数的参数定义了固定大小。

第二个LinkedBlockingQueue具有无限大小。它始终接受任何元素,即offer将立即返回true,add永远不会抛出异常。

第三个、对我来说最有趣的是SynchronousQueue,它正是一个管道。可以将其视为大小为0的队列。它永远不会保留元素:只有在其他线程尝试从中检索元素时,此队列才会接受元素。相反,检索操作仅在有另一个线程尝试推送元素时才返回元素。

为了满足“作业”要求,即“仅使用信号量进行同步”,您可以参考我给您关于SynchronousQueue的描述,并编写类似的代码:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

请注意,这个类的行为与我之前描述的SynchronousQueue类相似。
一旦调用了put(E)方法,它就会获取写入信号量,并将其保持为空,以便同一方法的另一个调用在其第一行阻塞。该方法然后存储传递对象的引用,并释放读取信号量。此释放将使任何调用take()方法的线程可以继续执行。 take()方法的第一步自然是获取读取信号量,以禁止其他线程同时检索元素。在检索元素并将其保存在本地变量中(练习:如果删除E e = this.e这一行会发生什么?)后,方法释放写入信号量,以便任何线程都可以再次调用put(E)方法,并返回已保存在本地变量中的内容。
需要注意的重要一点是,传递对象的引用保存在私有字段中,而take()put(E)方法都是final的。这非常重要,但通常被忽视。如果这些方法不是final的(或更糟糕的是,字段不是私有的),则继承类将能够更改take()put(E)的行为,从而破坏契约。
最后,您可以通过以下方式使用try {} finally {}避免在take()方法中声明本地变量:
class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

这个例子的意义在于展示一个try/finally的用法,这在经验不足的开发者中往往被忽视。显然,在这种情况下,没有真正的收益。
哦,该死,我已经为你完成了大部分的家庭作业。作为报复,并为了让你测试一下关于信号量的知识,你可以实现BlockingQueue合约定义的其他方法之一吗?例如,您可以实现一个offer(E)方法和一个take(E,long,TimeUnit)方法!
祝你好运。

这不会满足作业要求,即“并发必须完全通过信号量实现”。(当然,在现实生活中,使用高级并发工具之一是最佳选择。) - Esko Luontola
确实!我一开始没注意到。 - Bruno Reis
是的,我只能使用java.util.concurrent中的信号量。不能使用其他线程安全类...那么这应该用管道来完成,如果是这样,怎么做呢? - JuggernautDad
@JustinY17:看一下我在答案中添加的Pipe<E>类!然后,为了练习,请尝试根据我给出的两个方法实现BlockingQueue<E>定义的其他方法! - Bruno Reis

1

以读写锁的共享内存为思路。

  1. 创建一个缓冲区来存放消息。
  2. 通过使用锁/信号量来控制对缓冲区的访问。
  3. 将此缓冲区用于线程间通信目的。

敬礼

PKV


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接