如何使用固定数量的工作线程实现简单的线程化?

50

我正在寻找最简单、最直接的方法来实现以下内容:

  • 主程序实例化工作线程来执行一个任务。
  • 只能同时运行n个任务。
  • 当达到n时,不会再启动更多的工作线程,直到正在运行的线程数量降至n以下。
7个回答

56

我认为Executors.newFixedThreadPool符合你的需求。根据你是否需要将结果返回给主线程,任务是否完全自包含以及是否有一组任务可以在前面执行,或者是否响应某些事件将任务排队,可使用多种不同的方法来使用生成的ExecutorService。

  Collection<YourTask> tasks = new ArrayList<YourTask>();
  YourTask yt1 = new YourTask();
  ...
  tasks.add(yt1);
  ...
  ExecutorService exec = Executors.newFixedThreadPool(5);
  List<Future<YourResultType>> results = exec.invokeAll(tasks);

或者,如果您需要响应某个事件执行新的异步任务,则可能只需使用ExecutorService的简单的execute(Runnable)方法。


25
/* Get an executor service that will run a maximum of 5 threads at a time: */
ExecutorService exec = Executors.newFixedThreadPool(5);
/* For all the 100 tasks to be done altogether... */
for (int i = 0; i < 100; i++) {
    /* ...execute the task to run concurrently as a runnable: */
    exec.execute(new Runnable() {
        public void run() {
            /* do the work to be done in its own thread */
            System.out.println("Running in: " + Thread.currentThread());
        }
    });
}
/* Tell the executor that after these 100 steps above, we will be done: */
exec.shutdown();
try {
    /* The tasks are now running concurrently. We wait until all work is done, 
     * with a timeout of 50 seconds: */
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS);
    /* If the execution timed out, false is returned: */
    System.out.println("All done: " + b);
} catch (InterruptedException e) { e.printStackTrace(); }

7

Executors.newFixedThreadPool(int)

Executor executor = Executors.newFixedThreadPool(n);

Runnable runnable = new Runnable() {
 public void run() {
  // do your thing here
 }
}

executor.execute(runnable);

2
  1. If your task queue is not going to be unbounded and tasks can complete in shorter time intervals, you can use Executors.newFixedThreadPool(n); as suggests by experts.

    The only drawback in this solution is unbounded task queue size. You don't have control over it. The huge pile-up in task queue will degrade performance of application and may cause out of memory in some scenarios.

  2. If you want to use ExecutorService and enable work stealing mechanism where idle worker threads share the work load from busy worker threads by stealing tasks in task queue. It will return ForkJoinPool type of Executor Service.

    public static ExecutorService newWorkStealingPool(int parallelism)

    Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

  3. I prefer ThreadPoolExecutor due to flexibility in APIs to control many paratmeters, which controls the flow task execution.

    ThreadPoolExecutor(int corePoolSize, 
                           int maximumPoolSize, 
                           long keepAliveTime, 
                           TimeUnit unit, 
                           BlockingQueue<Runnable> workQueue, 
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler)
    
在您的情况下,将corePoolSize和maximumPoolSize都设置为N。这里您可以控制任务队列大小,定义自己的线程工厂和拒绝处理程序策略。
查看相关SE问题以动态控制池大小: Dynamic Thread Pool

2

0

如果你想自己编写:

private static final int MAX_WORKERS = n;
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS);

private boolean roomLeft() {
    synchronized (workers) {
        return (workers.size() < MAX_WORKERS);
    }
}

private void addWorker() {
    synchronized (workers) {
        workers.add(new Worker(this));
    }
}

public void removeWorker(Worker worker) {
    synchronized (workers) {
        workers.remove(worker);
    }
}

public Example() {
    while (true) {
        if (roomLeft()) {
            addWorker();
        } 
    }
}

Worker是您扩展Thread的类。每个工作线程在完成任务后,将调用此类的removeWorker方法,并将自身作为参数传入。

话虽如此,Executor框架看起来更好。

编辑:有人可以解释一下为什么这样不好吗?不要只是给它负面评价。


0

正如其他人在这里提到的,你最好使用Executors类创建一个线程池:

然而,如果你想自己编写代码,以下代码应该可以给你一个思路。基本上,只需将每个新线程添加到线程组中,并确保组中从未有超过N个活动线程:

Task[] tasks = getTasks(); // array of tasks to complete
ThreadGroup group = new ThreadGroup();
int i=0;
while( i<tasks.length || group.activeCount()>0 ) {
    if( group.activeCount()<N && i<tasks.length ) {
        new TaskThread(group, tasks[i]).start();
        i++;
    } else {
        Thread.sleep(100);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接