Java 8并行流和ThreadLocal

17

我正在努力想出如何在Java 8并行流中复制ThreadLocal的值。

因此,如果我们考虑这个:

    public class ThreadLocalTest {

        public static void main(String[] args)  {
            ThreadContext.set("MAIN");
            System.out.printf("Main Thread: %s\n", ThreadContext.get());

            IntStream.range(0,8).boxed().parallel().forEach(n -> {
                System.out.printf("Parallel Consumer - %d: %s\n", n, ThreadContext.get());
            });
        }

        private static class ThreadContext {
            private static ThreadLocal<String> val = ThreadLocal.withInitial(() -> "empty");

            public ThreadContext() {
            }

            public static String get() {
                return val.get();
            }

            public static void set(String x) {
                ThreadContext.val.set(x);
            }
        }
    }

哪些是输出

Main Thread: MAIN
Parallel Consumer - 5: MAIN
Parallel Consumer - 4: MAIN
Parallel Consumer - 7: empty
Parallel Consumer - 3: empty
Parallel Consumer - 1: empty
Parallel Consumer - 6: empty
Parallel Consumer - 2: empty
Parallel Consumer - 0: MAIN

是否有一种方法可以将 main() 方法中的 ThreadLocal 克隆到为每个并行执行生成的线程中?

这样我的结果就是:

Main Thread: MAIN
Parallel Consumer - 5: MAIN
Parallel Consumer - 4: MAIN
Parallel Consumer - 7: MAIN
Parallel Consumer - 3: MAIN
Parallel Consumer - 1: MAIN
Parallel Consumer - 6: MAIN
Parallel Consumer - 2: MAIN
Parallel Consumer - 0: MAIN

而不是第一个?


3
我不会有这样的期望。但为什么一开始要使用ThreadLocal,而不是在主方法中创建并显式地将其传递给lambda函数呢? - Louis Wasserman
5
你是想交叉使用这些流吗?我听说那样做会很糟糕。 - zapl
1个回答

15

正如Louis在评论中所述,你的示例可以很好地简化为在lambda表达式中捕获本地变量的值。

public static void main(String[] args)  {
    String value = "MAIN";
    System.out.printf("Main Thread: %s\n", value);

    IntStream.range(0,8).boxed().parallel().forEach(n -> {
        System.out.printf("Parallel Consumer - %d: %s\n", n, value);
    });
}

从您的示例中无法明确完整的用例。

如果您确切地知道哪些线程将从您的主线程启动,则可以考虑使用InheritableThreadLocal

此类扩展了ThreadLocal,以提供从父线程到子线程的值继承:当创建子线程时, 对于父线程具有值的所有可继承线程局部变量,子线程接收初始值。

在您的情况下,声明val为一个InheritableThreadLocal,因为在ForkJoinPool#commonPool()中为parallel()创建的Thread实例是延迟创建的,它们都将从main方法(和线程)中设置的set值进行继承。

如果在在原始线程中设置InhertiableThreadLocal值之前以某种方式使用了commonPool(或调用了parallel终端操作的任何池),则情况将不同。


是的,我的例子并没有提供太多关于用例的信息。实际用例是与其他代码一起使用(除了parallel()消费者),ThreadLocal在Servlet过滤器中被填充,然后在Spring MVC控制器中使用,并且有时为各种可运行实现克隆到整个板块(服务调用等)。因此,将会有各种调用沿着路线ThreadContext.get(something)。感谢您解释InheritableThreadLocal,我会尝试一下! - Anatoli Radulov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接