我想知道如何正确关闭JeroMQ,目前我知道三种方法,它们各有优缺点,但我不知道哪一种是最好的。
情况如下:
这很不错。对于我来说,10毫秒的关机时间不是问题,但当没有消息接收时,我会不必要地增加CPU负载。目前我更喜欢这个方法。
第二种方法在两个线程之间共享套接字:
线程A
情况如下:
- 线程A: 拥有上下文,应提供启动/停止方法
- 线程B: 实际监听线程
我的当前方法:
线程A
static ZContext CONTEXT = new ZContext();
Thread thread;
public void start() {
thread = new Thread(new B()).start();
}
public void stop() {
thread.stopping = true;
thread.join();
}
线程B
boolean stopping = false;
ZMQ.Socket socket;
public void run() {
socket = CONTEXT.createSocket(ROUTER);
... // socket setup
socket.setReceiveTimeout(10);
while (!stopping) {
socket.recv();
}
if (NUM_SOCKETS >= 1) {
CONTEXT.destroySocket(socket);
} else {
CONTEXT.destroy();
}
}
这很不错。对于我来说,10毫秒的关机时间不是问题,但当没有消息接收时,我会不必要地增加CPU负载。目前我更喜欢这个方法。
第二种方法在两个线程之间共享套接字:
线程A
static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket;
Thread thread;
public void start() {
socket = CONTEXT.createSocket(ROUTER);
... // socket setup
thread = new Thread(new B(socket)).start();
}
public void stop() {
thread.stopping = true;
CONTEXT.destroySocket(socket);
}
线程 B
boolean stopping = false;
ZMQ.Socket socket;
public void run() {
try {
while (!stopping) {
socket.recv();
}
} catch (ClosedSelection) {
// socket closed by A
socket = null;
}
if (socket != null) {
// close socket myself
if (NUM_SOCKETS >= 1) {
CONTEXT.destroySocket(socket);
} else {
CONTEXT.destroy();
}
}
}
效果非常好,但即使recv
已经阻塞,有时也不会抛出异常。如果我在启动线程A后等待一毫秒,异常总是会被抛出。我不知道这是一个错误还是我的误用所致,因为我共享了套接字。
"revite"之前就提出过这个问题(https://github.com/zeromq/jeromq/issues/116)并得到了第三种解决方案的答案: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java
总结:
他们调用ctx.term()
和中断在socket.recv()
中阻塞的线程。
这很好用,但我不想终止整个上下文,而只想终止这个单一的套接字。我必须针对每个套接字使用一个上下文,因此无法使用inproc。
总结
目前我不知道如何使线程B除了使用超时、共享套接字或终止整个上下文以外的其他方式来退出其阻塞状态。
正确的做法是什么?