为什么 ZeroMQ PGM 多播没有接收到多播消息?(C++,Windows)

4

环境设置:
- 在同一台机器上运行多播发送和接收应用程序

我正在集成支持OpenPGM的ZeroMQ多播,但在我的示例代码中遇到了问题。

即接收应用程序中"未收到多播消息"。如果我做错了,请纠正我。同时也找不到关于ZeroMQ PGM多播要求的正确示例。

// ZMQ_pgm_receive.cpp : 
//
//Headers
#include "stdafx.h"
#include "zmq.h"
#include <iostream>
#include <windows.h>

std::string fullurl = "pgm://eth0;239.255.0.1:30001";
static int roundtrip_count = 50;
static size_t message_size = 4;

int _tmain(int argc, _TCHAR* argv[])
{
    void *ctx = NULL,
         *s = NULL;
    int con;
    int i;

    ctx = zmq_init (1);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_SUB);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }


    con = zmq_bind(socket, fullurl.c_str()); 
    if (con == 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }
    zmq_msg_t msg;

    int rc = zmq_msg_init (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    for (i = 0; i != roundtrip_count; i++) {

        rc = zmq_recvmsg (s, &msg, 0);
        if (rc < 0) {
            printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
            return -1;
        }
        printf("message received\n");

        if (zmq_msg_size (&msg) != message_size) {
            printf ("message of incorrect size received\n");
            return -1;
        }
        Sleep(1000);

    }

    rc = zmq_msg_close (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    /*rc = zmq_ctx_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
        return -1;
    }
ctx = NULL;
*/

return 0;
}

// ZMQ_pgm_send.cpp :
//
#include "stdafx.h"
#include "zmq.h"
#include <iostream>
#include <windows.h>

std::string fullurl = "pgm://eth0;239.255.0.1:30001";    
static int roundtrip_count = 50;
static size_t message_size = 4;

int _tmain(int argc, _TCHAR* argv[])
{
    void *ctx = NULL,
         *s = NULL;
    int con;
    int i;

    ctx = zmq_init (1);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_PUB);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    con = zmq_connect(socket, fullurl.c_str()); 
    if (con == 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        return -1;
    }
    zmq_msg_t msg;

    int rc = zmq_msg_init_size (&msg,message_size);
    if (rc != 0) {
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    memset(zmq_msg_data (&msg),'A', message_size ); 
    for (i = 0; i != roundtrip_count; i++) {

        rc = zmq_sendmsg (s, &msg, 0);
        if (rc < 0) {
            printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
            return -1;
        }
    }

    rc = zmq_msg_close (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    /*rc = zmq_ctx_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
        return -1;
    }
    ctx = NULL;
    */

    return 0;
}

如果我做错了,请纠正我。


【步骤0】:您是否介意先使用另一个传输类别 tcp:// 重新测试代码,以确认问题的根本原因? - user3666197
1个回答

2

在评论中提出的[步骤0]已经解决,
现在需要
检测

一个缺失的ZMQ_SUBSCRIBE设置,因此SUB端过滤了所有流量

ZMQ_SUBSCRIBE: 建立消息过滤器


ZMQ_SUBSCRIBE选项将在ZMQ_SUB套接字上建立新的消息过滤器。新创建的ZMQ_SUB套接字将过滤掉所有传入的消息,因此您应该调用此选项来建立初始的消息过滤器。

长度为零的空option_value将订阅所有传入的消息。非空的option_value将订阅以指定前缀开头的所有消息。可以将多个过滤器附加到单个ZMQ_SUB套接字上,在这种情况下,如果消息与至少一个过滤器匹配,则将接受该消息。


无论如何,欢迎并享受这些分布式系统计算的智能工具!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接