TomEE在执行过多的@Asynchronous操作时会出现问题

3
我正在使用Apache TomEE 1.5.2 JAX-RS,基本上是开箱即用的,预定义了HSQLDB。
以下是简化的代码。我有一个REST风格的接口来接收信号:
@Stateless
@Path("signal")
public class SignalEndpoint {
    @Inject
    private SignalStore store;

    @POST
    public void post() {
        store.createSignal();
    }
}

当接收到信号时,会触发很多操作。商店将创建一个实体,然后触发一个异步事件。
public class SignalStore {
    @PersistenceContext
    private EntityManager em;

    @EJB
    private EventDispatcher dispatcher;

    @Inject
    private Event<SignalEntity> created;

    public void createSignal() {
        SignalEntity entity = new SignalEntity();
        em.persist(entity);
        dispatcher.fire(created, entity);
    }
}

调度程序非常简单,仅存在于使事件处理变为异步的目的。

@Stateless
public class EventDispatcher {
    @Asynchronous
    public <T> void fire(Event<T> event, T parameter) {
        event.fire(parameter);
    }
}

接收事件是另一回事,它从信号中获取数据,存储它,并触发另一个异步事件:

@Stateless
public class DerivedDataCreator {
    @PersistenceContext
    private EntityManager em;

    @EJB
    private EventDispatcher dispatcher;

    @Inject
    private Event<DerivedDataEntity> created;

    @Asynchronous
    public void onSignalEntityCreated(@Observes SignalEntity signalEntity) {
        DerivedDataEntity entity = new DerivedDataEntity(signalEntity);
        em.persist(entity);
        dispatcher.fire(created, entity);
    }
}

回应这种情况甚至需要创建第三层实体。
总之,我有一个REST调用,它同步创建了一个SignalEntity,该实体异步触发了DerivedDataEntity的创建,后者再异步触发了第三种类型的实体的创建。它们都可以完美地工作,并且存储过程非常解耦。
除了当我在for循环中以大量(例如1000)信号编程方式触发时。根据我的AsynchronousPool大小,在处理信号(相当快)约一半大小的数量后,应用程序会完全冻结长达几分钟。然后它恢复,以相当快的速度处理大约相同数量的信号,然后再次冻结。
我已经在过去的半个小时里尝试了AsynchronousPool设置。例如将其设置为2000,将轻松使我的所有信号立即被处理,而没有任何冻结。但是系统也不健康。触发另外1000个信号,结果它们被正确创建,但派生数据的整个创建从未发生。
现在我完全不知道该怎么做了。当然,我可以摆脱所有异步事件并自己实现某种类型的队列,但我一直认为EE容器的重点是让我摆脱这种乏味的工作。异步EJB事件应该已经带有它们自己的队列机制。一个不应该在队列太满时就冻结的机制。
有什么想法吗?
更新:
我现在尝试使用1.6.0-SNAPSHOT。它的行为有些不同:它仍然无法工作,但我确实得到了一个异常:
Aug 01, 2013 3:12:31 PM org.apache.openejb.core.transaction.EjbTransactionUtil handleSystemException
SEVERE: EjbTransactionUtil.handleSystemException: fail to allocate internal resource to execute the target task
javax.ejb.EJBException: fail to allocate internal resource to execute the target task
    at org.apache.openejb.async.AsynchronousPool.invoke(AsynchronousPool.java:81)
    at org.apache.openejb.core.ivm.EjbObjectProxyHandler.businessMethod(EjbObjectProxyHandler.java:240)
    at org.apache.openejb.core.ivm.EjbObjectProxyHandler._invoke(EjbObjectProxyHandler.java:86)
    at org.apache.openejb.core.ivm.BaseEjbProxyHandler.invoke(BaseEjbProxyHandler.java:303)
    at <<... my code ...>>
    ...
Caused by: java.util.concurrent.RejectedExecutionException: Timeout waiting for executor slot: waited 30 seconds
    at org.apache.openejb.util.executor.OfferRejectedExecutionHandler.rejectedExecution(OfferRejectedExecutionHandler.java:55)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:132)
    at org.apache.openejb.async.AsynchronousPool.invoke(AsynchronousPool.java:75)
    ... 38 more

似乎TomEE不会对操作进行任何排队。如果此时没有空闲线程来处理调用,那就很遗憾了。当然,这不可能是有意的吧?更新2:好吧,我似乎已经找到了一个半解决方案:将AsynchronousPool.QueueSize属性设置为maxint可以解决冻结问题。但问题仍然存在:为什么QueueSize一开始就如此有限,更令人担忧的是:为什么这会阻塞整个应用程序?如果队列已满,则会被阻塞,但只要有任务被取出,另一个任务就应该弹出,对吧?队列似乎被阻塞,直到它再次完全为空为止。更新3:对于任何想尝试的人:http://github.com/JanDoerrenhaus/tomeefreezetestcase。更新4:
事实证明,增加队列大小并不能解决问题,只是延迟了问题。问题依然存在:太多的异步操作同时进行,TomEE会受到严重影响,甚至无法在终止时卸载应用程序。
到目前为止,我的诊断是任务清理工作不正常。我的所有任务都非常小而快速(请参见test case on github)。我已经担心可能是OpenJPA或HSQLDB在太多并发调用时减慢速度,但我注释掉所有em.persist调用后,问题仍然存在。因此,如果我的任务非常小而快速,但仍然能够阻塞TomEE以至于它在30秒内无法获得任何任务(javax.ejb.EJBException: fail to allocate internal resource to execute the target task),我想已完成的任务会滞留,堵塞管道,这就是问题所在。
我该如何解决这个问题?

在JVM冻结之前,您是否已经执行了线程转储? - Muel
我曾经遇到过同样的问题,将事件分派器中的@Asynchronous删除后问题得以解决。我不记得为什么事件分派器也是异步的了。 - wutzebaer
@wutzebaer 嗯,是的,这将解决问题,因为它只是删除了所有线程池和导致问题的一切。想法是处理事件而不阻塞初始请求。 - Jan Dörrenhaus
但观察者函数仍然是异步的吗? - wutzebaer
@wutzebaer 是的,但是如果调度程序方法不是异步的,则触发所有事件处理程序仍将是同步的。可能影响不那么大,同意这一点。无论如何,在我看来,使两者都异步应该可以正常工作,而不会出现任何奇怪的行为。 - Jan Dörrenhaus
2个回答

2

基本上,BlockingQueues使用锁来确保数据的一致性并避免数据丢失,因此在高度并发的环境中,它将拒绝许多任务(您的情况)。

您可以在主干上使用RejectedExecutionHandler实现来重试提供任务。一个实现可以是:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
            for (int i = 0; i < 10; i++) {
                if (executor.getQueue().offer(r)) {
                    return;
                }

                try {
                    Thread.sleep(50);
                } catch (final InterruptedException e) {
                    // no-op
                }
            }
            throw new RejectedExecutionException();
        }
    }

使用随机睡眠(在最小值和最大值之间)可以更好地工作。

基本思路是:如果队列已满,则等待一段短时间以减少并发。


非常感谢您 :-) 像这样的处理程序可能也会被添加到TomEE本身中,不是吗? - Jan Dörrenhaus

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接