你问题的非常简短的答案:
服务器代码没有向客户端发送SSE消息。
为什么?因为您需要遵循SSE格式。
根据JASON BUTZ在使用Node进行服务器发送事件。
你应该发送一个
Connection
:
keep-alive
头部,以确保客户端也保持连接开放。应该发送一个带有值为
no-cache
的
Cache-Control
头部,以防止数据被缓存。最后,需要将
Content-Type
设置为
text/event-stream
。
完成所有这些操作后,应向客户端发送一个换行符(\n),然后可以发送事件。事件必须作为字符串发送,但是在该字符串中的内容并不重要。JSON字符串非常好。
事件数据必须按照格式 "data: <DATA TO SEND HERE>\n"
发送。
重要的是要注意,在每行的末尾都应该有一个换行符。为了表示事件的结束,还需要添加一个额外的换行符。
多个数据行也是完全可以的。
对你问题的详细回答:
根据 Eric Bidelman在html5rocks.com上的文章:
当使用SSE进行通信时,服务器可以在没有进行初始请求的情况下随时向您的应用程序推送数据。换句话说,更新可以在发生时从服务器流式传输到客户端。
但是,为了实现这一点,客户端必须通过调用SSE API端点(在您的情况下,调用Node.js API代码)来“开始”,并准备接收消息的流。
“开始”是通过调用SSE API端点完成的(在您的情况下,调用Node.js API代码)。
准备工作是准备处理异步消息流。
SSEs在服务器和客户端之间打开一个单向通道。
这意味着服务器与客户端之间有一个“直接”的通道。它不是由其他不是“客户端”代码的某个进程/代码“启动”(打开)的。
根据OP的评论,预期行为(详细版)。
客户端Alice使用参数{name: "Alice"}
调用API端点,没有发生任何可见的事情。
...然后客户端Bob使用参数{name: "Bob"}
调用API端点,客户端Alice会收到有效载荷为{name: "Bob", says: "Hi"}
的SSE。
...然后客户端Carol使用参数{name: "Carol"}
调用API端点,客户端Alice和Bob各自会接收到有效载荷为{name: "Carol", says: "Hi"}
的SSE。
...以此类推。每次新的客户端使用参数调用API端点时,每个其他已打开通道的客户端都将接收到新的“Hi”有效载荷的SSE。
...然后客户端Bob从服务器“断开连接”,客户端Alice、客户端Carol和所有具有“打开”通道的客户端都将接收到有效载荷为{name: "Bob", says: "Bye"}
的SSE。
...以此类推。每当旧客户端从服务器“断开连接”时,每个其他已打开通道的客户端都将接收到新的“Bye”有效载荷的SSE。
抽象行为
- 每当一个新客户端请求“打开”一个通道并发送一些参数或旧客户端从服务器“断开”连接时,它们会在服务器上引发一个事件。
- 每次这样的事件在服务器上发生时,服务器都会向所有“打开”通道发送一个带有参数和消息作为有效负载的SSE消息。
关于阻塞的说明 每个具有“打开”通道的客户端都将被“卡住”在一个无限等待事件发生的循环中。客户端设计需要使用“线程”代码技术来避免阻塞。
代码
你的Python客户端应该“请求”开始单向通道并保持等待,直到通道关闭。不应该结束并重新开始使用不同的通道。它应该保持相同的通道开放。
从网络角度来看,它将像一个“长”响应,不会结束(直到SSE消息结束)。响应只是“不断地到来”。
你的Python客户端代码就是这样做的。我注意到它是从sseclient-py library使用的完全相同的示例代码。
Python 3.4客户端代码
要包含要发送到服务器的参数,请使用Requests
库中的一些代码docs/#passing-parameters-in-urls。
因此,将这些示例混合在一起,我们最终得到以下代码作为您的Python 3.4客户端:
import json
import pprint
import requests
import sseclient
input_json = {'name':'Alice'}
url = 'http://company.com/api/root_event_notification'
stream_response = requests.get(url, params=input_json, stream=True)
client = sseclient.SSEClient(stream_response)
for event in client.events():
print ("got a new event from server")
pprint.pprint(event.data)
Python 2.7的客户端代码
如果想要将参数发送到服务器,请使用urllib.urlencode()
库将它们编码为查询参数并附加在URL中。
使用urllib3.PoolManager().request()
进行HTTP请求,这样您将得到一个流响应。
请注意,sseclient
库将事件数据作为Unicode字符串返回。要将JSON对象转换回Python对象(带有Python字符串),请使用byteify
,这是一个递归自定义函数(感谢Mark Amery)。
请使用以下代码作为Python 2.7客户端:
import json
import pprint
import urllib
import urllib3
import sseclient
def byteify(input):
if isinstance(input, dict):
return {byteify(key): byteify(value)
for key, value in input.iteritems()}
elif isinstance(input, list):
return [byteify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input
input_json = {'name':'Alice'}
base_url = 'http://localhost:3000/api/root_event_notification'
url = base_url + '?' + urllib.urlencode(input_json)
http = urllib3.PoolManager()
stream_response = http.request('GET', url, preload_content=False)
client = sseclient.SSEClient(stream_response)
for event in client.events():
print ("got a new event from server")
pprint.pprint(byteify(json.loads(event.data)))
现在,服务器代码应该:
- 发出内部服务器的“hello”事件,以便其他客户端监听该事件
- “打开”通道
- 注册以监听所有可能发生的内部服务器事件(这意味着保持通道“打开”,在消息之间不发送任何内容,只是保持通道“打开”)。
- 这包括在客户端/网络关闭通道时发出内部服务器的“goodbye”事件,以便其他客户端监听该事件,并最终“结束”。
请使用以下Node.js API代码:
var EventEmitter = require('events').EventEmitter;
var myEmitter = new EventEmitter;
function registerEventHandlers(req, res) {
const myParams = req.query;
const sayHi = function(params) {
params['says'] = "Hi";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}\n\n`);
}
const sayBye = function(params) {
params['says'] = "Bye";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}\n\n`);
}
myEmitter.on('hello', sayHi);
myEmitter.on('goodbye', sayBye);
req.on('close', () => {
myEmitter.emit('goodbye', myParams);
myEmitter.off('hello', sayHi);
myEmitter.off('goodbye', sayBye);
console.log("<- close ", req.query);
});
}
app.get("/api/root_event_notification", (req, res, next) => {
console.log("open -> ", req.query);
myEmitter.emit('hello', req.query);
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
res.write('\n');
registerEventHandlers(req, res);
})
以下是引用自html5rocks.com的内容:
从源发送事件流只需要构建一个明文响应,使用text/event-stream Content-Type提供,并遵循SSE格式。在其基本形式中,响应应包含"data:"行,后跟您的消息,后跟两个"\n"字符以结束流。
在客户端代码中,sseclient-py库负责解释SSE格式,因此每当两个"\n"字符到达时,该库就会“迭代”一个新的“可迭代”对象(一个新事件),该对象具有从服务器发送的消息的data
属性。
这是我测试代码的方法
- 使用Node.js API代码启动服务器
- 运行一个客户端,只取消注释"Alice"这一行(此时在客户端控制台上看不到任何内容)。
- 运行第二个客户端,只取消注释"Bob"这一行。第一个客户端"Alice"的控制台显示:Bob说“嗨”(此时在Bob的客户端控制台上看不到任何内容)。
- 运行第三个客户端,只取消注释"Carol"这一行。Alice和Bob的控制台都显示:Carol说“嗨”(此时在Carol的客户端控制台上看不到任何内容)。
- 停止/关闭Bob的客户端。Alice和Carol的控制台都显示:Bob说“再见”。
所以,代码运行正常 :)
requests.get(url)
时,您的变量url
格式不正确。我看到了Node.js API输出,但不知道它是一个对象、字符串还是其他什么东西。 - elingerojopayloadString {"root_version":"12A12","BATS":"678911"} payloadString {"root_version":"12A12","BATS":"678911"}
。我们应该只收到一个事件,对吗?我们怎样才能停止这个事件轰炸呢? - carte blanchesetInterval()
模拟服务器每秒发生的事件。该代码旨在作为解释说明,而非应用程序的解决方案。回答你的问题_我们只能收到一次事件,对吗?不是。在示例代码中,事件每1秒钟发生一次,因此您应该每秒钟连续收到它们。所以,它正常工作!如果您想要不同的行为,则必须使用其他代码。SSE旨在由客户端请求,但由服务器作为“主人”(客户端作为“木偶”)控制和定时。 - elingerojoPython 2.7
和sseclient-py
进行了测试...和你得到的一模一样的错误和 __traceback__。我刚刚编辑了我的答案,包括适用于Python 2.7
的客户端代码,可以正常工作。 - elingerojo