Pyzmq - 向STREAM套接字发送消息

4

我想尝试在pyzmq中实现两个流套接字之间的简单连接示例。

sender.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

socket.connect("tcp://localhost:5556")
socket.send("message")

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")

message = socket.recv()
print("Received -> [ %s ]" % (message))

输出

Received [ b'\x00k\x8bEg' ]
Received [ b'' ]

我想问一下,在STREAM套接字之间发送消息的正确方式是什么。


问候布拉迪斯拉发。导航有助于您找到答案吗? StackOverflow告诉用户,好问题和好答案值得反馈和投票,并且最好的答案会被[接受]。所以请编写并点赞:o) - user3666197
谢谢您的回复。很抱歉我还没有时间来表达我的想法,但是您回答了我的问题。可惜您没有解决我的困境 :-D - M.Puk
2个回答

3
你的 socket.recv() 接收到的数据完全符合 ZeroMQ 规范,但它们并没有让你满意,你怀疑为什么你接收到的不是消息的精确副本而是这样的数据。
所以,请耐心阅读下去。

ZeroMQ 最近添加的 STREAM socket-archetype 很特别

有几年使用 ZeroMQ 信令/消息工具的经验的人会告诉你,最近(v4.x)添加的 STREAM archetype 不是 ZeroMQ 进程与 ZeroMQ 进程之间通信的最佳选择。
为什么呢?几乎所有 ZeroMQ 工具的精华都必须在 STREAM 中进行快捷操作,以便使 ZeroMQ socket 访问点能够与不知道 ZeroMQ 智能套接字更高级协议的相反套接字端点进程 "交谈"。

原生模式

原生模式用于与 TCP 对等体通信,并允许双向异步请求和回复。 ZMQ_STREAM

当使用 tcp:// 传输时,ZMQ_STREAM 类型的套接字用于从非 ØMQ 对等体发送和接收 TCP 数据。 ZMQ_STREAM 套接字可以作为客户端和/或服务器,异步地发送和/或接收 TCP 数据。

当接收到 TCP 数据时,ZMQ_STREAM 套接字应在将消息传递给应用程序之前在消息前添加一个包含源对等体标识的消息部分。从所有连接的对等体中公平排队接收的消息。

当发送 TCP 数据时,ZMQ_STREAM 套接字应删除消息的第一部分,并使用它来确定应路由消息的对等体的标识,而无法路由的消息将导致 EHOSTUNREACHEAGAIN 错误。

要打开与服务器的连接,请使用 zmq_connect() 调用,然后使用 ZMQ_IDENTITY zmq_getsockopt() 调用获取套接字标识。

要关闭特定的连接,请先发送标识帧,然后再发送零长度消息(请参见示例部分)。

当建立连接时,应用程序将接收到零长度消息。同样,当对等体断开连接(或连接丢失)时,应用程序将接收到零长度消息。

您必须发送一个标识帧,然后是一个数据帧。标识帧需要 ZMQ_SNDMORE 标志,但在数据帧上被忽略。

示例

void    *ctx = zmq_ctx_new ();
assert ( ctx );
/*                                             Create ZMQ_STREAM socket */
void    *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );

int      rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );

/*                                            Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t  id_size = 256;

/*                                            Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t  raw_size = 256;

while ( 1 ) {
   /*                                         Get HTTP request; ID frame and then request */
   id_size  = zmq_recv ( socket, id, 256, 0 );
   assert ( id_size >  0 );
   do {
        raw_size  = zmq_recv ( socket, raw, 256, 0 );
        assert ( raw_size >= 0 );
   } while (     raw_size == 256 );
   /*                                         Prepares the response */
   char http_response [] =
                            "HTTP/1.0 200 OK\r\n"
                            "Content-Type: text/plain\r\n"
                            "\r\n"
                            "Hello, World!";
   /*                                         Sends the ID frame followed by the response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, http_response, strlen ( http_response ), 0 );

   /*                                         Closes the connection by sending the ID frame followed by a zero response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );

如果您遵循多连接套接字情况下STREAM行为的描述,发送方将会在连接到多个端点(通过.connect() + .bind()N = < 0, +INF))的socket实例上收到公平队列轮询读取,但对于通信的对等体数量和性质,没有任何控制,然而在socket.recv()中有一个公平排队的轮询机制。这绝对不是一种安全的设计实践。
Summary of ZMQ_STREAM characteristics
Compatible peer sockets     none
Direction                   Bidirectional
Send/receive pattern        Unrestricted
Outgoing routing strategy   See text ( above )
Incoming routing strategy   Fair-queued
Action in mute state        EAGAIN

3
这是一个使用pyzmq实现单向连接的简单示例,与问题中的情况相似。
发送方代码如下:
import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.connect('tcp://localhost:5555')
id_sock = socket.getsockopt(zmq.IDENTITY)
socket.send(id_sock, zmq.SNDMORE)
socket.send(b'message')

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.bind('tcp://*:5555')
id_sock = socket.recv()
assert not socket.recv()    # empty data here
assert socket.recv() == id_sock
message = socket.recv()
print('received:', message)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接