使用Java Jeromq实现异步客户端/服务器

3
我正在使用 jeromq 0.4.0 版本,并尝试运行来自此链接的以下示例,但 ZMQ.poll(items, 10); 这一行出现了编译错误。看起来 jeromq 在最近的版本中做了一些改动,但文档和示例代码还没有同步更新。请问有谁能帮我看看,应该如何调整下面的代码,才能让它在最新版本的 jeromq 上编译通过?
    <dependency>
        <groupId>org.zeromq</groupId>
        <artifactId>jeromq</artifactId>
        <version>0.4.0</version>
    </dependency>

以下是代码:
/**
 * Asynchronous client/server example: DEALER clients connect to a ROUTER
 * frontend, which is proxied to a pool of DEALER workers over inproc.
 *
 * NOTE: as posted, this does not compile against jeromq 0.4.0 because of the
 * ZMQ.poll(items, 10) call in client_task.run().
 */
public class asyncsrv {
  // ---------------------------------------------------------------------
  // This is our client task
  // It connects to the server, and then sends a request once per second
  // It collects responses as they arrive, and it prints them out. We will
  // run several client tasks in parallel, each with a different random ID.

  // Shared random source, used for client identities and for the workers'
  // reply counts and delays.
  private static Random rand = new Random(System.nanoTime());

  /** Client: polls its DEALER socket for replies and sends one request per outer loop pass. */
  private static class client_task implements Runnable {

    public void run() {
      // Each client creates its own context.
      ZContext ctx = new ZContext();
      Socket client = ctx.createSocket(ZMQ.DEALER);

      // Set random identity to make tracing easier
      String identity = String.format("%04X-%04X", rand.nextInt(), rand.nextInt());
      client.setIdentity(identity.getBytes());
      client.connect("tcp://localhost:5570");

      // Single poll item: watch the client socket for incoming messages.
      PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

      int requestNbr = 0;
      while (!Thread.currentThread().isInterrupted()) {
        // Tick once per second, pulling in arriving messages
        // (100 polls per tick, each called with a timeout argument of 10).
        for (int centitick = 0; centitick < 100; centitick++) {
          // This is the line that fails to compile against jeromq 0.4.0:
          // ZMQ.poll is reported as undefined.
          ZMQ.poll(items, 10);
          if (items[0].isReadable()) {
            // Print the last frame of the received message, then release it.
            ZMsg msg = ZMsg.recvMsg(client);
            msg.getLast().print(identity);
            msg.destroy();
          }
        }
        // Send the next numbered request after the polling pass.
        client.send(String.format("request #%d", ++requestNbr), 0);
      }
      ctx.destroy();
    }
  }

  // This is our server task.
  // It uses the multithreaded server model to deal requests out to a pool
  // of workers and route replies back to clients. One worker can handle
  // one request at a time but one client can talk to multiple workers at
  // once.
  private static class server_task implements Runnable {
    public void run() {
      // The server's context; the same instance is handed to every worker below.
      ZContext ctx = new ZContext();

      // Frontend socket talks to clients over TCP
      Socket frontend = ctx.createSocket(ZMQ.ROUTER);
      frontend.bind("tcp://*:5570");

      // Backend socket talks to workers over inproc
      Socket backend = ctx.createSocket(ZMQ.DEALER);
      backend.bind("inproc://backend");

      // Launch pool of worker threads, precise number is not critical
      for (int threadNbr = 0; threadNbr < 5; threadNbr++)
        new Thread(new server_worker(ctx)).start();

      // Connect backend to frontend via a proxy
      ZMQ.proxy(frontend, backend, null);

      ctx.destroy();
    }
  }

  // Each worker task works on one request at a time and sends a random number
  // of replies back, with random delays between replies:

  private static class server_worker implements Runnable {
    // Context received from the server task (shared with the other workers).
    private ZContext ctx;

    public server_worker(ZContext ctx) {
      this.ctx = ctx;
    }

