在Java EE中手动启动新线程是安全的吗?

47

我无法找到一个明确的答案来确认在会话范围内JSF托管bean中生成线程是否安全。该线程需要调用无状态EJB实例上的方法(该实例被依赖注入到管理的Bean中)。

背景是我们有一个需要长时间生成的报告,由于我们无法更改服务器设置,这导致HTTP请求超时。因此,想法是启动一个新线程让它生成并且在临时存储中保存报告。同时,JSF页面显示进度条,轮询管理的bean直到生成完成,然后发出第二个请求来下载存储的报告。这似乎可以工作,但我想确保我的操作不是一种欺骗行为。


FYI,JSR 236 在 Java EE 7 及更高版本中添加了支持生成线程的功能。请参阅 Chris Ritchie 在类似问题上的此答案 - Basil Bourque
3个回答

53

请查看 EJB 3.1 的 @Asynchronous methods。这正是它们的用途。

以下是使用 OpenEJB 4.0.0-SNAPSHOTs 的简单示例。这里有一个标记为@Asynchronous的方法的@Singletonbean。每当任何人(在本例中为您的 JSF 管理 bean)调用该方法时,它都会立即返回,无论该方法实际花费多长时间。

@Singleton
public class JobProcessor {

    @Asynchronous
    @Lock(READ)
    @AccessTimeout(-1)
    public Future<String> addJob(String jobName) {

        // Pretend this job takes a while
        doSomeHeavyLifting();

        // Return our result
        return new AsyncResult<String>(jobName);
    }

    private void doSomeHeavyLifting() {
        try {
            Thread.sleep(SECONDS.toMillis(10));
        } catch (InterruptedException e) {
            Thread.interrupted();
            throw new IllegalStateException(e);
        }
    }
}
这里有一个小测试用例,连续调用了那个@Asynchronous方法几次。
每次调用都会返回一个Future对象,该对象最初基本上是空的,并且在相关方法调用实际完成时将其值填充为容器。
import javax.ejb.embeddable.EJBContainer;
import javax.naming.Context;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class JobProcessorTest extends TestCase {

    public void test() throws Exception {

        final Context context = EJBContainer.createEJBContainer().getContext();

        final JobProcessor processor = (JobProcessor) context.lookup("java:global/async-methods/JobProcessor");

        final long start = System.nanoTime();

        // Queue up a bunch of work
        final Future<String> red = processor.addJob("red");
        final Future<String> orange = processor.addJob("orange");
        final Future<String> yellow = processor.addJob("yellow");
        final Future<String> green = processor.addJob("green");
        final Future<String> blue = processor.addJob("blue");
        final Future<String> violet = processor.addJob("violet");

        // Wait for the result -- 1 minute worth of work
        assertEquals("blue", blue.get());
        assertEquals("orange", orange.get());
        assertEquals("green", green.get());
        assertEquals("red", red.get());
        assertEquals("yellow", yellow.get());
        assertEquals("violet", violet.get());

        // How long did it take?
        final long total = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start);

        // Execution should be around 9 - 21 seconds
        assertTrue("" + total, total > 9);
        assertTrue("" + total, total < 21);
    }
}

示例源代码

底层使这个工作的原因是:

  • 调用者看到的JobProcessor并不是实际的JobProcessor实例,而是一个子类或代理,拥有所有方法被覆盖。应该异步执行的方法会以不同的方式处理。
  • 对异步方法的调用只会创建一个包装了给定方法和参数的Runnable。这个Runnable会传递给一个Executor,它只是一个连接到线程池的工作队列。
  • 在将工作添加到队列后,代理版本的方法返回与现在等待队列的Runnable相关联的Future的实现。
  • Runnable最终在真正的JobProcessor实例上执行方法时,它将获取返回值并将其设置到Future中,以便调用者可以访问。

重要的是要注意JobProcessor返回的AsyncResult对象并不是调用者持有的相同Future对象。如果真正的JobProcessor可以返回String,并且调用者版本的JobProcessor可以返回Future<String>,那就太好了,但我们没有找到不添加更多复杂性的方式。因此,AsyncResult是一个简单的包装对象。容器将获取String,丢弃AsyncResult,然后将String放入调用者持有的真正的Future中。

要获得进度更新,只需将线程安全对象(如AtomicInteger)传递给@Asynchronous方法,并让bean代码定期更新它的完成百分比。


2
确实是个不错的评论!我们最近开始从JSF后端bean调用@Asynchronous注释的方法,结果非常好。如果代码能够早期启动异步操作,它的执行可以与JSF进行的部分生命周期处理重叠。如果您密集地使用此功能,则有必要了解线程池的大小是多少。在JBoss AS 6.0中,这似乎是10 - Arjan Tijms
有没有像Tomcat这样的非JEE Web服务器的好替代库? - user370382
2
@mxrider 你可以将 OpenEJB 作为 war 文件放入任何 5.5 或更高版本的 Tomcat 中,并获得这一切。同时,还可以查看 Apache TomEE http://openejb.apache.org/3.0/apache-tomee.html - David Blevins
1
@Damian 通常情况下,OpenEJB/TomEE的实现是由ExecutorService上的public <T> Future<T> submit(Callable<T> task)驱动的,它使用FutureTask作为实现。Future.cancel()方法的契约略有不同,因此容器也会用实现了额外部分并最终委托给FutureTaskFuture对象来包装该对象。 - David Blevins
1
很棒的答案。Adam Bien几天前发表了关于这个主题的文章:http://www.adam-bien.com/roller/abien/entry/conveniently_transactionally_and_legally_starting - atamanroman
显示剩余2条评论

