使用Node.js实现数据流传输

44

我想知道是否可以使用Node.js从服务器向客户端流式传输数据。我想向Node.js发送单个AJAX请求,然后保持连接并持续向客户端流式传输数据。客户端将接收此流并持续更新页面。

更新:

关于这个答案的更新 - 我无法使其正常工作。在调用close之前,response.write不会被发送。我已设置一个示例程序来实现此目的:

Node.js:

var sys = require('sys'), 
http = require('http');
http.createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/html'});
    var currentTime = new Date();
    setInterval(function(){
        res.write(
            currentTime.getHours()
            + ':' + 
            currentTime.getMinutes()
            + ':' +
            currentTime.getSeconds()
        );
    },1000);
}).listen(8000);

HTML:

<html>
    <head>
        <title>Testnode</title>
    </head>

    <body>
        <!-- This fields needs to be updated -->
        Server time: <span id="time">&nbsp;</span>

        <!-- import jQuery from google -->
        <script type="text/javascript" src="http://ajax.googleapis.com/ajax/libs/jquery/1.4.2/jquery.min.js"></script>

        <!-- import jQuery -->
        <script type="text/javascript">
            $(document).ready(function(){
            // I call here node.localhost nginx ports this to port 8000
                $('#time').load('http://node.localhost');
            });
        </script>
    </body>
</html>

使用这种方法,直到我调用close()才能得到任何返回。这是可能的吗?还是我应该改用长轮询方式,在其中再次调用加载函数以收到新数据?


我认为这不是流式传输,只是将内容分块。是的,Node会发送答案的一部分,但每个HTML服务器都会这样做。从Node获取的流式传输好处指的是其他方面。 - Zlatko
另外,在你的例子中,你没有暂停、继续和清空方法。读取流是一个会尽可能快地提供数据的东西(或者说尽可能快地按照你的要求进行),当你要求它暂停时,它就会暂停。接着你再让它继续等等。 - Zlatko
4个回答

28

可以实现。只需多次使用response.write()。

var body = ["hello world", "early morning", "richard stallman", "chunky bacon"];
// send headers
response.writeHead(200, {
  "Content-Type": "text/plain"
});

// send data in chunks
for (piece in body) {
    response.write(body[piece], "ascii");
}

// close connection
response.end();

你可能需要每30秒关闭并重新打开连接。

编辑:这是我实际测试的代码:

var sys = require('sys'),
http = require('http');
http.createServer(function (req, res) {
    res.writeHead(200, {'Content-Type': 'text/html'});
    var currentTime = new Date();
    sys.puts('Starting sending time');
    setInterval(function(){
        res.write(
            currentTime.getHours()
            + ':' +
            currentTime.getMinutes()
            + ':' +
            currentTime.getSeconds() + "\n"
        );

        setTimeout(function() {
            res.end();
        }, 10000);

    },1000);
}).listen(8090, '192.168.175.128');

我通过 Telnet 连接到它,发现它确实给出了分块响应。但要在 AJAX 中使用它,浏览器必须支持 XHR.readyState = 3(部分响应)。据我所知,并非所有浏览器都支持此功能。因此,最好使用长轮询(或 Chrome/Firefox 的 Websockets)。

EDIT2: 此外,如果您使用 nginx 作为反向代理到 Node,则有时希望将所有块收集并一次性发送给用户。您需要进行调整。


3
jquery.load 等待整个页面加载完成后才会触发回调函数。你需要使用其他方法。请参考 http://api.jquery.com/load/。 - Kuroki Kaze
1
我会在我的节点机器(0.1.31)上检查它。 - Kuroki Kaze
1
是的,或许使用长轮询会更好。 - Kuroki Kaze
3
为了使这个示例起作用:将“var currentTime = new Date();”移动到setInterval函数中。 - jmav
response.close 是一个函数吗?我认为你的意思是 response.end: http://nodejs.org/api/http.html#http_response_end_data_encoding - Vinay
显示剩余6条评论

20

1
Socket.IO 对此来说效率不高,因为他正在寻找从服务器到客户端的单向流。SSE 就非常适合这种情况。 - Druska
这个解决方案对我帮助很大。我想从客户端流式传输网络摄像头视频和麦克风音频到服务器。我该怎么做?https://dev59.com/2Ibca4cB1Zd3GeqPWXyW - Costa Michailidis

6

您也可以中止无限循环:

app.get('/sse/events', function(req, res) {
    res.header('Content-Type', 'text/event-stream');

    var interval_id = setInterval(function() {
        res.write("some data");
    }, 50);

    req.socket.on('close', function() {
        clearInterval(interval_id);
    }); 
}); 

这是一个expressjs的示例。我相信没有expressjs就像……

2
这在Node.js、pg-query-stream和Materialize的帮助下现在成为可能。
Materialize兼容PostgreSQL,这意味着Node.js应用程序可以使用任何现有的PostgreSQL客户端与Materialize进行交互。
然而,与PostgreSQL不同的是,使用Materialize,您可以从Node.js应用程序中利用增量更新的物化视图,而不是查询Materialize以获取视图在某个时间点的状态,使用TAIL语句请求视图更改的流。
示例应用程序如下所示:
import express from 'express'
import pg from 'pg'
import QStream from 'pg-query-stream'

const app = express();
const port = 3000

app.get('/questions', async (request, response) => {

    const client = new pg.Client('postgres://materialize@SERVER_IP:6875/materialize');

    await client.connect();

    const query = new QStream('TAIL your_materialized_view WITH (PROGRESS)', [], {batchSize: 1});

    const stream = client.query(query);

    response.setHeader('Content-Type',  'text/event-stream');

    for await (const event of stream) {
        if(event.id){
            response.write(`data: ${JSON.stringify(event)}\n`);
        }
    }

})

app.listen(port)

资源:


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接