    public void run() {
      Socket worker = ctx.createSocket(ZMQ.DEALER);
      worker.connect("inproc://backend");

      while (!Thread.currentThread().isInterrupted()) {
        // The DEALER socket gives us the address envelope and message
        ZMsg msg = ZMsg.recvMsg(worker);
        ZFrame address = msg.pop();
        ZFrame content = msg.pop();
        // Only checked when assertions are enabled (java -ea).
        assert (content != null);
        msg.destroy();

        // Send 0..4 replies back
        int replies = rand.nextInt(5);
        for (int reply = 0; reply < replies; reply++) {
          // Sleep for some fraction of a second
          try {
            Thread.sleep(rand.nextInt(1000) + 1);
          } catch (InterruptedException e) {
            // NOTE(review): the interruption is swallowed here. Thread.sleep
            // clears the interrupt flag when it throws, so the enclosing
            // while condition will not observe this interrupt.
          }
          // Both frames are sent with REUSE because they are sent again on
          // later iterations and destroyed explicitly after the loop.
          address.send(worker, ZFrame.REUSE + ZFrame.MORE);
          content.send(worker, ZFrame.REUSE);
        }
        address.destroy();
        content.destroy();
      }
      // NOTE(review): this destroys the context that was passed in by the
      // server task and is shared with the other workers - confirm intended.
      ctx.destroy();
    }
  }

  // The main thread starts three clients and one server, sleeps for five
  // seconds, and then destroys its own context.

  public static void main(String[] args) throws Exception {
    // NOTE(review): this context is never passed to the client or server
    // tasks, which each create their own.
    ZContext ctx = new ZContext();
    new Thread(new client_task()).start();
    new Thread(new client_task()).start();
    new Thread(new client_task()).start();
    new Thread(new server_task()).start();

    // Run for 5 seconds then quit
    Thread.sleep(5 * 1000);
    ctx.destroy();
  }
}
1个回答

3

在0.4.0版本中,没有poll方法。但是您可以使用ZPoller代替。

示例:

1 - 您需要创建一个新的poller实例:

// Create a poller from the context and register the client socket for incoming-message (POLLIN) events.
ZPoller zPoller = new ZPoller(ctx);
zPoller.register(client, ZMQ.Poller.POLLIN);

2 - 轮询:

// Poll the registered sockets; 10 is the timeout argument (the full example polls 100 times per one-second tick).
zPoller.poll(10);

3 - 然后检查 socket 是否可读:

// Receive only when the poller reports the client socket as readable.
if (zPoller.isReadable(client)) {
    ZMsg msg = ZMsg.recvMsg(client);
    // Print the last frame of the message, passing the client's identity to print().
    msg.getLast().print(identity);
    // Release the message once it has been printed.
    msg.destroy();
}

那么你的代码将会是这样的:
/**
 * Asynchronous client/server example for jeromq 0.4.0.
 *
 * <p>Several DEALER clients connect over TCP to a ROUTER frontend. The server
 * proxies the frontend to a DEALER backend bound on inproc, where a pool of
 * DEALER workers picks up requests and sends back a random number of replies.
 *
 * <p>Polling is done with {@code ZPoller}, which replaces the
 * {@code ZMQ.poll(items, 10)} / {@code PollItem} approach that does not
 * compile against jeromq 0.4.0.
 */
public class asyncsrv {
    // Shared random source for client identities and for the workers' reply
    // counts and delays. java.util.Random is safe to share between threads.
    private static final Random rand = new Random(System.nanoTime());

    /**
     * Client task. It connects to the server and then sends a request once
     * per second. It collects responses as they arrive and prints them out.
     * Several client tasks run in parallel, each with a different random ID.
     */
    private static class client_task implements Runnable {

        @Override
        public void run() {
            ZContext ctx = new ZContext();
            try {
                ZMQ.Socket client = ctx.createSocket(ZMQ.DEALER);

                // Set random identity to make tracing easier. Each half is
                // limited to 16 bits so that it really is four hex digits.
                String identity =
                        String.format("%04X-%04X", rand.nextInt(0x10000), rand.nextInt(0x10000));
                // Explicit charset so the identity bytes do not depend on the
                // platform default.
                client.setIdentity(identity.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                client.connect("tcp://localhost:5570");

                ZPoller zPoller = new ZPoller(ctx);
                zPoller.register(client, ZMQ.Poller.POLLIN);

                int requestNbr = 0;
                while (!Thread.currentThread().isInterrupted()) {
                    // Tick once per second, pulling in arriving messages
                    for (int centitick = 0; centitick < 100; centitick++) {
                        zPoller.poll(10);
                        if (zPoller.isReadable(client)) {
                            ZMsg msg = ZMsg.recvMsg(client);
                            if (msg == null) {
                                // The receive was interrupted: stop the client
                                // instead of failing with a NullPointerException.
                                return;
                            }
                            msg.getLast().print(identity);
                            msg.destroy();
                        }
                    }
                    client.send(String.format("request #%d", ++requestNbr), 0);
                }
            } finally {
                // Always release the client's own context and its sockets.
                ctx.destroy();
            }
        }
    }

