如何在asyncio中重新连接一个socket?

12

我希望在app.py中使用asyncio创建两个协议(TcpClient和UdpServer),其中TcpClient将与server.py建立持久连接,而UdpServer则作为UDP服务器:

我需要的是:
a) 两个协议要相互通信:调用彼此的方法。这只在第一次连接时有效。如果TcpClient重新连接,它无法再次发送来自UdpServer的字符串"send to tcp."。我用print(self)检查过,TcpClient会创建一个新实例,旧实例仍然存在但没有连接,但我不知道如何改进。我认为我可能在错误地使用asyncio。
b) 当TcpClient断开与server.py的连接时,等待5秒钟,然后再次尝试重新连接,以此类推。我尝试使用asyncio的call_later()来实现,但我认为有一种本地方法可以实现,而不是一种人为的方法。
c) 当我启动app.py并且TcpClient无法连接时,我希望等待5秒钟后再次尝试重新连接,以此类推。我不知道如何做到这一点。

这里是我的app.py和server.py的测试样例。server.py仅用于测试-它将是另一种语言。

我只是想说我尝试了什么:
1) 当我启动app.py时,如果server.py停止,app.py不会重试。
2) 当app.py连接到server.py并且服务器关闭并迅速启动时,TcpClient重新连接,但我不能再连接到新实例并将字符串"send to tcp."发送到server.py的每个其他方法,只有旧实例可以做到这一点,而它已经没有连接了。
3) 如果我使用asyncio.async()替代run_until_complete(),我不能从每个其他协议调用方法。

我在此处放置了app.py和server.py,以便您可以复制并运行测试。

我使用ncat localhost 9000 -u -v发送字符串"send to tcp。"。需要将此字符串打印在UdpServer类中,并通过TcpClient类的send_data_to_tcp方法传递给此方法,该方法将字符串发送到server.py。<- 在tcpClient第一次重新连接后就不起作用了。

我正在使用Python 3.4.0。

非常感谢。

app.py:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    message = 'Testing'

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
        server_udp[1].tcp_client_connected()


    def data_received(self, data):
        self.data = format(data.decode())
        print('data received: {}'.format(data.decode()))
        if self.data == 'Testing':
            server_udp[1].send_data_to_udp(self.data)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        msg = 'Connection lost with the server...'
        info = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        print('stop', exc)

    def send_data_to_udp(self, data):
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        #client_tcp = loop.run_until_complete(coro)
        client_tcp = asyncio.async(coro)

    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = asyncio.get_event_loop().call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
#server_udp = asyncio.Task(coro)
server_udp = loop.run_until_complete(coro)


#TCP client
coro = loop.create_connection(TcpClient, 'localhost', 8000)
#client_tcp = asyncio.async(coro)
client_tcp = loop.run_until_complete(coro)

loop.run_forever()

server.py:

import asyncio

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
        self.transport.write(data)

        # close the socket
        #self.transport.close()

    #def connection_lost(self):
    #    print('server closed the connection')



loop = asyncio.get_event_loop()
coro = loop.create_server(EchoServer, 'localhost', 8000)
server = loop.run_until_complete(coro)
print(server)
print(dir(server))
print(dir(server.sockets))

print('serving on {}'.format(server.sockets[0].getsockname()))

try:
    loop.run_forever()
except KeyboardInterrupt:
    print("exit")
finally:
    server.close()
    loop.close()
1个回答

14

你只需要一些小修正。首先,我编写了一个协程来处理连接重试:

@asyncio.coroutine
def do_connect():
    global tcp_server  # Make sure we use the global tcp_server
    while True:
        try:
            tcp_server = yield from loop.create_connection(TcpClient, 
                                                           'localhost', 8000)
        except OSError:
            print("Server not up retrying in 5 seconds...")
            yield from asyncio.sleep(5)
        else:
            break

然后我们使用这个来启动所有内容:

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000)) 
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()

