环境设置:
- 在同一台机器上运行多播发送和接收应用程序
我正在集成支持OpenPGM的ZeroMQ多播,但在我的示例代码中遇到了问题。
即接收应用程序中"未收到多播消息"。如果我做错了,请纠正我。同时也找不到关于ZeroMQ PGM多播要求的正确示例。
// ZMQ_pgm_receive.cpp :
//
//Headers
#include "stdafx.h"
#include "zmq.h"
#include <iostream>
#include <windows.h>
std::string fullurl = "pgm://eth0;239.255.0.1:30001";
static int roundtrip_count = 50;
static size_t message_size = 4;
int _tmain(int argc, _TCHAR* argv[])
{
void *ctx = NULL,
*s = NULL;
int con;
int i;
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_SUB);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
con = zmq_bind(socket, fullurl.c_str());
if (con == 0) {
printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
return -1;
}
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_recvmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1;
}
printf("message received\n");
if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n");
return -1;
}
Sleep(1000);
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
/*rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
ctx = NULL;
*/
return 0;
}
// ZMQ_pgm_send.cpp :
//
#include "stdafx.h"
#include "zmq.h"
#include <iostream>
#include <windows.h>
std::string fullurl = "pgm://eth0;239.255.0.1:30001";
static int roundtrip_count = 50;
static size_t message_size = 4;
int _tmain(int argc, _TCHAR* argv[])
{
void *ctx = NULL,
*s = NULL;
int con;
int i;
ctx = zmq_init (1);
if (!ctx) {
printf ("error in zmq_init: %s\n", zmq_strerror (errno));
return -1;
}
s = zmq_socket (ctx, ZMQ_PUB);
if (!s) {
printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
return -1;
}
con = zmq_connect(socket, fullurl.c_str());
if (con == 0) {
printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
return -1;
}
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg,message_size);
if (rc != 0) {
printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
return -1;
}
memset(zmq_msg_data (&msg),'A', message_size );
for (i = 0; i != roundtrip_count; i++) {
rc = zmq_sendmsg (s, &msg, 0);
if (rc < 0) {
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1;
}
}
rc = zmq_msg_close (&msg);
if (rc != 0) {
printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
return -1;
}
rc = zmq_close (s);
if (rc != 0) {
printf ("error in zmq_close: %s\n", zmq_strerror (errno));
return -1;
}
/*rc = zmq_ctx_term (ctx);
if (rc != 0) {
printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
return -1;
}
ctx = NULL;
*/
return 0;
}
如果我做错了,请纠正我。
tcp://
重新测试代码,以确认问题的根本原因? - user3666197