Python - 如何同时等待队列和套接字的数据?

10
我希望能够读取来自Queue.Queue或TCP套接字的消息,无论哪个先到达。 如何在不使用2个线程的情况下实现? 平台是Windows上的CPython 2.7.5。

你不能轻易地做到这一点。像select(2)这样的东西适用于两个阻塞套接字,但不适用于队列。如果可以接受的话,最接近的方法是在循环中使用非阻塞方法。 - Aya
3个回答

15

这里有一个非常好的技巧可以解决你的问题,点击这里查看。

import queue
import socket
import os

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

1
以上链接已失效 - 这是一个新链接:https://books.google.co.uk/books?id=f7wGeA71_eUC&pg=PA532&lpg=PA532&dq=class+PollableQueue(queue.Queue):&source=bl&ots=Xjj50am-Nm&sig=ACfU3U2Haln-bB8QBoKdawco07pm0pIN-w&hl=en&sa=X&ved=2ahUKEwjqvfuw36rmAhXIQkEAHX_GDjAQ6AEwAHoECAoQAQ#v=onepage&q=class%20PollableQueue(queue.Queue)%3A&f=false 解决方案的要点是向队列和套接字对写入单个字节。套接字对的另一端则表示数据已准备好读取,因此您需要同时读取套接字和队列(套接字提供了一个信号量来告诉您读取队列)。 - Ralph Bolton
看起来不错,但在Linux上会出现问题,在没有gets的情况下进行了不超过几百次的put调用。https://stackoverflow.com/questions/72011758/early-hang-up-on-socket-send - Mikhail M

1
要在单个线程中完成此操作,您需要使用非阻塞方法,并将它们合并到单个事件循环中。实际上,我在这里使用的是 select 而不是非阻塞套接字 I/O,因为如果您需要从多个套接字读取数据,它会更加简洁...
import socket
import select
import Queue
import time

TIMEOUT = 0.1   # 100ms


def process_queue_item(item):
    print 'Got queue item: %r' % item


def process_socket_data(data):
    print 'Got socket data: %r' % data


def main():

    # Build queue
    queue = Queue.Queue()
    for i in range(10):
        queue.put(i)
    queue.put(None)   # Using None to indicate no more data on queue
    queue_active = True

    # Build socket
    sock = socket.socket()
    sock.connect(('www.google.com', 80))
    sock.send('GET / HTTP/1.0\r\n\r\n')
    socket_active = True

    # Main event loop
    while 1:

        # If there's nothing to read, bail out
        if not (socket_active or queue_active):
            break

        # By default, sleep at the end of the loop
        do_sleep = True

        # Get data from socket without blocking if possible
        if socket_active:
            r, w, x = select.select([sock], [], [], TIMEOUT)
            if r:
                data = sock.recv(64)
                if not data:    # Hit EOF
                    socket_active = False
                else:
                    do_sleep = False
                    process_socket_data(data)

        # Get item from queue without blocking if possible
        if queue_active:
            try:
                item = queue.get_nowait()
                if item is None:  # Hit end of queue
                    queue_active = False
                else:
                    do_sleep = False
                    process_queue_item(item)
            except Queue.Empty:
                pass

        # If we didn't get anything on this loop, sleep for a bit so we
        # don't max out CPU time
        if do_sleep:
            time.sleep(TIMEOUT)


if __name__ == '__main__':
    main()

输出看起来像...
Got socket data: 'HTTP/1.0 302 Found\r\nLocation: http://www.google.co.uk/\r\nCache-Co'
Got queue item: 0
Got socket data: 'ntrol: private\r\nContent-Type: text/html; charset=UTF-8\r\nSet-Cook'
Got queue item: 1
Got socket data: 'ie: PREF=ID=a192ab09b4c13176:FF=0:TM=1373055330:LM=1373055330:S='
Got queue item: 2
etc.

2
每秒迭代10次左右,并轮询两个...不是很高效,但我想没有其他办法。使用2个线程并将数据推送到单个队列中怎么样?这会是更好的方法吗? - GabiMe
@GabiMe 你可以根据自己的喜好调整 TIMEOUT 值。如果你准备使用多个线程,有很多选择。如果每个线程都能处理自己接收到的数据,那么我会将每个阻塞调用放在它自己的线程中。 - Aya

0
你可以按照以下方式进行操作:
def check_for_message(queue,socket,sock_accept_size=512):
    socket.setblocking(0)
    while True:
        try:
            sock_msg=socket.recv(sock_accept_size)
        except socket.error:
            """Do stuff if there is no message"""
            sock_msg=None
        try:
            que_msg=queue.get()
        except Queue.Empty:
            """Do stuff if there is no message"""
            que_msg=None
        yield (que_msg,sock_msg)

然后您可以使用以下方式进行迭代:

for que_message,sock_message in check_for_message(que_instance,socket_instance):
    print que_message,sock_message

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接