SSE和Servlet 3.0

19

我在页面加载时注册了一个典型的SSE:

客户端:

sseTest: function(){

var source = new EventSource('mySSE');
source.onopen = function(event){
console.log("eventsource opened!");
};

source.onmessage = function(event){
var data = event.data;
console.log(data);
document.getElementById('sse').innerHTML+=event.data + "<br />";
};
}

我的 JavaScript 调试器显示 "eventsource opened!" 已成功。

我的服务器代码是一个 Servlet 3.0:

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(urlPatterns={"/mySSE"}, name = "hello-sse", asyncSupported=true)
public class MyServletSSE extends HttpServlet {

@Override
public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

resp.setContentType("text/event-stream");
resp.setCharacterEncoding("UTF-8");

Random random = new Random();
PrintWriter out = resp.getWriter();

//AsyncContext aCtx = req.startAsync(req, resp);
//ServletRequest sReq = aCtx.getRequest();

String next = "data: " + String.valueOf(random.nextInt(100) + 1) + "\n\n";
//out.print("retry: 600000\n"); //set the timeout to 10 mins in milliseconds
out.write(next);
out.flush();
// do not close the stream as EventSource is listening
//out.close();
//super.doGet(req, resp);
}
}

代码可以工作!客户端代码每3秒触发doGet()方法并检索新数据。

问题:但是,我想知道如何通过使用新的Servlet 3.0 Futures(例如async-support或asyncContext.addListener(asyncListener)等)使这段代码更好?由于我从未关闭流,所以我想知道我的服务器将如何扩展?

理论上,最好的方法是在有新数据时通过服务器端代码显式地触发doGet()方法,这样客户端就不需要每3秒触发客户端端的"onmessage()"方法和服务器端的"doGet()"方法来获取新数据了。


这是我在SO看到的最好的问题之一,虽然我回答了这个问题,但我实际上从中学到了很多东西,特别是关于EventSource! - Eran Medan
如果有1000个客户端,这是否意味着会有1000个连接到服务器? - Harun
2个回答

16

这是一个很棒的问题,这里有一个完整的工作示例(Servlet 3.0 / Java EE 6)

一些注意事项:

  1. 它通过out.checkError()处理“浏览器选项卡/窗口关闭”,也调用flush()
  2. 我写得比较快,所以我相信它可以改进,只是一个POC,在测试之前不要在生产环境中使用

Servlet:(为了简洁起见省略了引入语句,很快会更新一个完整的gist)

@WebServlet(urlPatterns = {"/mySSE"}, asyncSupported = true)
public class MyServletSSE extends HttpServlet {

  private final Queue<AsyncContext> ongoingRequests = new ConcurrentLinkedQueue<>();
  private ScheduledExecutorService service;

  @Override
  public void init(ServletConfig config) throws ServletException {
    final Runnable notifier = new Runnable() {
      @Override
      public void run() {
        final Iterator<AsyncContext> iterator = ongoingRequests.iterator();
        //not using for : in to allow removing items while iterating
        while (iterator.hasNext()) {
          AsyncContext ac = iterator.next();
          Random random = new Random();
          final ServletResponse res = ac.getResponse();
          PrintWriter out;
          try {
            out = res.getWriter();
            String next = "data: " + String.valueOf(random.nextInt(100) + 1) + "num of clients = " + ongoingRequests.size() + "\n\n";
            out.write(next);
            if (out.checkError()) { //checkError calls flush, and flush() does not throw IOException
              iterator.remove();
            }
          } catch (IOException ignored) {
            iterator.remove();
          }
        }
      }
    };
    service = Executors.newScheduledThreadPool(10);
    service.scheduleAtFixedRate(notifier, 1, 1, TimeUnit.SECONDS);
  }

  @Override
  public void doGet(HttpServletRequest req, HttpServletResponse res) {
    res.setContentType("text/event-stream");
    res.setCharacterEncoding("UTF-8");

    final AsyncContext ac = req.startAsync();
    ac.setTimeout(60 * 1000);
    ac.addListener(new AsyncListener() {
      @Override public void onComplete(AsyncEvent event) throws IOException {ongoingRequests.remove(ac);}
      @Override public void onTimeout(AsyncEvent event) throws IOException {ongoingRequests.remove(ac);}
      @Override public void onError(AsyncEvent event) throws IOException {ongoingRequests.remove(ac);}
      @Override public void onStartAsync(AsyncEvent event) throws IOException {}
    });
    ongoingRequests.add(ac);
  }
}

JSP:

<%@page contentType="text/html" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <title>JSP Page</title>
        <script>
            function test() {
                var source = new EventSource('mySSE');
                source.onopen = function(event) {
                    console.log("eventsource opened!");
                };

                source.onmessage = function(event) {
                    var data = event.data;
                    console.log(data);
                    document.getElementById('sse').innerHTML += event.data + "<br />";
                };
            }
            window.addEventListener("load", test);
        </script>
    </head>
    <body>
        <h1>Hello SSE!</h1>
        <div id="sse"></div>
    </body>
</html>

2
一个缓慢的客户端读取会减缓所有其他客户端写入速度,因为这种技术使用阻塞写入。(这是2013年的适当答案)。现代应用程序应该使用Servlet 3.1与异步I/O写入。 - Joakim Erdfelt
你能给出“Servlet 3.1 with Async I/O writes”链接吗? - Harun
2
Joakim:这真的是个问题吗?我刚刚进行了一次测试运行。两个客户端,一个在alert()中阻塞。服务器每30秒向两个客户端写入一条短消息。整夜都没有出现任何问题。也许在链路中有足够的缓冲区,但仍然…… - Per Lindberg

1

有用的例子。

有些人可能会在使用startAsync()时遇到"IllegalStateException: Not supported"错误,此时请不要忘记:

@WebServlet(urlPatterns = "/Sse", asyncSupported=true)

或者使用

request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

来自这篇文章


1
同时在所有适用的过滤器上进行 AND 操作。 - Per Lindberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接