理解Python的HTTP流式传输

8
我在使用Python和Requests访问一个流式API时遇到了困难。
API的说明如下:"我们已经启用了一个流端点来请求报价和交易数据,利用持久的HTTP套接字连接。从API中获得实时数据的方式是进行身份验证的HTTP请求并保持HTTP套接字处于打开状态以不断地接收数据。"
我一直在尝试访问数据的方法如下:
s = requests.Session()
def streaming(symbols):
    url = 'https://stream.tradeking.com/v1/market/quotes.json'
    payload = {'symbols': ','.join(symbols)}
    return s.get(url, params=payload, stream=True)  
r = streaming(['AAPL', 'GOOG'])

请求文档这里显示了两个有趣的事情:使用生成器/迭代器处理传入数据字段的分块数据。对于流式数据,建议使用如下代码:

for line in r.iter_lines():
    print(line)

似乎两种方法都不起作用,虽然我不知道要在生成器函数中放什么,因为示例不清楚。使用r.iter_lines(),我得到输出:"b'{"status":"connected"}{"status":disconnected"}'"。
我可以访问标头,响应是HTTP 200,但无法获得有效数据,也找不到如何在Python中访问流式HTTP数据的明确示例。任何帮助将不胜感激。该API建议使用Jetty for Java来保持流开放,但我不确定如何在Python中实现这一点。
标头:{'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}。

1
将您的URL放入我的浏览器中会返回一个“无效的授权标头”消息。您需要进行身份验证吗?或者,您是否正确地读取了JSON结果? - verbsintransit
1
如果OP看到200 OK,那显然他已经完成了身份验证。 - woozyking
API确实需要认证。为了简单起见,我省略了认证行。我可以发布密钥,但它们与我的交易账户相关联。对于非流请求,我使用Request的.json()。我可能也需要在这里这样做-不确定。 - Turtles Are Cute
1
一开始错过了200响应,我的错。我还想说一个好习惯是通过单独的函数从文件中读取API密钥。这样你就可以复制/粘贴代码片段而不用担心,如果你使用git,那么很容易将该文件包含在.gitignore中。 - verbsintransit
3个回答

12

正如 verbsintransit 所述,您需要解决身份验证问题,但是您的流媒体问题可以通过使用此示例进行修复:

s = requests.Session()

def streaming(symbols):
    payload = {'symbols': ','.join(symbols)}
    headers = {'connection': 'keep-alive', 'content-type': 'application/json', 'x-powered-by': 'Express', 'transfer-encoding': 'chunked'}
    req = requests.Request("GET",'https://stream.tradeking.com/v1/market/quotes.json',
                           headers=headers,
                           params=payload).prepare()

    resp = s.send(req, stream=True)

    for line in resp.iter_lines():
        if line:
            yield line


def read_stream():

    for line in streaming(['AAPL', 'GOOG']):
        print line


read_stream()

if line:条件检查line是否为实际消息或仅为连接保持活动状态。


1
非常感谢!我想我可能需要等到明天股市开盘才能完全测试它。在此期间,我会学习/尝试理解它,并将在明天发布结果。 - Turtles Are Cute
1
这个例子的关键在于 send() 中的 stream=True。如果你不设置它,Requests 将尝试下载整个主体。文档中确实展示了该关键字的使用。 - Lukasa
我得到的错误如下:ConnectionError: ('连接中止。', RemoteDisconnected('远程端关闭了连接而没有响应',)) - Kush Patel
我真的很想让这种方法起作用。我遇到了错误,不清楚如何进行调试:urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer')) - ziff

3

不确定您是否已经解决了这个问题,但TradeKing在其JSON块之间不会放置新行符号。因此,您必须使用iter_content逐字节获取它,将该字节附加到缓冲区,尝试解码缓冲区,在成功时清除缓冲区并生成结果对象。


1
很高兴那对你有用!我现在所做的是使用NodeJS来监听流并通过ZeroMQ接口向Python监听器输出对象,然后由它们进行操作。当处理大量符号时,Python会变得缓慢。 - krillr
krillr,能否请您发布一个指向与您的Node和ZeroMQ解决方案相关的有用信息的URL。非常感谢。 - ziff

1
import requests
from requests_oauthlib import OAuth1


def streaming(symbols):
    consumer_key     = '***'
    consumer_secret  = '***'
    access_token     = '***'
    access_secret    = '***'

    auth = OAuth1(consumer_key,
        client_secret = consumer_secret,
        resource_owner_key = access_token,
        resource_owner_secret = access_secret)
        
    payload = {'symbols': ','.join(symbols)}
    resp = requests.Session().request("GET",'https://stream.tradeking.com/v1/market/quotes.json',stream=True,auth=auth,params=payload)
    # resp.raise_for_status()
    
    for chunk in resp.iter_content(chunk_size=1):
        if chunk:
            yield chunk.decode('utf8')

#try this
for line in streaming(['AAPL', 'GOOG']):
    print(line)

2
欢迎来到StackOverflow。虽然这段代码可能解决了问题,但是包括解释它如何以及为什么解决了问题将有助于提高您的帖子质量,并可能导致更多的赞。请记住,您正在回答未来读者的问题,而不仅仅是现在提问的人。请[编辑]您的答案以添加解释并指出适用的限制和假设。 - fcdt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接