How can I limit the number of threads used by Spring state machine instances?


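I have an event queue with n message listeners. When a message arrives, one listener picks it up, takes a state machine instance from the corresponding pool, sets the state machine context, and starts it. This works perfectly, but I have run into a problem with threads. Since the state machine instances are bounded by their pool size, I assumed the total number of threads would be bounded too, but I was wrong: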

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'consume' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:395) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:848) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) [spring-rabbit-1.7.4.RELEASE.jar!/:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method) [na:1.8.0_152]
at java.lang.Thread.start(Thread.java:717) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1603) [na:1.8.0_152]
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334) ~[na:1.8.0_152]
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[na:1.8.0_152]
at java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729) ~[na:1.8.0_152]
at org.springframework.scheduling.concurrent.ConcurrentTaskScheduler.schedule(ConcurrentTaskScheduler.java:182) ~[spring-context-4.3.11.RELEASE.jar!/:4.3.11.RELEASE]
at org.springframework.statemachine.state.AbstractState.scheduleAction(AbstractState.java:415) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.state.AbstractState.scheduleStateActions(AbstractState.java:377) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.state.AbstractState.entry(AbstractState.java:208) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.state.ObjectState.entry(ObjectState.java:156) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.entryToState(AbstractStateMachine.java:1216) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.entryToState(AbstractStateMachine.java:1161) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.setCurrentStateInternal(AbstractStateMachine.java:971) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.setCurrentState(AbstractStateMachine.java:949) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.switchToState(AbstractStateMachine.java:841) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.access$400(AbstractStateMachine.java:77) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine$2.transit(AbstractStateMachine.java:301) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor.handleTriggerTrans(DefaultStateMachineExecutor.java:248) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor.processTriggerQueue(DefaultStateMachineExecutor.java:395) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor.access$100(DefaultStateMachineExecutor.java:61) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor$1.run(DefaultStateMachineExecutor.java:281) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-4.3.11.RELEASE.jar!/:4.3.11.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor.scheduleEventQueueProcessing(DefaultStateMachineExecutor.java:300) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.DefaultStateMachineExecutor.execute(DefaultStateMachineExecutor.java:144) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.sendEventInternal(AbstractStateMachine.java:559) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.sendEvent(AbstractStateMachine.java:211) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at org.springframework.statemachine.support.AbstractStateMachine.sendEvent(AbstractStateMachine.java:223) ~[spring-statemachine-core-1.2.6.RELEASE.jar!/:1.2.6.RELEASE]
at io.botbit.backend.consumers.EventConsumer.consume(EventConsumer.java:39) ~[classes!/:0.0.1-SNAPSHOT]
at sun.reflect.GeneratedMethodAccessor74.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_152]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_152]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:269) ~[spring-core-4.3.11.RELEASE.jar!/:4.3.11.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:387) ~[spring-rabbit-1.7.4.RELEASE.jar!/:na]
... 14 common frames omitted

Looking at the stack trace, the cause is:
Caused by: java.lang.OutOfMemoryError: unable to create new native thread

How can I limit the number of threads used by the Spring state machine instances?

Thanks in advance!

Update 1

I followed @SpaceTrucker's suggestion:

@Bean(name = "stateMachineTaskScheduler")
public ConcurrentTaskScheduler stateMachineTaskScheduler() {
    ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(10);
    ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler(threadPool);
    return taskScheduler;
}

and then

builder.configureConfiguration().withConfiguration().taskScheduler(stateMachineTaskScheduler());

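The state machines now use that pool. The problem is that every state machine instance shares the same pool instance, rather than each state machine instance having its own taskScheduler instance.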


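But if every state machine used its own scheduler, wouldn't you end up with the same problem you are trying to solve: too many threads? - SpaceTrucker
The state machine instances are bounded because they are pooled. If each machine had a task scheduler backed by a bounded thread pool, then I think the total number of threads would be bounded too, because nothing would create threads on demand. Am I wrong? - Alejandro Raiczyk
1 Answer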


The Spring documentation provides the following example configuration: (link)

@Configuration
@EnableStateMachine
public class Config17
        extends EnumStateMachineConfigurerAdapter<States, Events> {

    @Override
    public void configure(StateMachineConfigurationConfigurer<States, Events> config)
            throws Exception {
        config
            .withConfiguration()
                .autoStartup(true)
                .beanFactory(new StaticListableBeanFactory())
                .taskExecutor(new SyncTaskExecutor())
                .taskScheduler(new ConcurrentTaskScheduler())
                .listener(new StateMachineListenerAdapter<States, Events>());
    }
}

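You only need to replace the taskScheduler with a bean configured to suit your needs. For example, use another ConcurrentTaskScheduler instance backed by a scheduledExecutor with a limited number of threads.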
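As a rough illustration of why a single shared, fixed-size scheduled executor keeps the thread count bounded, here is a minimal sketch that uses only java.util.concurrent. The class name BoundedSchedulerSketch and the method largestPoolSize are hypothetical names made up for this example, not part of Spring or Spring Statemachine. It assumes the scheduler handed to the state machines wraps an executor like the one below, for example via new ConcurrentTaskScheduler(shared).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows that a fixed-size scheduled pool never
// grows past its configured size, no matter how many tasks are scheduled.
public class BoundedSchedulerSketch {

    /**
     * Schedules taskCount short tasks on one shared scheduler capped at
     * maxThreads and returns the largest number of threads the pool ever held.
     */
    static int largestPoolSize(int maxThreads, int taskCount) {
        ScheduledThreadPoolExecutor shared = new ScheduledThreadPoolExecutor(maxThreads);
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Each call stands in for a state action being scheduled.
                shared.schedule(done::countDown, 1, TimeUnit.MILLISECONDS);
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return shared.getLargestPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            // Release the worker threads once the demo is finished.
            shared.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(10, 1000));
    }
}
```

Because the pool is shared, the cap applies to all state machines together. If each machine instead owned its own scheduler, the worst case would be the number of pooled machines multiplied by the threads per scheduler, and each scheduler would need to be shut down when its machine is discarded.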


Thank you for the quick reply! I did what you suggested; see #Update 1 in the original post. - Alejandro Raiczyk
