线程池中的线程休眠

9
假设我们有一个线程池,其中包含有限数量的线程。
Executor executor = Executors.newFixedThreadPool(3);

现在假设其中一个活动任务必须休眠3秒钟(出于任何原因)。
executor.execute(() -> {
    try {
        Thread.sleep(3000L);
    } catch (InterruptedException ignore) {}
});

我们如何实现这样一个线程池,以便在任务睡眠(或等待监视器/条件时),可以有效地使用线程1运行另一个任务? 1 我所说的线程并不是指“物理”Java线程,因为当线程处于睡眠状态时这是不可能的。我的意思是,线程池应该有一个抽象实现,虚拟上似乎允许一个线程在睡眠期间运行另一个任务。关键点在于始终有N个同时运行(非睡眠)的任务。
类似于监视器处理对关键区域的访问的方式:
  • 如果一个线程在等待资源,该资源可以被另一个线程使用。
  • 如果线程被通知,它将被放置到等待集合中以重新获得对该资源的访问权限。

1
让它做一些工作,而不是睡觉,我猜。 - Pavel Smirnov
如果你有一个需要锁的工作,线程可以尝试获取锁并仅在成功时执行该工作。否则,它可以切换到等待队列中的另一个工作并尝试执行,或者返回原始线程的任务。我想要表达的观点是,最好做一些工作,如果可能的话,而不是只是睡觉。 - Pavel Smirnov
这让我想起了Lua中的协程 - 去查一下吧。它可能会给你带来一些灵感。 - Felix
有趣的问题。我认为你不会得到一个令人满意的答案,因为你基本上是在操作系统之上实现线程调度器。你可以使用锁和P>K后台线程,配合自定义的“睡眠”方法。 - ttzn
@PavelSmirnov 看起来把这么多责任放在线程池 API 用户身上,让他们确保在无法获得锁时运行另一个任务,并在每个任务中重复该代码,似乎有些过分。否则,您的观点是正确的:不要编写睡眠任务。 - Snackoverflow
显示剩余3条评论
2个回答

2
你所询问的实际上是在JVM/操作系统线程之上实现协程/纤程。Sanhong Li曾经做过一次很好的演讲,介绍了阿里巴巴工程师实现这种构造的方法——其思路是不依赖于操作系统线程调度器,而是需要依赖于自己的选择器。
此外,还可以参考Loom项目中的纤程(用户空间绿色线程)。

0

我实现了一个最小化的工作示例,基本上做了我认为你想要的事情。

一个任务接口(类似于可运行接口,只是通过传递上下文来执行等待)

package io.medev.stackoverflow;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public interface Task {

    /**
     * Wraps the given runnable into a Task with a not guessable execution time (meaning guessExecutionTime always returns Long.MAX_VALUE)
     * @param runnable The runnable to wrap
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable) {
        return wrap(runnable, Long.MAX_VALUE);
    }

    /**
     * Wraps the given runnable using the given guessedExecutionTimeMillis
     * @param runnable The runnable to wrap
     * @param guessedExecutionTimeMillis The guessed execution time in millis for this runnable
     * @return a Task wrapping this runnable
     */
    static Task wrap(Runnable runnable, long guessedExecutionTimeMillis) {
        return new Task() {
            @Override
            public long guessExecutionTimeMillis() {
                return guessedExecutionTimeMillis;
            }

            @Override
            public void run(Context context) {
                runnable.run();
            }
        };
    }

    /**
     * Should more or less guess how long this task will run
     * @return The execution time of this Task in milliseconds
     */
    long guessExecutionTimeMillis();

    void run(Context context);

    interface Context {

        /**
         * Block until the condition is met, giving other Tasks time to execute
         * @param condition the condition to check
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition) throws InterruptedException;

        /**
         * Blocks at least for the given duration, giving other Tasks time to execute
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(long timeout, TimeUnit timeUnit) throws InterruptedException;

        /**
         * Blocks until the condition is met or the timeout expires, giving other Tasks time to execute
         * @param condition the condition to check
         * @param timeout
         * @param timeUnit
         * @throws InterruptedException if the current thread is interrupted
         */
        void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException;
    }
}

还有一个基本的固定线程池执行器 - 但你必须依赖于具体的实现:

package io.medev.stackoverflow;

import java.util.Comparator;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class TimeEfficientExecutor implements Executor {

    private final BlockingQueue<Task> taskQueue;
    private final CountDownLatch latch;
    private volatile boolean alive;

    public TimeEfficientExecutor(int threads) {
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingLong(Task::guessExecutionTimeMillis));
        this.latch = new CountDownLatch(threads);
        this.alive = true;

        for (int i = 0; i < threads; i++) {
            Thread thread = new Thread(new TimeEfficientExecutorRunnable());
            thread.start();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        execute(Task.wrap(runnable));
    }

    public void execute(Runnable runnable, long guessedExecutionTimeMillis) {
        execute(Task.wrap(runnable, guessedExecutionTimeMillis));
    }

    public void execute(Task task) {
        this.taskQueue.offer(task);
    }

    public void shutdown() {
        this.alive = false;
    }

    public void awaitShutdown() throws InterruptedException {
        this.latch.await();
    }

    public void awaitShutdown(long timeout, TimeUnit timeUnit) throws InterruptedException {
        this.latch.await(timeout, timeUnit);
    }

    private class TimeEfficientExecutorRunnable implements Runnable {

        @Override
        public void run() {
            try {
                while (TimeEfficientExecutor.this.alive) {
                    Task task = TimeEfficientExecutor.this.taskQueue.poll();

                    if (task != null) {
                        try {
                            task.run(new IdleTaskContext());
                        } catch (Exception e) {
                            // TODO: logging
                        }
                    }
                }
            } finally {
                TimeEfficientExecutor.this.latch.countDown();
            }
        }
    }

    private class IdleTaskContext implements Task.Context {

        @Override
        public void idle(BooleanSupplier condition) throws InterruptedException {
            idle(condition, Long.MAX_VALUE);
        }

        @Override
        public void idle(long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(() -> false, timeout, timeUnit);
        }

        @Override
        public void idle(BooleanSupplier condition, long timeout, TimeUnit timeUnit) throws InterruptedException {
            idle(condition, System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }

        private void idle(BooleanSupplier condition, long idleUntilTs) throws InterruptedException {
            long leftMillis = idleUntilTs - System.currentTimeMillis();

            while (TimeEfficientExecutor.this.alive && !condition.getAsBoolean() && leftMillis >= 1L) {
                Task task = TimeEfficientExecutor.this.taskQueue.poll(leftMillis, TimeUnit.MILLISECONDS);
                leftMillis = idleUntilTs - System.currentTimeMillis();

                if (task != null) {
                    if (leftMillis >= 1L && task.guessExecutionTimeMillis() < leftMillis) {
                        task.run(new IdleTaskContext());
                    } else {
                        TimeEfficientExecutor.this.taskQueue.offer(task);
                    }
                }
            }
        }
    }
}

请注意,您不能仅仅通过步进堆栈来控制程序的执行流程 - 堆栈是绑定在执行线程上的。这意味着,如果某个“子”任务开始空闲,就不可能跳回到底层的空闲任务中。您必须“信任”每个任务在guessExecutionTimeMillis方法中返回的结果。
由于执行器中使用了优先队列,因此队列总是返回执行时间最短的任务。感谢优先队列的使用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接