48

介绍

在会话范围的托管 bean 中生成线程不一定是一种黑客行为,只要它完成您想要的工作即可。但是自行生成线程需要极其小心。代码不应该编写成这样,即单个用户可以例如在会话中生成无限数量的线程和/或线程即使在会话结束后仍然继续运行。这早晚会导致应用程序崩溃。

代码需要这样编写,以确保例如一个用户永远不能在会话中生成多个后台线程,并且在会话结束时保证可以中断该线程。对于会话中的多个任务,您需要将任务排队。另外,所有这些线程最好由公共线程池提供服务,以便您可以在应用程序级别上限制生成的线程总数。

管理线程是非常微妙的任务,因此最好使用内置工具而不是自己使用new Thread()等方式创建。平均Java EE应用程序服务器提供了一个容器管理的线程池,您可以通过EJB的@Asynchronous@Schedule等方式来利用它。为了独立于容器(即Tomcat友好),您还可以使用Java 1.5的Util Concurrent ExecutorServiceScheduledExecutorService
以下示例假定使用Java EE 6+ with EJB。

在表单提交时启动任务并忘记它

@Named
@RequestScoped // Or @ViewScoped
public class Bean {

    @EJB
    private SomeService someService;

    public void submit() {
        someService.asyncTask();
        // ... (this code will immediately continue without waiting)
    }

}

@Stateless
public class SomeService {

    @Asynchronous
    public void asyncTask() {
        // ...
    }

}

在页面加载时异步获取模型

@Named
@RequestScoped // Or @ViewScoped
public class Bean {

    private Future<List<Entity>> asyncEntities;

    @EJB
    private EntityService entityService;

    @PostConstruct
    public void init() {
        asyncEntities = entityService.asyncList();
        // ... (this code will immediately continue without waiting)
    }

    public List<Entity> getEntities() {
        try {
            return asyncEntities.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FacesException(e);
        } catch (ExecutionException e) {
            throw new FacesException(e);
        }
    }
}

@Stateless
public class EntityService {

    @PersistenceContext
    private EntityManager entityManager;

    @Asynchronous
    public Future<List<Entity>> asyncList() {
        List<Entity> entities = entityManager
            .createQuery("SELECT e FROM Entity e", Entity.class)
            .getResultList();
        return new AsyncResult<>(entities);
    }

}

如果您正在使用JSF实用程序库OmniFaces,则可以更快地完成此操作,如果您使用@Eager注释托管bean。

在应用程序启动时安排后台作业

@Singleton
public class BackgroundJobManager {

    @Schedule(hour="0", minute="0", second="0", persistent=false)
    public void someDailyJob() {
        // ... (runs every start of day)
    }

    @Schedule(hour="*/1", minute="0", second="0", persistent=false)
    public void someHourlyJob() {
        // ... (runs every hour of day)
    }

    @Schedule(hour="*", minute="*/15", second="0", persistent=false)
    public void someQuarterlyJob() {
        // ... (runs every 15th minute of hour)
    }

    @Schedule(hour="*", minute="*", second="*/30", persistent=false)
    public void someHalfminutelyJob() {
        // ... (runs every 30th second of minute)
    }

}

不断在后台更新应用程序范围的模型
@Named
@RequestScoped // Or @ViewScoped
public class Bean {

    @EJB
    private SomeTop100Manager someTop100Manager;

    public List<Some> getSomeTop100() {
        return someTop100Manager.list();
    }

}

@Singleton
@ConcurrencyManagement(BEAN)
public class SomeTop100Manager {

    @PersistenceContext
    private EntityManager entityManager;

    private List<Some> top100;

    @PostConstruct
    @Schedule(hour="*", minute="*/1", second="0", persistent=false)
    public void load() {
        top100 = entityManager
            .createNamedQuery("Some.top100", Some.class)
            .getResultList();
    }

    public List<Some> list() {
        return top100;
    }

}

参见:


2
完全同意,只要极其小心地进行(完美措辞),生成线程是可以的。请注意,我们最终在 EJB 3.1 规范级别上解决了这个需求。请参见我的 @Asynchronous 回答。 - David Blevins
@Asynchronous 真的是 EJB 3.1 中最好的新增功能之一。我希望在 EJB 3.2/Java EE 7 中,能够考虑使用 JDK 7 中的 fork/join 的托管变体。 - Arjan Tijms
2
@BalusC,您能详细说明一些工具/功能,以便在EJB 3.0设置中知道会话何时被销毁并结束线程吗?如果需要,我可以创建一个新问题。 - Adam
1
FYI:Java EE 7及更高版本通过[JSR 236:Java™ EE并发工具](https://jcp.org/en/jsr/detail?id=236)获得了新的线程特性。 - Basil Bourque

-4

我尝试了这个方法,从我的JSF管理的Bean中运行得非常好

ExecutorService executor = Executors.newFixedThreadPool(1);

@EJB
private IMaterialSvc materialSvc;

private void updateMaterial(Material material, String status,  Location position) {

    executor.execute(new Runnable() {
        public void run() {
            synchronized (position) {
                // TODO update material in audit? do we need materials in audit?
                int index = position.getMaterials().indexOf(material);
                Material m = materialSvc.getById(material.getId());
                m.setStatus(status);
                m = materialSvc.update(m);
                if (index != -1) {
                    position.getMaterials().set(index, m);
                }

            }
        }
    });

}

@PreDestroy
public void destory() {
    executor.shutdown();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接