如何使用Python Trio与Google Protocol Buffer?

4

我正在尝试使用Python中的protobuf读取一些数据流,并希望使用trio创建用于读取流的客户端。Protobuf具有一些方法调用,但是当我使用trio流时发现它们不起作用。

在Linux机器上的Python客户端。

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawing encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_reponse: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2]) # the size of message
    m_type = struct.unpack_from('<H', response[2:4]) # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body 是一些字节数据,我不知道如何解码。 Dtc.EncodingResponse() 是 protobuf 方法,可以返回一个包含响应的 Dtc 对象,以可读格式呈现。(Dtc是protobuf文件)。但我这里什么也没有得到。当我在没有使用Trio的脚本中运行时,Dtc.EncodingResponse() 可以以可读格式完整地返回响应。

我猜问题可能出在 "client_stream" 是一个Trio流对象,只能读取字节,所以我可能需要使用 ReceiveChannel 对象来代替。但如果是这样,我不知道该怎么做。

更新:Nathaniel J. Smith 给出的下面答案解决了我的问题。

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

我觉得自己很傻,之前没有对数据进行ParseFromString处理,这就是问题所在。非常感谢所有回复我的人,希望这能帮助到其他人。

1
我编辑了这个问题,我得告诉你一些事情:永远不要因为是“新手”而道歉。学习是好的! - M.K
谢谢!非常感谢鼓励! - cloud ostrich
尽管我完全同意@M.K提出的针对新手道歉的观点,但我认为删除以# encoding request开头的代码行的缩进会破坏代码。名称client_stream是在parent协程的本地范围内创建的,但现在在其外部被使用。此外,以parent:开头的print有些暗示这仍应该是coro主体的一部分。您能验证一下吗? - shmee
@shmee,你说得对。我的代码格式很糟糕。我已经编辑了适当的空格。感谢你温暖而温和的建议。 - cloud ostrich
1个回答

3

像评论中的@shmee所说,我认为您的代码已经被编辑器弄乱了...您应该再次检查一下。

在没有使用trio的情况下执行此脚本时,Dtc.EncodingResponse()将以可读格式提供完整响应

我认为您在转换到Trio时可能忽略了一行?Dtc.EncodingResponse()只会创建一个新的空白EncodingResponse对象。如果您想将m_body中的数据解析为您的新对象,则必须明确执行此操作,例如:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

然而,还有另一个问题...这个函数被称为receive_some是因为它只接收了一些字节,但可能并没有接收到你请求的全部字节。你的代码假定单次调用receive_some将获取响应中的所有字节,在进行简单测试时可能是正确的,但通常情况下不能保证。如果第一次调用receive_some收到的数据不足,你可能需要反复调用该函数直至获取所有需要的数据。

实际上,这是非常标准的操作...套接字也是这样工作的。这就是为什么服务器首先发送一个m_size字段 - 这样你就可以确定是否已经收到了所有数据,还是还需继续接收数据!

不幸的是,截至2019年6月,Trio不提供助手来完成这个循环 - 你可以在这个问题上跟踪进展。与此同时,你可以自己编写这样的帮助函数。我认为像这样的代码应该可以工作:

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new data
    return buf

async def receive_encoding_response(stream):
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    m_resp.ParseFromString(m_size)
    return m_resp

非常感谢您的回复。将数据从m_body解析到新对象中确实以可读格式给出了数据。我感觉很傻,解决方案如此简单。但无论如何,我仍然非常感激。 - cloud ostrich

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接