使用asyncio编写服务器和客户端以处理从控制台输入的内容。

11

我有一个基于asyncio的TCP服务器,它接收客户端消息,对其进行处理并返回文本。服务器在接收和发送数据方面工作正常。问题在于客户端无法收到服务器的消息:从控制台读取输入是阻塞调用,会阻塞事件循环(实际上,data_received方法永远不会被执行)。只有退出命令可以正常工作(它关闭了循环)。如何解决这个问题?以下是服务器和客户端代码。它基本上是asyncio版本的EchoClient,并附带了一些用于练习的管道代码。

# client.py
import abc
import asyncio
import sys

# Menu text printed once by show_menu() before it starts prompting.
MENU = '''
a) do x
b) do y
c) exit
'''

# Module-level event loop shared by ExitCommand.run() and main().
loop_ = asyncio.get_event_loop()


class XCommand:
    """Menu command that asks the server to run its ``X`` operation."""

    def __init__(self, client):
        """Remember the client used to reach the server.

        Args:
            client: object exposing ``send_data_to_tcp(str)``.
        """
        # show_menu() instantiates every command as ``cmd_cls(client)``;
        # without this constructor that call raises TypeError and
        # ``self.client`` is never set.
        self.client = client

    def run(self):
        """Send the ``X:`` request; the client encodes it to bytes."""
        self.client.send_data_to_tcp('X:')  # to bytes


class YCommand(Command):
    """Menu command that forwards a user-supplied argument as ``Y``."""

    def run(self):
        """Prompt for the Y payload and send it prefixed with ``Y:``."""
        # input() blocks the calling thread until the user presses Enter.
        payload = input('Input for Y ###  ')
        request = 'Y:' + payload
        self.client.send_data_to_tcp(request)


class ExitCommand(Command):
    """Menu command that notifies the server and terminates the client."""

    def run(self):
        """Send ``EXIT:`` to the server, close the event loop and exit.

        Raises:
            SystemExit: raised to end the process once the loop is closed.
        """
        self.client.send_data_to_tcp('EXIT:')
        print('Goodbye!')
        loop_.close()
        # Use sys.exit() rather than the exit() helper: the latter is
        # injected by the ``site`` module for interactive use and is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit()


