如何实现阻塞线程池执行器?

7
我们有一个大型文本文件,其中每行都需要进行密集的处理。设计方案是拥有一个类来读取文件,并将每一行的处理委派给线程池中的线程。当没有空闲线程来处理时,文件读取器类应该被阻塞,不能读取下一行。因此,我需要一个“阻塞式线程池”。
在当前实现中,ThreadPoolExecutor.submit()和ThreadPoolExecutor.execute()方法在配置的线程数达到上限后会抛出RejectedExecutionException异常,如下面的代码片段所示。
public class BlockingTp {

    public static void main(String[] args) {
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        ThreadPoolExecutor executorService=
            new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
        int Jobs = 10;
        System.out.println("Starting application with " + Jobs + " jobs");
        for (int i = 1; i <= Jobs; i++)
            try {
                executorService.submit(new WorkerThread(i));
                System.out.println("job added " + (i));
            } catch (RejectedExecutionException e) {
                System.err.println("RejectedExecutionException");
            }
    }
}

class WorkerThread implements Runnable {
    int job;
    public WorkerThread(int job) {
        this.job = job;
    }
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (Exception excep) {
        }
    }
}

以上程序的输出结果是:
Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException

有人能够解释一下如何实现阻塞线程池吗?


我一开始误读了你的问题,并提供了一个虚假的答案,希望这个修订版对你有用。 - GhostCat
1
@GhostCat- 已完成,谢谢。 - T-Bag
5个回答

13

有人可以帮忙解答一下,如何实现阻塞线程池。

你需要在执行器服务上设置拒绝执行处理程序。当线程试图将作业放入执行器时,它会被阻塞,直到阻塞队列中有空间。

BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService =
     new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // block until there's room
            executor.getQueue().put(r);
            // check afterwards and throw if pool shutdown
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Producer interrupted", e);
        }
    }
});

因此,TRE不会抛出RejectedExecutionException,而是调用拒绝处理程序,该处理程序将尝试将作业放回队列。这会阻止调用者。


3
直接将任务插入工作队列不安全,因为它绕过了ThreadPoolExecutor用于管理其工作线程池状态的逻辑。例如,它不会正确检查线程池是否已关闭,这意味着对execute()方法的调用可能会挂起而不是正确地拒绝任务。 - David
1
这是一个关于@yaseco的常见问题解答。每当你编写捕获InterruptedException的代码时,应立即重新中断线程。抛出异常会清除线程的中断标志,通常你需要确保调用者知道线程已被中断。参见:https://dev59.com/jW865IYBdhLWcg3wHK3u#3976377 和 https://dzone.com/articles/how-to-handle-the-interruptedexception。 - Gray
添加了一个检查,以判断执行器是否已经关闭。 - Gray
@Gray 中断机制的作者并不愚蠢——标志位之所以被置为0,是因为它应该是这样的。他们没有设计一个需要编写相当笨拙代码行或者你的代码就会出错的API。中断意味着任何你想要的意思——除非你编程让它发生。例如,它们可以表示:如果线程在等待空间期间被中断,则拒绝它。而你正在这样做。因此,重新引发标志位将是不好的。 - rzwitserloot
抱歉 @rzwitserloot,我说他们愚蠢了吗?我知道中断是如何工作的。不确定你的其他评论。你是在说我的答案难以操作吗?你会怎么做呢?顺便看看我的多线程得分,以防有任何问题。 - Gray
显示剩余3条评论

0

让我们再来看看你的代码:

for (int i = 1; i <= Jobs; i++)
  try {
    tpExe.submit(new WorkerThread(i));
    System.out.println("job added " + (i));
  } catch (RejectedExecutionException e) {
    System.err.println("RejectedExecutionException");
  }

所以 - 当您尝试提交时,如果池忙碌,就会抛出该异常。如果您想要包装它,可以这样写:

public void yourSubmit(Runnable whatever) {
  boolean submitted = false;
  while (! submitted ) {
    try {
      tpExe.submit(new WorkerThread(whatever));
      submitted = true;
    } catch (RejectedExecutionException re) {
      // all threads busy ... so wait some time
      Thread.sleep(1000);
    }

换句话说:使用该异常作为“标记”,表示当前无法提交。

0

你可以使用信号量来控制资源。读取器将通过获取信号量来读取并创建异步任务。如果每个线程都很忙,读取器线程将等待直到线程可用。

public class MyExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

0

这里有一个支持所需行为的RejectedExecutionHandler。与其他实现不同,它不直接与队列交互,因此应该与所有Executor实现兼容,并且不会死锁。

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;

import static com.github.cowwoc.requirements.DefaultRequirements.assertThat;
import static com.github.cowwoc.requirements.DefaultRequirements.requireThat;

/**
 * Applies a different rejection policy depending on the thread that requested execution.
 */
public final class ThreadDependantRejectionHandler implements RejectedExecutionHandler
{
    private final ThreadLocal<Integer> numberOfRejections = ThreadLocal.withInitial(() -> 0);
    private final BiFunction<Thread, Executor, Action> threadToAction;

    /**
     * @param threadToAction indicates what action a thread should take when execution is rejected
     * @throws NullPointerException if {@code threadToAction} is null
     */
    public ThreadDependantRejectionHandler(BiFunction<Thread, Executor, Action> threadToAction)
    {
        requireThat(threadToAction, "threadToAction").isNotNull();
        this.threadToAction = threadToAction;
    }

    @SuppressWarnings("BusyWait")
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
        if (executor.isShutdown())
            return;
        Thread currentThread = Thread.currentThread();
        Action action = threadToAction.apply(currentThread, executor);
        if (action == Action.RUN)
        {
            r.run();
            return;
        }
        if (action == Action.REJECT)
        {
            throw new RejectedExecutionException("The thread pool queue is full and the current thread is not " +
                "allowed to block or run the task");
        }

        assertThat(action, "action").isEqualTo(Action.BLOCK);
        int numberOfRejections = this.numberOfRejections.get();
        ++numberOfRejections;
        this.numberOfRejections.set(numberOfRejections);
        if (numberOfRejections > 1)
            return;
        try
        {
            ThreadLocalRandom random = ThreadLocalRandom.current();
            while (!executor.isShutdown())
            {
                try
                {
                    Thread.sleep(random.nextInt(10, 1001));
                }
                catch (InterruptedException e)
                {
                    throw new WrappingException(e);
                }
                executor.submit(r);
                numberOfRejections = this.numberOfRejections.get();
                if (numberOfRejections == 1)
                {
                    // Task was accepted, or executor has shut down
                    return;
                }
                // Task was rejected, reset the counter and try again.
                numberOfRejections = 1;
                this.numberOfRejections.set(numberOfRejections);
            }
            throw new RejectedExecutionException("Task " + r + " rejected from " + executor + " because " +
                "the executor has been shut down");
        }
        finally
        {
            this.numberOfRejections.set(0);
        }
    }

    public enum Action
    {
        /**
         * The thread should run the task directly instead of waiting for the executor.
         */
        RUN,
        /**
         * The thread should block until the executor is ready to run the task.
         */
        BLOCK,
        /**
         * The thread should reject execution of the task.
         */
        REJECT
    }
}

-1

这对我有用。

class handler implements RejectedExecutionHandler{
    @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }

不要像这样回答问题,而是将其放入你的问题中。 - GhostCat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接