如何关闭并行流中使用的线程本地自动关闭对象?

6

我有一个ThreadLocal变量,我想像这样使用它:

ThreadLocal<AutoCloseable> threadLocal = new ThreadLocal<AutoCloseable>(); // pseudocode
ForkJoinPool fj = new ForkJoinPool(nThreads);
fj.submit(
    () -> myStream.parallel().forEach(e -> {
        /*I want to use the thread local autocloseable here, 
          but how do I close it when this parallel processing is done?*/
    })
);

我更倾向于在需要执行一个任务依赖于另一个任务已经完成的多线程场景中使用类似CompletableFuture这样的工具。 - UninformedUser
由于每个线程都有自己的实例,因此您可以像串行代码一样使用它。这里没有共享。您对此有问题吗? - Antoniossss
@flakes 我使用一个生产者类来模拟多个生产者,这只是一种模拟,不是真实的使用情况。我想为每个线程创建这样一个生产者。 - pavel_orekhov
@Antoniossss 序列化代码?请解释一下你的意思,我无法在forEach内部关闭这个可自动关闭的对象。 - pavel_orekhov
那是为什么呢? - Antoniossss
显示剩余8条评论
2个回答

8

ThreadLocal会在使用它们的线程结束后关闭。如果您想对此进行控制,需要使用Map代替。

// do our own thread local resources which close when we want.
Map<Thread, Resource> threadLocalMap = new ConcurrentHashMap<>();

fj.submit(
() -> myStream.parallel().forEach(e -> {
     Resource r = threadLocalMap.computeIfAbsent(Thread.currentThread(), t -> new Resource();
    // use the thread local autocloseable here, 
})

// later once all the tasks have finished.
// close all the thread local resources when the parallel processing is done
threadLocalMap.values().forEach(Utils::closeQuietly);

通常会有一种关闭资源而不抛出异常的方法。Chronicle有一个,其他库也有。

public static void closeQuietly(Closeable c) {
    if (c != null) {
       try {
           c.close();
       } catch (IOException ioe) {
           // ignore or trace log it
       }
    }
}

很可能您的项目中已经有一个方法来完成此操作。 https://www.google.co.uk/search?q=public+static+void+closequietly+Closeable


1
@hey_you 我想这只是一个围绕着 AutoCloseable::close 的 try-catch-ignore 包装器。 - flakes
3
你是说 threadLocal.values().forEach(Utils::closeQuietly);。顺便提一下,你可以结合使用 ThreadLocal 来减少 ConcurrentHashMap 的查找次数。使用 Map<Thread, Resource> threadLocal = new ConcurrentHashMap<>(); ThreadLocal<Resource> resources = ThreadLocal.withInitial(() -> threadLocal.computeIfAbsent( Thread.currentThread(), t -> new Resource())); 进行初始化。然后在流操作中,你可以使用简单高效的 Resource r = resources.get();。清理工作保持不变。 - Holger
2
ThreadLocals 的值可能会在某个时候被垃圾回收。但这并不意味着资源被关闭,因为这需要特定的资源类实现清理的 finalizer 或类似的东西(我们都知道,依赖 finalizer 是强烈不建议的)。线程死亡后,垃圾收集器实际上可能会在不可预测的长时间内收集它。更糟糕的是,并行流使用线程池,因此甚至不能保证工作线程在操作后会死亡。 - Holger
1
@hey_you ThreadLocal 实例不依赖于 Closable 接口。您仍然需要手动关闭过期的条目。此外,由于其内部映射实现,从 ThreadLocal 容器中解除引用(并最终 GC)存储的实例可能需要任意长的时间。即使解除引用 ThreadLocal 本身也是如此。如果您想要在资源上调用 close,则可以选择为线程池实例创建自定义线程工厂。请参见此处 - flakes
1
@hey_you 当实现了finalize()方法的Closeables对象不再被引用时,它们会像其他对象一样被清理。对于ThreadLocal对象来说,在线程结束并且不再被引用后会被清理。 - Peter Lawrey
显示剩余11条评论

-1

看起来你想使用某种共享资源。因此,要么不使用ThreadLocal(因为每个线程都将拥有自己的实例(或null)),并等待直到每个任务完成

ForkJoinTask task=fj.submit(
    () -> myStream.parallel().forEach(e -> {
//whatever
    })
);
task.get()///wait - timeout would be good here
resource.close();// close that shared resource - wraping all of that with try-with-resources woudl work as well

.

或者像序列化代码一样使用该资源 - 在forEach中使用并关闭它


1
不好意思,我的意思是我想使用多个共享资源,而不仅仅是一个。我的流中可能有200个项目,我希望有4个资源可以在并行流中处理它们。 - pavel_orekhov
那么你想打开200次你的资源,还是只想使用4个打开的资源(无论是什么),来大致处理每50个项目? - Antoniossss
首先打开这些资源,然后并行处理,最后关闭它们。这有什么问题吗? - Antoniossss

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接