class CommandFactory:
    """Maps the letter typed at the prompt to the matching command class."""

    # Keys follow the letters listed in MENU. The values must be the command
    # classes defined in this client module (XCommand / YCommand); the
    # previous ACommand / BCommand names do not exist here and raised
    # NameError as soon as the class body was executed.
    _cmds = {
        'a': XCommand,
        'b': YCommand,
        'c': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class registered for ``cmd``.

        Args:
            cmd: the raw string typed by the user.

        Returns:
            The command class, or ``None`` when ``cmd`` is not registered.
        """
        return cls._cmds.get(cmd)


def show_menu(client):
    """Print the menu, then read and execute commands forever.

    Args:
        client: passed to each command class when it is instantiated.

    The loop has no exit condition of its own; it only ends when a command
    raises (for example the exit command terminating the process).
    """
    print(MENU)
    while True:
        choice = input('Insert Command$: ')
        handler_cls = CommandFactory.get_cmd(choice)
        if handler_cls:
            handler_cls(client).run()
        else:
            # Unregistered input: report it and prompt again.
            print('Unknown: {}'.format(choice))


class Client(asyncio.Protocol):
    """asyncio protocol that prints server replies and sends text requests."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Keep the transport so send_data_to_tcp() can write to it."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print them (repr form), flushed."""
        text = data.decode()
        print('Data received from server: \n{!r}'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` (a str) and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the disconnection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()


def main():
    """Connect to the server and run the console menu next to the loop.

    The menu is executed in the loop's default executor so that its blocking
    ``input()`` calls do not prevent the event loop from delivering
    ``data_received`` callbacks.
    """
    client = Client(loop_)
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)

    # Pass the callable and its argument separately. Writing
    # ``show_menu(client)`` here ran the endless menu immediately on the
    # loop thread, so run_forever() below was never reached.
    menu_future = loop_.run_in_executor(None, show_menu, client)
    # When the menu thread ends (normally or with an error) there is nothing
    # left to drive the client, so stop the loop instead of hanging.
    menu_future.add_done_callback(lambda _fut: loop_.stop())
    # NOTE(review): commands now call transport.write() from the worker
    # thread; asyncio transports are not documented as thread-safe, so
    # consider routing writes through loop_.call_soon_threadsafe().

    try:
        loop_.run_forever()
    finally:
        loop_.close()

# Start the client only when this file is executed as a script.
if __name__ == '__main__':
    main()


# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB


class ACommand:
    """Server-side handler for ``X``: dumps what ``db.a()`` returns."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Format the mapping returned by ``db.a()`` as ``key: value`` lines.

        Args:
            db: object exposing ``a()``; the result must support ``items()``.
            param1: unused.
            param2: unused.

        Returns:
            One ``key: value`` line per item joined with newlines, or a
            fixed placeholder string when the result is falsy.
        """
        record = db.a()
        if record:
            return '\n'.join('{}: {}'.format(*pair) for pair in record.items())
        return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'


class BCommand:
    """Server-side handler for ``Y``: forwards both parameters to ``db.b``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.b(param1, param2)`` and return a fixed acknowledgement.

        Args:
            db: object exposing ``b(param1, param2)``.
            param1: first value parsed from the request, or ``None``.
            param2: second value parsed from the request, or ``None``.

        Returns:
            The acknowledgement string sent back to the client.
        """
        acknowledgement = 'B Ok!'
        db.b(param1, param2)
        return acknowledgement


class ExitCommand:
    """Server-side handler for ``EXIT``: shuts the server process down."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Print a farewell message and exit with status 0.

        Args:
            db: unused.
            param1: unused.
            param2: unused.

        Raises:
            SystemExit: always, with code 0.
        """
        # This runs inside a protocol callback, i.e. while the event loop is
        # running. Closing the loop here raises RuntimeError, and
        # run_until_complete() cannot be called on a running loop either.
        # SystemExit is re-raised by asyncio out of loop.run_forever(), and
        # the script's ``finally`` block then closes the server and the loop.
        print('Buona giornata!!!')
        sys.exit(0)

class CommandFactory:
    """Parses a ``NAME[:param1[:param2]]`` request into a command class."""

    _cmds = {
        'X': ACommand,
        'Y': BCommand,
        'EXIT': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` and look up the command name.

        Args:
            cmd: decoded request text.

        Returns:
            ``(command_class_or_None, param1, param2)``. Both parameters are
            filled only when the request has exactly three fields; any other
            multi-field request yields ``(second_field, None)``.
        """
        name, *args = cmd.split(':')
        if len(args) == 2:
            param1, param2 = args
        elif args:
            param1, param2 = args[0], None
        else:
            param1 = param2 = None
        return cls._cmds.get(name), param1, param2


class Server(asyncio.Protocol):
    """Protocol that executes one command per received chunk and replies."""

    # Class attributes: a single DB object is shared by every connection.
    db_filename = '../data/db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send the result.

        Args:
            data: raw bytes received from the client.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() returns None for names it does not know; calling
            # ``None.run`` raised AttributeError instead of answering.
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    # Script entry point. `loop` and `server` are module-level names here;
    # ExitCommand.run() refers to both of them as globals.
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; fall through to the cleanup.
        pass
    finally:
        # Close the server
        server.close()
        # Let the loop run until the server reports it is closed.
        loop.run_until_complete(server.wait_closed())
        loop.close()

更新:解决方案是使用aioconsole包和ainput函数。以下代码使用aioconsole(效果非常好)。

# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB


class Command(metaclass=abc.ABCMeta):
    """Abstract base class for the server-side command handlers."""

    # abc.abstractclassmethod is deprecated since Python 3.3; stacking
    # classmethod on top of abstractmethod is the supported spelling.
    @classmethod
    @abc.abstractmethod
    def run(cls, a, b, c):
        """Execute the command; concrete subclasses must override this.

        Args:
            a: first argument; the subclasses in this module take the
                database object here.
            b: second argument (``param1`` in the subclasses).
            c: third argument (``param2`` in the subclasses).

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()


class XCommand(Command):
    """Handler for ``X``: renders the mapping returned by ``db.x()``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return ``db.x()`` as newline-separated ``key: value`` lines.

        Args:
            db: object exposing ``x()``; the result must support ``items()``.
            param1: unused.
            param2: unused.

        Returns:
            The formatted lines, or a fixed placeholder when the result is
            falsy.
        """
        record = db.x()
        if record:
            return '\n'.join('{}: {}'.format(*pair) for pair in record.items())
        return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'


class YCommand(Command):
    """Handler for ``Y``: passes the first parameter to ``db.y``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.y(param1)`` and return a confirmation echoing it.

        Args:
            db: object exposing ``y(param)``.
            param1: value forwarded to the database and echoed in the reply.
            param2: unused.

        Returns:
            The confirmation text containing ``param1``.
        """
        db.y(param1)
        confirmation = 'Operation Y OK: {}'.format(param1)
        return confirmation


class QuitCommand(Command):
    """Handler for ``DISCONNECT``: only acknowledges the request."""

    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        """Return the acknowledgement text; no argument is used.

        Args:
            rubrica_db: unused.
            param1: unused.
            param2: unused.

        Returns:
            The fixed acknowledgement string.
        """
        farewell = 'Disconnected...'
        return farewell

class CommandFactory:
    """Parses a ``NAME[:first[:second]]`` request into a command class."""

    _cmds = {
        'X': XCommand,
        'Y': YCommand,
        'DISCONNECT': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split ``cmd`` on ``:`` and look up the command name.

        Args:
            cmd: decoded request text.

        Returns:
            ``(command_class_or_None, first, second)``. ``second`` is set
            only when the request has exactly three fields; ``first`` is set
            whenever there is more than one field.
        """
        fields = cmd.split(':')
        first = fields[1] if len(fields) > 1 else None
        second = fields[2] if len(fields) == 3 else None
        return cls._cmds.get(fields[0]), first, second

class Server(asyncio.Protocol):
    """Protocol that executes one command per received chunk and replies."""

    # Class attributes: a single DB object is shared by every connection.
    db_filename = '../data/exercise.db'
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode the request, run the matching command and send the result.

        Args:
            data: raw bytes received from the client.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() returns None for names it does not know; calling
            # ``None.run`` raised AttributeError instead of answering.
            res = 'Unknown command: {!r}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    # Script entry point: start the TCP server and serve until interrupted.
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance.
    # The protocol class defined in this module is `Server`; the previous
    # `RubricaServer` name is not defined anywhere here (NameError).
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; continue with the cleanup.
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

#dummydb.py
class DummyDB:
    """In-memory stand-in for the database that returns canned data."""

    def __init__(self, fn):
        """Store ``fn``; it is kept but never read by this class."""
        self.fn = fn

    def x(self):
        """Return a fixed three-field record."""
        record = dict(
            field_a='55 tt TTYY 3334 gghyyujh',
            field_b='FF hhhnneeekk',
            field_c='00993342489048222 news',
        )
        return record

    def y(self, param):
        """Return ``param`` unchanged."""
        return param

# client.py
import abc
from asyncio import *
from aioconsole import ainput

# Menu text printed by menu() before the prompt loop starts.
MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''

# Module-level event loop used by the script entry point below to connect
# the client and to run main().
loop_ = get_event_loop()


class Command(abc.ABC):
    """Abstract base for the client-side menu commands.

    Subclasses set ``asyn`` to True when their ``run`` is a coroutine
    function, so the caller knows it has to await the result.
    """

    asyn = False

    def __init__(self, tcp_client):
        """Keep the client object the command will send data through."""
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        """Execute the command; must be overridden by subclasses."""
        raise NotImplementedError()


class ACommand(Command):
    """Menu entry A: asks the server to execute its ``X`` command."""

    def run(self):
        """Send the ``X:`` request through the client."""
        request = 'X:'
        self.client.send_data_to_tcp(request)


class BCommand(Command):
    """Menu entry B: reads extra input and sends it as a ``Y`` command."""

    # run() is a coroutine function, so callers must await it.
    asyn = True

    async def run(self):
        """Await one line of user input and send it prefixed with ``Y:``."""
        extra = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        request = 'Y:' + extra
        self.client.send_data_to_tcp(request)


class QuitCommand(Command):
    """Menu entry C: tells the server goodbye and terminates the client."""

    def run(self):
        """Send ``DISCONNECT:``, stop the client's loop and exit.

        Raises:
            SystemExit: always, after the client has been asked to
                disconnect.
        """
        self.client.send_data_to_tcp('DISCONNECT:')
        print('Goodbye!!!')
        self.client.disconnect()
        # Raise SystemExit directly instead of calling exit(): that helper
        # is installed by the ``site`` module for interactive sessions and
        # is not guaranteed to exist (e.g. under ``python -S``).
        raise SystemExit


class CommandFactory:
    """Maps the letter typed at the prompt to the matching command class."""

    _cmds = {
        'A': ACommand,
        'B': BCommand,
        'C': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the class registered for ``cmd`` (whitespace-stripped).

        Returns ``None`` when the stripped text is not a registered key.
        """
        return cls._cmds.get(cmd.strip())


class Client(Protocol):
    """asyncio protocol that prints server replies and sends text requests."""

    def __init__(self, loop):
        """Store the event loop; the transport is set once connected."""
        self.transport = None
        self.loop = loop

    def disconnect(self):
        """Stop the event loop."""
        self.loop.stop()

    def connection_made(self, transport):
        """Keep the transport so send_data_to_tcp() can write to it."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print them between separators."""
        text = data.decode()
        banner = 'Data received from server: \n===========\n{}\n===========\n'
        print(banner.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode ``data`` (a str) and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the disconnection and stop the event loop."""
        for notice in ('The server closed the connection',
                       'Stop the event loop'):
            print(notice)
        self.loop.stop()


def menu():
    """Print the command menu to standard output."""
    print(MENU)


async def main():
    """Show the menu, then read and execute commands forever.

    Input is awaited through ``ainput``. Commands are built with the
    module-level ``client``; those flagged ``asyn`` are awaited, the others
    are called directly.
    """
    menu()
    while True:
        choice = await ainput('Insert Command >')
        cmd_cls = CommandFactory.get_cmd(choice)
        if not cmd_cls:
            print('Unknown: {}'.format(choice))
            continue
        needs_await = cmd_cls.asyn
        outcome = cmd_cls(client).run()
        if needs_await:
            # run() returned a coroutine object; drive it to completion.
            await outcome


if __name__ == '__main__':
    # Script entry point: connect first, then run the interactive menu.
    # `client` is a module-level name here; main() reads it as a global.
    client = Client(loop_)
    # The factory returns the instance built above, so the script keeps a
    # handle on the protocol object.
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(coro)
    # Run the prompt loop on the same event loop as the connection.
    loop_.run_until_complete(main())
2个回答

24

您可以考虑使用aioconsole.ainput

from aioconsole import ainput

async def some_coroutine():
    # Await one line of console input using aioconsole's ainput, with
    # ">>> " as the prompt. Presumably other tasks keep running while this
    # waits - confirm against the aioconsole documentation.
    line = await ainput(">>> ")
    # Placeholder for the rest of the coroutine body.
    [...]

这个项目可以在PyPI上找到。


我已经尝试过了,它非常好用。谢谢!我将发布使用aioconsole的正确代码。 - BangTheBank
@ykaner 可以随时在 问题追踪器 上报告 :) - Vincent

5
另一种方法是直接使用run_in_executor,把阻塞的input调用放到线程池中执行。
因此,可以这样做:
from functools import partial
from concurrent.futures.thread import ThreadPoolExecutor

async def f():
    """Read console lines forever without blocking the event loop.

    Each blocking ``input()`` call is handed to a dedicated single-worker
    thread pool and awaited; the line that was read is discarded.
    """
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(1)
    while True:
        await loop.run_in_executor(executor, input)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接