从ExecutorService获取的新线程中传播ThreadLocal

40

我正在使用ExecutorService和Future在单独的线程中运行一个带有超时的进程(示例代码here)("生成"线程发生在AOP Aspect中)。

现在,主线程是一个Resteasy请求。 Resteasy使用一个或多个ThreadLocal变量来存储一些上下文信息,我需要在Rest方法调用的某个点检索这些信息。问题是,由于Resteasy线程在新线程中运行,ThreadLocal变量会丢失。

最好的方法是将Resteasy使用的任何ThreadLocal变量“传播”到新线程中?似乎Resteasy使用不止一个ThreadLocal变量来跟踪上下文信息,我希望“盲目”地将所有信息传输到新线程。

我尝试过子类化ThreadPoolExecutor 并使用beforeExecute方法将当前线程传递给池,但我找不到将ThreadLocal变量传递给池的方法。

有什么建议吗?

谢谢


你能稍微改写一下第二段吗?它让我很困惑。此外,beforeExecute有什么问题?你不能使其正常工作,还是你意识到它无法满足你的需求? - toto2
6个回答

24
ThreadLocal实例集合与线程相关联的信息存储在每个Thread的私有成员中。你唯一枚举这些内容的机会就是对Thread进行反射,通过这种方式,你可以覆盖对线程字段的访问限制。
一旦你能够获取到ThreadLocal的集合,你可以使用ThreadPoolExecutorbeforeExecute()afterExecute()钩子,在后台线程中复制它们,或者通过为任务创建一个Runnable包装器来截取run()调用以设置并取消必要的ThreadLocal实例。事实上,后一种技术可能更好,因为它可以为你提供一个方便的地方来存储任务排队时的ThreadLocal值。
更新:以下是第二种方法的更具体说明。与我最初的描述相反,包装器中仅存储调用线程,在执行任务时对其进行查询。
static Runnable wrap(Runnable task)
{
  Thread caller = Thread.currentThread();
  return () -> {
    Iterable<ThreadLocal<?>> vars = copy(caller);
    try {
      task.run();
    }
    finally {
      for (ThreadLocal<?> var : vars)
        var.remove();
    }
  };
}

/**
 * For each {@code ThreadLocal} in the specified thread, copy the thread's 
 * value to the current thread.  
 * 
 * @param caller the calling thread
 * @return all of the {@code ThreadLocal} instances that are set on current thread
 */
private static Collection<ThreadLocal<?>> copy(Thread caller)
{
  /* Use a nasty bunch of reflection to do this. */
  throw new UnsupportedOperationException();
}

你能举个例子说明后一种技术是如何工作的吗? - Viraj
@MarcelStör 我没有尝试过,但它看起来是正确的信息。因此,除了读取value字段外,您还需要调用条目上的get()方法以获取条目引用的ThreadLocal实例。一旦您拥有这两个对象,就可以完成反射操作。对于找到的每个条目,请调用local.set(value),并将线程本地添加到从copy()返回的集合中。 - erickson
1
@MarcelStör 当我更仔细地看这个问题时,实际上可能存在线程本地映射的可见性问题,因为它是为单线程访问而设计的。这意味着您需要复制当前线程中的值,然后在工作线程中设置它们,并以安全的方式在线程之间传递它们。我将不得不再考虑一下并修改我的答案。 - erickson
@MarcelStör 经过更多的思考,我认为在这种特殊情况下,一切都没问题。将任务提交给“Executor”将处理可见性问题,因为那里有一个内存屏障。而且,由于主线程正在等待后台线程运行,因此主线程的“ThreadLocals”在此期间不会被修改。但是,如果这些条件发生变化,可能需要进行一些额外的工作。 - erickson
1
@erickson,拷贝过晚的一个问题是,在典型平台上,ThreadLocal将会在该线程处理下一个请求时发生更改,因此我认为您必须在出现竞争条件之前在该线程上进行拷贝。 - Dean Hiller
显示剩余3条评论

5

根据 @erickson 的回答,我编写了这段代码。它适用于可继承线程本地变量。使用的方法与 Thread 构造函数中使用的方法相同,构建可继承线程本地变量列表。当然,我使用反射来完成此操作。同时,我重写了执行程序类。

public class MyThreadPoolExecutor extends ThreadPoolExecutor
{
   @Override
   public void execute(Runnable command)
   {
      super.execute(new Wrapped(command, Thread.currentThread()));
   }
}

封装器:

   private class Wrapped implements Runnable
   {
      private final Runnable task;

      private final Thread caller;

      public Wrapped(Runnable task, Thread caller)
      {
         this.task = task;
         this.caller = caller;
      }

      public void run()
      {
         Iterable<ThreadLocal<?>> vars = null;
         try
         {
            vars = copy(caller);
         }
         catch (Exception e)
         {
            throw new RuntimeException("error when coping Threads", e);
         }
         try {
            task.run();
         }
         finally {
            for (ThreadLocal<?> var : vars)
               var.remove();
         }
      }
   }