下一部分是在启动app.py后处理服务器宕机/重新连接。我们需要修复tcp_client_disconnected和connect_client_tcp以正确处理这个问题:
def connect_client_tcp(self):
    global client_tcp
    task = asyncio.async(do_connect())
    def cb(result):
        client_tcp = result
    task.add_done_callback(cb)

def tcp_client_disconnected(self, data, info):
    print(data)
    self.client_tcp_info = info
    self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

有趣的部分是connect_client_tcp。你原来的版本有两个问题:
  1. 你将client_tcp直接赋值给asyncio.async(coro)的结果,这意味着client_tcp被赋值为一个asyncio.Task。这不是你想要的; 你希望client_tcp被赋值为已完成的asyncio.Task的结果。我们通过使用task.add_done_callback来在完成后将client_tcp赋值为Task的结果。

  2. 你忘记了在方法顶部添加global client_tcp。没有那个,你只是创建了一个名为client_tcp的本地变量,在connect_client_tcp结束时就会被丢弃。

一旦解决了这些问题,我就能运行app.py,随时启动/停止serv.py,但当所有三个组件一起运行时,总是能看到从ncat成功传递所有消息到serv.py

下面是完整的app.py代码,方便复制粘贴:

import asyncio

#TCP client
class TcpClient(asyncio.Protocol):
    message = 'Testing'

    def connection_made(self, transport):
        self.transport = transport
        self.transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
        server_udp[1].tcp_client_connected()


    def data_received(self, data):
        self.data = format(data.decode())
        print('data received: {}'.format(data.decode()))
        if self.data == 'Testing':
            server_udp[1].send_data_to_udp(self.data)

    def send_data_to_tcp(self, data):
        self.transport.write(data.encode())

    def connection_lost(self, exc):
        msg = 'Connection lost with the server...'
        info = self.transport.get_extra_info('peername')
        server_udp[1].tcp_client_disconnected(msg, info)


#UDP Server
class UdpServer(asyncio.DatagramProtocol):

    CLIENT_TCP_TIMEOUT = 5.0

    def __init__(self):
        self.client_tcp_timeout = None

    def connection_made(self, transport):
        print('start', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        self.data = data.strip()
        self.data = self.data.decode()
        print('Data received:', self.data, addr)
        if self.data == 'send to tcp.':
            client_tcp[1].send_data_to_tcp(self.data)

    def connection_lost(self, exc):
        print('stop', exc)

    def send_data_to_udp(self, data):
        print('Receiving on UDPServer Class: ', (data))

    def connect_client_tcp(self):
        global client_tcp
        coro = loop.create_connection(TcpClient, 'localhost', 8000)
        task = asyncio.async(do_connect())
        def cb(result):
            client_tcp = result
        task.add_done_callback(cb)

    def tcp_client_disconnected(self, data, info):
        print(data)
        self.client_tcp_info = info
        self.client_tcp_timeout = loop.call_later(self.CLIENT_TCP_TIMEOUT, self.connect_client_tcp)

    def tcp_client_connected(self):
        if self.client_tcp_timeout:
            self.client_tcp_timeout.cancel()
            print('call_later cancel.')

@asyncio.coroutine
def do_connect():
    global client_tcp
    while True:
        try:
            client_tcp = yield from loop.create_connection(TcpClient, 'localhost', 8000)
        except OSError:
            print("Server not up retrying in 5 seconds...")
            yield from asyncio.sleep(1)
        else:
            break

loop = asyncio.get_event_loop()

#UDP Server
coro = loop.create_datagram_endpoint(UdpServer, local_addr=('localhost', 9000))
server_udp = loop.run_until_complete(coro)

#TCP client
loop.run_until_complete(do_connect())

loop.run_forever()

非常感谢,完美! :) - Beyonlo Sam
有没有使用新语法的方法来做到这一点?当我这样做时,会出现“生成器不可调用”的错误。 - LeanMan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接