JeroMQ 正确关闭

9
我想知道如何正确关闭JeroMQ,目前我知道三种方法,它们各有优缺点,但我不知道哪一种是最好的。
情况如下:
  • 线程A: 拥有上下文,应提供启动/停止方法
  • 线程B: 实际监听线程

我的当前方法:

线程A

static ZContext CONTEXT = new ZContext();
Thread thread;

public void start() {
    thread = new Thread(new B()).start();
}

public void stop() {
    thread.stopping = true;
    thread.join();
}

线程B

boolean stopping = false;
ZMQ.Socket socket;

public void run() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    socket.setReceiveTimeout(10);

    while (!stopping) {
        socket.recv();
    }

    if (NUM_SOCKETS >= 1) {
        CONTEXT.destroySocket(socket);
    } else {
        CONTEXT.destroy();
    }
}

这很不错。对于我来说,10毫秒的关机时间不是问题,但当没有消息接收时,我会不必要地增加CPU负载。目前我更喜欢这个方法。
第二种方法在两个线程之间共享套接字:
线程A
static ZContext CONTEXT = new ZContext();
ZMQ.Socket socket;
Thread thread;

public void start() {
    socket = CONTEXT.createSocket(ROUTER);
    ... // socket setup
    thread = new Thread(new B(socket)).start();
}

public void stop() {
    thread.stopping = true;
    CONTEXT.destroySocket(socket);
}

线程 B

boolean stopping = false;
ZMQ.Socket socket;

public void run() {
    try {
        while (!stopping) {
            socket.recv();
        }
    } catch (ClosedSelection) {
        // socket closed by A
        socket = null;
    }
    if (socket != null) {
        // close socket myself
        if (NUM_SOCKETS >= 1) {
            CONTEXT.destroySocket(socket);
        } else {
            CONTEXT.destroy();
        }
    }
}

效果非常好,但即使recv已经阻塞,有时也不会抛出异常。如果我在启动线程A后等待一毫秒,异常总是会被抛出。我不知道这是一个错误还是我的误用所致,因为我共享了套接字。


"revite"之前就提出过这个问题(https://github.com/zeromq/jeromq/issues/116)并得到了第三种解决方案的答案: https://github.com/zeromq/jeromq/blob/master/src/test/java/guide/interrupt.java

总结: 他们调用ctx.term()和中断在socket.recv()中阻塞的线程。

这很好用,但我不想终止整个上下文,而只想终止这个单一的套接字。我必须针对每个套接字使用一个上下文,因此无法使用inproc。

总结

目前我不知道如何使线程B除了使用超时、共享套接字或终止整个上下文以外的其他方式来退出其阻塞状态。

正确的做法是什么?


这里有一个不同的解决方案,创建一个无法被中断的特殊线程(扩展它),但具有一种通知运行例程关于请求关闭的特殊方法,在关闭钩子中调用此方法... - vach
3个回答

5
经常提到可以销毁zmq上下文,这样共享该上下文的任何内容都将退出,但是这会创建一个噩梦,因为您的退出代码必须尽力避免意外调用已死亡的套接字对象。
试图关闭套接字也行不通,因为它们不是线程安全的,你最终会遇到程序崩溃的问题。
答案:最好的方法是按照ZeroMQ指南建议的方式,使用zmq套接字而不是线程互斥/锁等。设置一个额外的侦听套接字,并在关闭时将其连接并发送一些内容,然后您的run()应该使用JeroMQ轮询器来检查哪个套接字接收到了数据,如果额外的套接字收到了数据,则退出。

3

虽然这个问题比较老,但是还是想提供一下建议...

我建议你查看 ZThread源码。你可以创建一个 IAttachedRunnable 的实例,并将其传递给 fork 方法。这样,你的实例的run方法将被传递一个 PAIR socket 并在另一个线程中执行,而 fork 将返回连接的 PAIR socket 以用于与你的 IAttachedRunnable 获取到的 PAIR socket 进行通信。


2
查看 jeromq 源代码 此处,即使在执行“阻塞”接收时,您仍然会一直消耗 CPU(线程从未休眠)。如果您担心这个问题,请让第二个线程在轮询之间休眠,并让父线程中断。类似以下内容(仅列出相关部分):
线程 A
public void stop() {
    thread.interrupt();
    thread.join();
}

线程B
while (!Thread.interrupted()) {
  socket.recv(); // do whatever
  try {
    Thread.sleep(10); //milliseconds
  } catch (InterruptedException e) {
    break;
  }
}

另外,关于您的第二个解决方案,通常情况下您不应该在线程之间共享套接字 - zeromq指南非常明确地指出了这一点 - “不要在线程之间共享ØMQ套接字。 ØMQ套接字不是线程安全的。”请记住,ZMQ的一个主要用途是IPC - 线程通过连接的套接字进行通信,而不是共享同一个套接字的端口。不需要像共享布尔值停止变量这样的东西。

1
socket.recv() 不应该放在 try 语句块内吗?这是会阻塞的部分。 - Dave Yarwood

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接