实施WSGI流服务:(如何检测客户端断开连接)

5

我正在编写一个WSGI流服务,利用包含在迭代器中的队列来实现多播推送。以下是该服务的简化模型:

# this is managed by another thread
def processor_runner():
    generator = SerialMessageGenerator()
    for message in generator:
      for client in Processor.connections:
          client.put(message)

# this is managed by twisted's wsgi implementation
def main(environ, start_response):
    queue = Queue()
    Processor.connections.append(queue)
    status = '200 OK'
    response_headers = [
        ('Content-Type', 'application/json'),
        ('Transfer-Encoding', 'chunked')
    ]
    start_response(status, response_headers)
    return iter(queue.get, None)

这在使用Twisted作为WSGI服务器非常成功(顺带一提,序列生成器是一个通过进程间队列与处理器相连的独立进程)。我的问题是我如何检测客户端何时断开连接,从而将其从队列中删除?我的想法是将队列添加为包含客户端套接字的元组,即(socket, queue),然后在执行put之前检查套接字是否仍然连接。然而,我不知道确切需要从“environ”获取什么信息。在我开始凑巧解决问题之前,有没有人有相关经验可以分享一下?

更新

这是我最终采用的解决方案:

class IterableQueue(Queue):

def __init__(self):
    Queue.__init__(self) # Queue is an old style class
    ShellProcessor.connections.append(self)

def __iter__(self):
    return iter(self.get, None)

def close(self):
    self.put(None)
    self.task_done()
    ShellProcessor.connections.remove(self)

如果您有相关经验,也可以随意评论WSGI流服务的架构或性能。 - Bashwork
2个回答

1

当请求完成或中断时,twisted会在迭代器上调用.close()。您可以执行以下操作:

# ...
start_response(status, response_headers)
return ResponseIterator(iter(queue.get, None),
     on_finish=lambda: Processor.connections.remove(queue))

其中ResponseIterator可以是:

class ResponseIterator:

  def __init__(self, iterator, on_finish=None):
      self.iterator = iterator
      self.on_finish = on_finish

  def __iter__(self):
      return self

  def next(self):
      return next(self.iterator)

  def close(self):
      if self.on_finish is not None:
         self.on_finish()

很好,我不知道它在迭代器上被称为close。这太完美了! - Bashwork

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接