你的
socket.recv()
接收到的数据完全符合 ZeroMQ 规范,但它们并没有让你满意,你怀疑为什么你接收到的不是消息的精确副本而是这样的数据。
所以,请耐心阅读下去。
ZeroMQ 最近添加的 STREAM
socket-archetype 很特别
有几年使用 ZeroMQ 信令/消息工具的经验的人会告诉你,最近(v4.x)添加的
STREAM
archetype 不是 ZeroMQ 进程与 ZeroMQ 进程之间通信的最佳选择。
为什么呢?几乎所有 ZeroMQ 工具的精华都必须在
STREAM
中进行快捷操作,以便使 ZeroMQ socket 访问点能够与不知道 ZeroMQ 智能套接字更高级协议的相反套接字端点进程 "交谈"。
原生模式
原生模式用于与 TCP 对等体通信,并允许双向异步请求和回复。
ZMQ_STREAM
当使用 tcp://
传输时,ZMQ_STREAM
类型的套接字用于从非 ØMQ 对等体发送和接收 TCP 数据。 ZMQ_STREAM
套接字可以作为客户端和/或服务器,异步地发送和/或接收 TCP 数据。
当接收到 TCP 数据时,ZMQ_STREAM
套接字应在将消息传递给应用程序之前在消息前添加一个包含源对等体标识的消息部分。从所有连接的对等体中公平排队接收的消息。
当发送 TCP 数据时,ZMQ_STREAM
套接字应删除消息的第一部分,并使用它来确定应路由消息的对等体的标识,而无法路由的消息将导致 EHOSTUNREACH
或 EAGAIN
错误。
要打开与服务器的连接,请使用 zmq_connect()
调用,然后使用 ZMQ_IDENTITY
zmq_getsockopt()
调用获取套接字标识。
要关闭特定的连接,请先发送标识帧,然后再发送零长度消息(请参见示例部分)。
当建立连接时,应用程序将接收到零长度消息。同样,当对等体断开连接(或连接丢失)时,应用程序将接收到零长度消息。
您必须发送一个标识帧,然后是一个数据帧。标识帧需要 ZMQ_SNDMORE
标志,但在数据帧上被忽略。
示例
void *ctx = zmq_ctx_new ();
assert ( ctx );
void *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );
int rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );
uint8_t id [256];
size_t id_size = 256;
uint8_t raw [256];
size_t raw_size = 256;
while ( 1 ) {
id_size = zmq_recv ( socket, id, 256, 0 );
assert ( id_size > 0 );
do {
raw_size = zmq_recv ( socket, raw, 256, 0 );
assert ( raw_size >= 0 );
} while ( raw_size == 256 );
char http_response [] =
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, http_response, strlen ( http_response ), 0 );
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );
如果您遵循多连接套接字情况下
STREAM
行为的描述,发送方将会在连接到多个端点(通过
.connect()
+
.bind()
,
N = < 0, +INF)
)的
socket
实例上收到公平队列轮询读取,但对于通信的对等体数量和性质,没有任何控制,然而在
socket.recv()
中有一个公平排队的轮询机制。这绝对不是一种安全的设计实践。
Summary of ZMQ_STREAM characteristics
Compatible peer sockets none
Direction Bidirectional
Send/receive pattern Unrestricted
Outgoing routing strategy See text ( above )
Incoming routing strategy Fair-queued
Action in mute state EAGAIN