    /**
     * Server task. It uses the multithreaded server model to deal requests
     * out to a pool of workers and route replies back to clients. One worker
     * can handle one request at a time but one client can talk to multiple
     * workers at once.
     */
    private static class server_task implements Runnable {
        @Override
        public void run() {
            ZContext ctx = new ZContext();
            try {
                // Frontend socket talks to clients over TCP
                ZMQ.Socket frontend = ctx.createSocket(ZMQ.ROUTER);
                frontend.bind("tcp://*:5570");

                // Backend socket talks to workers over inproc
                ZMQ.Socket backend = ctx.createSocket(ZMQ.DEALER);
                backend.bind("inproc://backend");

                // Launch pool of worker threads, precise number is not critical.
                // The workers share this context so that inproc can connect.
                for (int threadNbr = 0; threadNbr < 5; threadNbr++) {
                    new Thread(new server_worker(ctx)).start();
                }

                // Connect backend to frontend via a proxy
                ZMQ.proxy(frontend, backend, null);
            } finally {
                // The server owns the context, so it is the one to destroy it.
                ctx.destroy();
            }
        }
    }

    /**
     * Worker task. Each worker handles one request at a time and sends a
     * random number of replies back, with random delays between replies.
     */
    private static class server_worker implements Runnable {
        // Context owned by the server task and shared with the other workers.
        private final ZContext ctx;

        public server_worker(ZContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            ZMQ.Socket worker = ctx.createSocket(ZMQ.DEALER);
            try {
                worker.connect("inproc://backend");

                while (!Thread.currentThread().isInterrupted()) {
                    // The DEALER socket gives us the address envelope and message
                    ZMsg msg = ZMsg.recvMsg(worker);
                    if (msg == null) {
                        // The receive was interrupted: shut this worker down.
                        break;
                    }
                    ZFrame address = msg.pop();
                    ZFrame content = msg.pop();
                    msg.destroy();
                    if (address == null || content == null) {
                        // Fewer than two frames: nothing to echo back. This is
                        // checked explicitly because assert is off by default.
                        if (address != null) {
                            address.destroy();
                        }
                        continue;
                    }

                    try {
                        // Send 0..4 replies back
                        int replies = rand.nextInt(5);
                        for (int reply = 0; reply < replies; reply++) {
                            // Sleep for some fraction of a second
                            Thread.sleep(rand.nextInt(1000) + 1);
                            // REUSE keeps the frames alive for the next reply;
                            // they are destroyed explicitly below.
                            address.send(worker, ZFrame.REUSE + ZFrame.MORE);
                            content.send(worker, ZFrame.REUSE);
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition sees the
                        // interruption and the worker stops.
                        Thread.currentThread().interrupt();
                    } finally {
                        address.destroy();
                        content.destroy();
                    }
                }
            } finally {
                // Close only this worker's socket. The context is shared with
                // the server and the other workers, so it is not destroyed here.
                ctx.destroySocket(worker);
            }
        }
    }

    /**
     * Starts three clients and one server, lets them run for five seconds,
     * and then returns.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws Exception {
        startDaemon(new client_task());
        startDaemon(new client_task());
        startDaemon(new client_task());
        startDaemon(new server_task());

        // Run for 5 seconds then quit
        Thread.sleep(5 * 1000);
    }

    // Runs a task on a daemon thread so it cannot keep the JVM alive once
    // main returns. Threads started by the task inherit the daemon status.
    private static void startDaemon(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true);
        thread.start();
    }
}

现在明白了。另外,close 和 destroy 之间有什么区别?我应该在 zPoller 上调用 close 还是 destroy? - user1234
@user1234 close 方法不处理 IOException,而 destroy 方法会处理(实际上它内部调用了 close 方法)。另外,有趣的是,这两个方法在这里其实都没有作用,因为我们这里没有使用任何选择器(selector)。 - DontPanic

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接