为什么 Python 客户端无法接收 SSE 事件?

6

我有一个Python客户端,监听使用Node.js API的SSE事件。

流程如下:通过call_notification.py向Node.js API发送事件,并使用run.sh循环运行seevents.py(见下文)。

然而,我没有看到Python客户端接收到这个SSE事件。请问为什么会这样?

call_notification.py

import requests
input_json = {'BATS':'678910','root_version':'12A12'}
url = 'http://company.com/api/root_event_notification?params=%s'%input_json
response = requests.get(url)
print response.text

node.js API

app.get("/api/root_event_notification", (req, res, next) => {
  console.log(req.query.params)
  var events = require('events');
  var eventEmitter = new events.EventEmitter();

  //Create an event handler:
  var myEventHandler = function () {
    console.log('new_root_announced!');
    res.status(200).json({
          message: "New root build released!",
          posts: req.query.params
        });
  }

seevents.py(监听SSE事件的Python客户端)

import json
import pprint
import sseclient

def with_urllib3(url):
    """Get a streaming response for the given event feed using urllib3."""
    import urllib3
    http = urllib3.PoolManager()
    return http.request('GET', url, preload_content=False)

def with_requests(url):
    """Get a streaming response for the given event feed using requests."""
    import requests
    return requests.get(url, stream=True)

url = 'http://company.com/api/root_event_notification'
response = with_urllib3(url)  # or with_requests(url)
client = sseclient.SSEClient(response)
#print client.events()
for event in client.events():
    print "inside"
    pprint.pprint(json.loads(event.data))

run.sh

#!/bin/sh

while [ /usr/bin/true ]
do
    echo "Running sseevents.py"
    python sseevents.py  2>&1 | tee -a sseevents.log.txt

    echo "sleeping for 30 sec"
    sleep 30
done

输出:

Run call_notification.py on Terminal

node.js API OUTPUT

new_root_announced!
{'root_version': 'ABCD', 'BATS': '143'}

./run.sh --> DON'T SEE ABOVE EVENT below
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec
1个回答

7

你问题的非常简短的答案:

服务器代码没有向客户端发送SSE消息。

为什么?因为您需要遵循SSE格式。

根据JASON BUTZ在使用Node进行服务器发送事件

你应该发送一个 Connection: keep-alive 头部,以确保客户端也保持连接开放。应该发送一个带有值为no-cacheCache-Control头部,以防止数据被缓存。最后,需要将 Content-Type设置为 text/event-stream

完成所有这些操作后,应向客户端发送一个换行符(\n),然后可以发送事件。事件必须作为字符串发送,但是在该字符串中的内容并不重要。JSON字符串非常好。

事件数据必须按照格式 "data: <DATA TO SEND HERE>\n" 发送。

重要的是要注意,在每行的末尾都应该有一个换行符。为了表示事件的结束,还需要添加一个额外的换行符。

多个数据行也是完全可以的。

对你问题的详细回答:

根据 Eric Bidelman在html5rocks.com上的文章

当使用SSE进行通信时,服务器可以在没有进行初始请求的情况下随时向您的应用程序推送数据。换句话说,更新可以在发生时从服务器流式传输到客户端。
但是,为了实现这一点,客户端必须通过调用SSE API端点(在您的情况下,调用Node.js API代码)来“开始”,并准备接收消息的流。
“开始”是通过调用SSE API端点完成的(在您的情况下,调用Node.js API代码)。
准备工作是准备处理异步消息流。
SSEs在服务器和客户端之间打开一个单向通道。
这意味着服务器与客户端之间有一个“直接”的通道。它不是由其他不是“客户端”代码的某个进程/代码“启动”(打开)的。
根据OP的评论,预期行为(详细版)。
  1. 客户端Alice使用参数{name: "Alice"}调用API端点,没有发生任何可见的事情。

  2. ...然后客户端Bob使用参数{name: "Bob"}调用API端点,客户端Alice会收到有效载荷为{name: "Bob", says: "Hi"}的SSE。

  3. ...然后客户端Carol使用参数{name: "Carol"}调用API端点,客户端Alice和Bob各自会接收到有效载荷为{name: "Carol", says: "Hi"}的SSE。

  4. ...以此类推。每次新的客户端使用参数调用API端点时,每个其他已打开通道的客户端都将接收到新的“Hi”有效载荷的SSE。

  5. ...然后客户端Bob从服务器“断开连接”,客户端Alice、客户端Carol和所有具有“打开”通道的客户端都将接收到有效载荷为{name: "Bob", says: "Bye"}的SSE。

  6. ...以此类推。每当旧客户端从服务器“断开连接”时,每个其他已打开通道的客户端都将接收到新的“Bye”有效载荷的SSE。

抽象行为

  • 每当一个新客户端请求“打开”一个通道并发送一些参数或旧客户端从服务器“断开”连接时,它们会在服务器上引发一个事件。
  • 每次这样的事件在服务器上发生时,服务器都会向所有“打开”通道发送一个带有参数和消息作为有效负载的SSE消息。

关于阻塞的说明 每个具有“打开”通道的客户端都将被“卡住”在一个无限等待事件发生的循环中。客户端设计需要使用“线程”代码技术来避免阻塞。

代码

你的Python客户端应该“请求”开始单向通道并保持等待,直到通道关闭。不应该结束并重新开始使用不同的通道。它应该保持相同的通道开放。

从网络角度来看,它将像一个“长”响应,不会结束(直到SSE消息结束)。响应只是“不断地到来”。

你的Python客户端代码就是这样做的。我注意到它是从sseclient-py library使用的完全相同的示例代码。

Python 3.4客户端代码

要包含要发送到服务器的参数,请使用Requests库中的一些代码docs/#passing-parameters-in-urls

因此,将这些示例混合在一起,我们最终得到以下代码作为您的Python 3.4客户端:

import json
import pprint
import requests
import sseclient # sseclient-py

# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}

url = 'http://company.com/api/root_event_notification'
stream_response = requests.get(url, params=input_json, stream=True)

client = sseclient.SSEClient(stream_response)

# Loop forever (while connection "open")
for event in client.events():
    print ("got a new event from server")
    pprint.pprint(event.data)

Python 2.7的客户端代码

如果想要将参数发送到服务器,请使用urllib.urlencode()库将它们编码为查询参数并附加在URL中。

使用urllib3.PoolManager().request()进行HTTP请求,这样您将得到一个流响应。

请注意,sseclient库将事件数据作为Unicode字符串返回。要将JSON对象转换回Python对象(带有Python字符串),请使用byteify,这是一个递归自定义函数(感谢Mark Amery)。

请使用以下代码作为Python 2.7客户端:

import json
import pprint
import urllib
import urllib3
import sseclient # sseclient-py

# Function that returns byte strings instead of unicode strings
# Thanks to:
# [Mark Amery](https://stackoverflow.com/users/1709587/mark-amery)
def byteify(input):
    if isinstance(input, dict):
        return {byteify(key): byteify(value)
                for key, value in input.iteritems()}
    elif isinstance(input, list):
        return [byteify(element) for element in input]
    elif isinstance(input, unicode):
        return input.encode('utf-8')
    else:
        return input

# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}

base_url = 'http://localhost:3000/api/root_event_notification'
url = base_url + '?' + urllib.urlencode(input_json)

http = urllib3.PoolManager()
stream_response = http.request('GET', url, preload_content=False)

client = sseclient.SSEClient(stream_response)

# Loop forever (while connection "open")
for event in client.events():
    print ("got a new event from server")
    pprint.pprint(byteify(json.loads(event.data)))

现在,服务器代码应该:
  1. 发出内部服务器的“hello”事件,以便其他客户端监听该事件
  2. “打开”通道
  3. 注册以监听所有可能发生的内部服务器事件(这意味着保持通道“打开”,在消息之间不发送任何内容,只是保持通道“打开”)。
    • 这包括在客户端/网络关闭通道时发出内部服务器的“goodbye”事件,以便其他客户端监听该事件,并最终“结束”。
请使用以下Node.js API代码:
var EventEmitter = require('events').EventEmitter;
var myEmitter = new EventEmitter;


function registerEventHandlers(req, res) {
  // Save received parameters
  const myParams = req.query;

  // Define function that adds "Hi" and send a SSE formated message
  const sayHi = function(params) {
      params['says'] = "Hi";
      let payloadString = JSON.stringify(params);
      res.write(`data: ${payloadString}\n\n`);
  }

  // Define function that adds "Bye" and send a SSE formated message
  const sayBye = function(params) {
      params['says'] = "Bye";
      let payloadString = JSON.stringify(params);
      res.write(`data: ${payloadString}\n\n`);
  }

  // Register what to do when inside-server 'hello' event happens
  myEmitter.on('hello', sayHi);

  // Register what to do when inside-server 'goodbye' event happens
  myEmitter.on('goodbye', sayBye);

  // Register what to do when this channel closes
  req.on('close', () => {
      // Emit a server 'goodbye' event with "saved" params
      myEmitter.emit('goodbye', myParams);

      // Unregister this particular client listener functions
      myEmitter.off('hello', sayHi);
      myEmitter.off('goodbye', sayBye);
      console.log("<- close ", req.query);
  });
}


app.get("/api/root_event_notification", (req, res, next) => {
    console.log("open -> ", req.query);

    // Emit a inside-server 'hello' event with the received params
    myEmitter.emit('hello', req.query);

    // SSE Setup
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    });
    res.write('\n');

    // Register what to do when possible inside-server events happen
    registerEventHandlers(req, res);

    // Code execution ends here but channel stays open
    // Event handlers will use the open channel when inside-server events happen
})