复制方法:

public static Iterable<ThreadLocal<?>> copy(Thread caller) throws Exception
   {
      List<ThreadLocal<?>> threadLocals = new ArrayList<>();
      Field field = Thread.class.getDeclaredField("inheritableThreadLocals");
      field.setAccessible(true);
      Object map = field.get(caller);
      Field table = Class.forName("java.lang.ThreadLocal$ThreadLocalMap").getDeclaredField("table");
      table.setAccessible(true);

      Method method = ThreadLocal.class
              .getDeclaredMethod("createInheritedMap", Class.forName("java.lang.ThreadLocal$ThreadLocalMap"));
      method.setAccessible(true);
      Object o = method.invoke(null, map);

      Field field2 = Thread.class.getDeclaredField("inheritableThreadLocals");
      field2.setAccessible(true);
      field2.set(Thread.currentThread(), o);

      Object tbl = table.get(o);
      int length = Array.getLength(tbl);
      for (int i = 0; i < length; i++)
      {
         Object entry = Array.get(tbl, i);
         Object value = null;
         if (entry != null)
         {
            Method referentField = Class.forName("java.lang.ThreadLocal$ThreadLocalMap$Entry").getMethod(
                    "get");
            referentField.setAccessible(true);
            value = referentField.invoke(entry);
            threadLocals.add((ThreadLocal<?>) value);
         }
      }
      return threadLocals;
   }

1
我不喜欢反射方法。替代方案是实现执行器包装器,并将对象直接作为ThreadLocal上下文传递给所有子线程,传播父级上下文。
public class PropagatedObject {

    private ThreadLocal<ConcurrentHashMap<AbsorbedObjectType, Object>> data = new ThreadLocal<>();

   //put, set, merge methods, etc

}

==>

public class ObjectAwareExecutor extends AbstractExecutorService {

    private final ExecutorService delegate;
    private final PropagatedObject objectAbsorber;

    public ObjectAwareExecutor(ExecutorService delegate, PropagatedObject objectAbsorber){
        this.delegate = delegate;
        this.objectAbsorber = objectAbsorber;
    }
    @Override
    public void execute(final Runnable command) {

        final ConcurrentHashMap<String, Object> parentContext = objectAbsorber.get();
        delegate.execute(() -> {
            try{
                objectAbsorber.set(parentContext);
                command.run();
            }finally {
                parentContext.putAll(objectAbsorber.get());
                objectAbsorber.clean();
            }
        });
        objectAbsorber.merge(parentContext);
    }

1
根据我的理解,您可以看一下InheritableThreadLocal。它是用来将ThreadLocal变量从父线程上下文传递到子线程上下文的。

27
首先,这个方案行不通,因为原帖作者无法控制第三方库中ThreadLocal的创建。其次,ExecutorService 可以复用线程,而 InheritableThreadLocal 仅在您直接生成新线程时才起作用。 - Tomasz Nurkiewicz

0

这里有一个示例,可以将父线程中的当前LocaleContext传递给由CompletableFuture[默认情况下使用ForkJoinPool]创建的子线程。

只需在Runnable块内定义您想在子线程中执行的所有操作。因此,当CompletableFuture执行Runnable块时,控制权在子线程手中,您就可以在子线程的ThreadLocal中设置父线程的ThreadLocal内容。

问题在于并非整个ThreadLocal都被复制。只有LocaleContext被复制。由于ThreadLocal仅对其所属的线程具有私有访问权限,因此使用Reflection尝试在子线程中获取和设置可能会导致内存泄漏或性能损失。

因此,如果您知道您感兴趣的ThreadLocal参数,则此解决方案的效果更佳。

 public void parentClassMethod(Request request) {
        LocaleContext currentLocale = LocaleContextHolder.getLocaleContext();
        executeInChildThread(() -> {
                LocaleContextHolder.setLocaleContext(currentLocale);
                //Do whatever else you wanna do
            }));

        //Continue stuff you want to do with parent thread
}


private void executeInChildThread(Runnable runnable) {
    try {
        CompletableFuture.runAsync(runnable)
            .get();
    } catch (Exception e) {
        LOGGER.error("something is wrong");
    }
}

-4
如果您查看ThreadLocal代码,您会看到:
    public T get() {
        Thread t = Thread.currentThread();
        ...
    }

当前线程无法被覆盖。

可能的解决方案:

  1. 查看Java 7的fork/join机制(但我认为这是一种不好的方式)

  2. 查看endorsed机制,以覆盖JVM中的ThreadLocal类。

  3. 尝试重写RESTEasy(您可以使用IDE中的重构工具替换所有ThreadLocal用法,这看起来很容易)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接