如何在Python中测试服务器发送事件(Server Sent Event)API响应?

4
我目前正在使用以下代码订阅服务器端事件API:
import requests
import sseclient
import json
import pprint

def get_recent_change():
    url = 'https://stream.wikimedia.org/v2/stream/recentchange'
    response = requests.get(url, stream=True)
    client = sseclient.SSEClient(response)
    for event in client.events():
        pprint.pprint(json.loads(event.data))
    return

get_recent_change()

这个是可行的!输出以下数据流(只展示一个条目):

{'bot': True,
 'comment': 'fixing url',
 'id': 998191574,
 'length': {'new': 664, 'old': 662},
 'meta': {'domain': 'commons.wikimedia.org',
          'dt': '2017-11-07T07:24:14+00:00',
          'id': 'a8e00e00-c38c-11e7-b9f1-b083fecef9b0',
          'offset': 526240012,
          'partition': 0,
          'request_id': 'e9052543-9df8-4ff6-a94e-06629f9aef0e',
          'schema_uri': 'mediawiki/recentchange/1',
          'topic': 'eqiad.mediawiki.recentchange',
          'uri': 'https://commons.wikimedia.org/wiki/File:St_Cuthbert%27s_High_School_-_geograph.org.uk_-_71872.jpg'},
 'minor': True,
 'namespace': 6,
 'parsedcomment': 'fixing url',
 'patrolled': True,
 'revision': {'new': 266424348, 'old': 155939348},
 'server_name': 'commons.wikimedia.org',
 'server_script_path': '/w',
 'server_url': 'https://commons.wikimedia.org',
 'timestamp': 1510039454,
 'title': "File:St Cuthbert's High School - geograph.org.uk - 71872.jpg",
 'type': 'edit',
 'user': 'HiW-Bot',
 'wiki': 'commonswiki'}

我该如何为get_recent_change()编写测试?我尝试的所有方法都会卡住,因为流永远没有关闭...你有什么想法吗?
1个回答

0
我找到了一种解决方法。我知道这不是最好的方案,但它有效! 在这种情况下,我只下载了前50个有效载荷 以下是我的代码:
def get_recent_change(self) -> list:
    # se inicializa un socket
    r = requests.session()
    # se establece la conexión
    responds = r.get(self.__url, stream=True)
    try:
        # se capturan los eventos
        client = sseclient.SSEClient(responds)
        print('Connection established')
    except Exception as e:
        print(e)
    # limitador del stram
    cont = 0
    payloads = []
    try:
        print('Downloading info')
        for event in client.events():
            #el primer evento no sirve
            if cont==0:
                pass
            # captura muchos payloads en un solo evento, tocó separarlos
            for st in event.data.split('\n'):
                # Progress bar of the poor
                print('-', end='',flush=True)
                #se guardan en una lista
                payloads.append(json.loads(st))
            if cont >= 50:
                break
            cont +=1
    except Exception as e:
        print(e)
    return payloads

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接