Servlet-3异步上下文,如何进行异步写入?

51

问题描述

Servlet-3.0 API允许分离请求/响应上下文并稍后回答。

但是如果我尝试写入大量数据,例如:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()

对于Tomcat 7和Jetty 8,它实际上可能会阻塞 - 且在简单的测试用例中确实会阻塞。教程建议创建一个线程池来处理这样的设置 - 这通常是传统10K架构的反面。

然而,如果我有10,000个打开的连接和一个线程池,例如10个线程,即使1%的客户端具有低速连接或仅被阻止连接,也足以阻止线程池并完全阻止彗星响应或显着减慢它。

期望的做法是获得“可写就绪”通知或I/O完成通知,然后继续推送数据。

如何使用Servlet-3.0 API完成此操作,即如何获取:

  • 关于 I/O 操作的异步完成通知。
  • 获得非阻塞 I/O 和可写就绪通知。

如果Servlet-3.0 API不支持此功能,则是否有任何Web服务器特定的API(例如Jetty Continuation或Tomcat CometEvent)可以真正异步地处理此类事件,而无需使用线程池来虚拟异步I/O?

有人知道吗?

如果这不可能,您能否引用文档确认它?

在示例代码中演示问题

我附上了下面的代码,模拟事件流。

注意:

  • 它使用ServletOutputStream,该输出流抛出IOException以检测断开连接的客户端
  • 它发送keep-alive消息以确保客户端仍然存在
  • 我创建了一个线程池来“模拟”异步操作。

在这样的示例中,我明确定义了大小为1的线程池以显示问题:

  • 启动应用程序
  • 从两个终端运行curl http://localhost:8080/path/to/app(两次)
  • 现在使用curd -d m=message http://localhost:8080/path/to/app发送数据
  • 两个客户端都收到了数据
  • 现在暂停其中一个客户端(Ctrl+Z)并再次发送消息curd -d m=message http://localhost:8080/path/to/app
  • 观察非挂起的客户端是否未收到任何内容或在消息传输后停止接收keep-alive请求,因为其他线程被阻塞。

我想解决这样的问题,而不使用线程池,因为在1000-5000个打开的连接中,我可以非常快地耗尽线程池。

下面是示例代码。

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

以上示例使用线程来防止阻塞... 但是,如果阻塞客户的数量大于线程池的大小,它将会被阻塞。

如何实现无阻塞?


我也非常想知道这个问题的答案。总的来说,似乎无法获得对底层通道的非阻塞访问,但是通过一些警告,我们可以防止客户端占用线程并对其他客户端造成过于严重的影响。最终,我希望可移植的servlets API能够公开一种正确的非阻塞写入方式,但我怀疑这不会很快实现(他们会说“只需编写bean/app”,而不是使用servlet容器)。如果您的servlet容器友好,我认为我的解决方案基本上可以解决您/我的更有限的问题。 - Nicholas Wilson
你可以在我的Github项目https://github.com/NWilson/oidrelay中看到我实现这个技术的方式。欢迎任何评论!我上周六开始接触Java(之前主要使用Haskell和C),只花了几个晚上的时间。 - Nicholas Wilson
6个回答

30

我发现实现Servlet 3.0Asynchronous API非常棘手,相关文档不够详尽。经过多次试验和尝试许多不同的方法,我最终找到了一个稳健的解决方案,很满意。当我比较我的代码和你的代码时,我注意到一个主要区别,这可能有助于你解决特定的问题。我使用ServletResponse来写入数据,而不是ServletOutputStream

这是我的标准异步Servlet类,稍微适配了一下你的some_big_data情况:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}

2
几个问题:(1)ctx.complete()关闭响应,但我实际上想重用它以后发送更多数据。(2)使用response.getWriter()不好,因为它不会抛出IOException,所以我无法知道例如客户端是否已断开连接;请参见https://dev59.com/JmfWa4cB1Zd3GeqPhXpj;(3)似乎你实际上是使用线程池来“解决”问题,而不是使响应真正异步。 - Artyom
2
从客户端的角度来看,响应确实是异步的。客户端发出请求,未来某个时刻服务器会做出响应。使用HTTP 1.1标准,连接保持活动状态并重复使用多个请求,从而允许这种异步行为。调用ctx.complete()关闭响应,但你只需要在第一次接收数据后直接让客户端再次请求数据即可,如果你真的需要它的话。另请参见:https://dev59.com/_lrUa4cB1Zd3GeqPj3Bt - herrtim
1
你所描述的是长轮询,但我说的是HTTP流式传输,这是允许的。这就是Server-Sent Events的构建方式:http://www.w3.org/TR/eventsource/,也是我想要构建的方式。基本上,我不关闭连接,而是在新事件发生时继续流式传输数据。也就是说,我谈论的是服务器端异步响应处理(而不是客户端)。 - Artyom
2
@Artyom 同意。我所实现的是使用 Servlet 3.0 API 中指定的 Async 特性进行长轮询。我怀疑你不能使用它来完全实现你想要的功能。也许 Tomcat 7.0.27 及更高版本的新 WebSocket 实现是你需要的:http://tomcat.apache.org/tomcat-7.0-doc/web-socket-howto.html - herrtim
1
如果您使用“wss”协议,WebSockets是代理透明的。 - nilskp
显示剩余3条评论