以下是引用自html5rocks.com的内容:

从源发送事件流只需要构建一个明文响应,使用text/event-stream Content-Type提供,并遵循SSE格式。在其基本形式中,响应应包含"data:"行,后跟您的消息,后跟两个"\n"字符以结束流。

在客户端代码中,sseclient-py库负责解释SSE格式,因此每当两个"\n"字符到达时,该库就会“迭代”一个新的“可迭代”对象(一个新事件),该对象具有从服务器发送的消息的data属性。

这是我测试代码的方法

  1. 使用Node.js API代码启动服务器
  2. 运行一个客户端,只取消注释"Alice"这一行(此时在客户端控制台上看不到任何内容)。
  3. 运行第二个客户端,只取消注释"Bob"这一行。第一个客户端"Alice"的控制台显示:Bob说“嗨”(此时在Bob的客户端控制台上看不到任何内容)。
  4. 运行第三个客户端,只取消注释"Carol"这一行。Alice和Bob的控制台都显示:Carol说“嗨”(此时在Carol的客户端控制台上看不到任何内容)。
  5. 停止/关闭Bob的客户端。Alice和Carol的控制台都显示:Bob说“再见”

所以,代码运行正常 :)


是的。当调用requests.get(url)时,您的变量url格式不正确。我看到了Node.js API输出,但不知道它是一个对象、字符串还是其他什么东西。 - elingerojo
在运行你分享的Python客户端代码后,我不断地收到一连串的事件 payloadString {"root_version":"12A12","BATS":"678911"} payloadString {"root_version":"12A12","BATS":"678911"}。我们应该只收到一个事件,对吗?我们怎样才能停止这个事件轰炸呢? - carte blanche
在我的代码下面注明:这将使用 setInterval() 模拟服务器每秒发生的事件。该代码旨在作为解释说明,而非应用程序的解决方案。回答你的问题_我们只能收到一次事件,对吗?不是。在示例代码中,事件每1秒钟发生一次,因此您应该每秒钟连续收到它们。所以,它正常工作!如果您想要不同的行为,则必须使用其他代码。SSE旨在由客户端请求,但由服务器作为“主人”(客户端作为“木偶”)控制和定时。 - elingerojo
假设您期望的行为,我编辑了我的答案,包括经过测试和工作的代码,它不会每秒模拟事件,而是展示了Python客户端和Node.js API之间SSE的基础知识。 - elingerojo
我的怀疑是正确的。刚刚用 Python 2.7sseclient-py 进行了测试...和你得到的一模一样的错误和 __traceback__。我刚刚编辑了我的答案,包括适用于 Python 2.7 的客户端代码,可以正常工作。 - elingerojo
显示剩余8条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接