一种在线程之间传递数据的简单方法是使用位于包
java.util.concurrent
中的接口
BlockingQueue<E>
的实现。
这个接口有多个添加元素到集合的方法,具有不同的行为:
add(E)
:如果可能,添加元素,否则抛出异常
boolean offer(E)
:如果元素已被添加,则返回true,否则返回false
boolean offer(E, long, TimeUnit)
:尝试添加元素,并等待指定的时间
put(E)
:阻塞调用线程,直到元素被添加
它还定义了类似行为的元素检索方法:
take()
:阻塞,直到有元素可用
poll(long, TimeUnit)
:检索元素或返回null
我最常用的实现是:ArrayBlockingQueue
,LinkedBlockingQueue
和SynchronousQueue
。
第一个ArrayBlockingQueue
由传递给其构造函数的参数定义了固定大小。
第二个LinkedBlockingQueue
具有无限大小。它始终接受任何元素,即offer
将立即返回true,add
永远不会抛出异常。
第三个、对我来说最有趣的是SynchronousQueue
,它正是一个管道。可以将其视为大小为0的队列。它永远不会保留元素:只有在其他线程尝试从中检索元素时,此队列才会接受元素。相反,检索操作仅在有另一个线程尝试推送元素时才返回元素。
为了满足“作业”要求,即“仅使用信号量进行同步”,您可以参考我给您关于SynchronousQueue的描述,并编写类似的代码:
class Pipe<E> {
private E e;
private final Semaphore read = new Semaphore(0);
private final Semaphore write = new Semaphore(1);
public final void put(final E e) {
write.acquire();
this.e = e;
read.release();
}
public final E take() {
read.acquire();
E e = this.e;
write.release();
return e;
}
}
请注意,这个类的行为与我之前描述的SynchronousQueue类相似。
一旦调用了
put(E)
方法,它就会获取写入信号量,并将其保持为空,以便同一方法的另一个调用在其第一行阻塞。该方法然后存储传递对象的引用,并释放读取信号量。此释放将使任何调用
take()
方法的线程可以继续执行。
take()
方法的第一步自然是获取读取信号量,以禁止其他线程同时检索元素。在检索元素并将其保存在本地变量中(
练习:如果删除E e = this.e这一行会发生什么?)后,方法释放写入信号量,以便任何线程都可以再次调用
put(E)
方法,并返回已保存在本地变量中的内容。
需要注意的重要一点是,传递对象的引用保存在
私有字段中,而
take()
和
put(E)
方法都是
final的。这非常重要,但通常被忽视。如果这些方法不是final的(或更糟糕的是,字段不是私有的),则继承类将能够更改
take()
和
put(E)
的行为,从而破坏契约。
最后,您可以通过以下方式使用
try {} finally {}
避免在
take()
方法中声明本地变量:
class Pipe<E> {
public final E take() {
try {
read.acquire();
return e;
} finally {
write.release();
}
}
}
这个例子的意义在于展示一个try/finally的用法,这在经验不足的开发者中往往被忽视。显然,在这种情况下,没有真正的收益。
哦,该死,我已经为你完成了大部分的家庭作业。作为报复,并为了让你测试一下关于信号量的知识,你可以实现BlockingQueue合约定义的其他方法之一吗?例如,您可以实现一个offer(E)方法和一个take(E,long,TimeUnit)方法!
祝你好运。