12

3
我们无法完全使写入变成异步的。我们必须实际上接受这样的限制:当我们向客户端输出内容时,我们希望能够及时地完成,并且如果不能完成,我们会将其视为错误。也就是说,如果我们的目标是尽可能快地向客户端流式传输数据,并使用通道的阻塞/非阻塞状态来控制流量,那么我们就没有运气了。但是,如果我们以较低的速率发送数据,而客户端应该能够处理,那么我们至少可以迅速断开不能快速读取的客户端。
例如,在您的应用程序中,我们以较慢的速率(每几秒钟)发送心跳包,并期望客户端能够跟上它们收到的所有事件。我们将数据大量发送给客户端,如果它跟不上,我们可以迅速而干净地断开连接。这比真正的异步I/O要有些局限性,但它应该满足您的需求(顺便说一句,也满足我的需求)。
关键在于,所有抛出IOException异常的输出写入方法实际上都做了更多的工作:在实现中,所有可以被interrupt()的调用都将被包装成类似于以下代码(摘自Jetty 9)的形式:
catch (InterruptedException x)
    throw (IOException)new InterruptedIOException().initCause(x);

我还注意到在Jetty 8中,这种情况并不会发生。当出现InterruptedException时,日志会记录下来,并立即重试阻塞循环。因此,如果您想使用这个技巧,就必须确保您的servlet容器表现良好。
也就是说,当慢速客户端导致写入线程阻塞时,我们只需要通过调用线程的interrupt()方法来强制抛出IOException异常。想一想:非阻塞代码本来就会消耗一个处理线程的时间单位来执行,因此使用被中止的阻塞写入(比如在一毫秒后)原则上与此相同。我们仍然只是在线程上消耗了少量的时间,只是效率略低而已。
我修改了您的代码,使得主计时器线程在开始写入之前运行一个作业来限制每次写入的时间,并且如果写入完成得很快,则取消该作业,这应该是可以的。
最后要注意的是,在一个良好实现的servlet容器中,使I/O抛出异常是安全的。如果我们能捕获InterruptedIOException并稍后尝试再次写入,那就太好了。也许我们想为无法跟上完整流的慢速客户端提供事件子集。据我所知,在Jetty中这并不完全安全。如果写入抛出异常,HttpResponse对象的内部状态可能不足以安全地处理后续的写入。除非我错过了某些特定的文档来提供这种保证,否则尝试推动servlet容器采用这种方式可能是不明智的。我认为这个想法是,如果发生IOException异常,连接就应该被关闭。
下面是代码,其中包括一个修改过的RunJob::run()版本,使用了一个简单的例子(实际上,我们会想在这里使用主计时器线程,而不是每次写入都启动一个新的线程,这很愚蠢)。
public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}

1
给downvoter:谢谢,您有什么评论吗?我在这个答案上花了很多心血,为了我的自用,我想知道是否有更好的解决方案。如果您认为有更好的方法,请分享一下。 - Nicholas Wilson


2

(我之前看过这个链接,像很多其他链接一样)那么它是如何异步工作的呢? - Artyom
2
客户端:可以通过WebSockets或长轮询来实现;服务器端:异步的DeferredResult将在非服务器池线程中处理后返回。请查看spring-mvc-chat示例git链接;它非常简洁,应该能够在短时间内告诉您是否符合您的需求。 - JJ Zabkar

-1

我已经快速浏览了您的列表,可能错过了一些重点。线程池的优点是可以在一段时间内在多个任务之间共享线程资源。也许您可以通过间隔不同http连接的keepAlive响应来解决问题,而不是将所有连接都分组在同一时间。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接