所以我的最佳解决方案是:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest)
{
return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}
public static void main(String args[]) throws Exception
{
int dbOffset = 0;
int nbOfArticlesPerRequest = 100;
int MYTHREADS = 10;
int loopIndex = 0;
boolean bContinue=true;
Runnable worker;
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
}
public static class MyRunnable implements Runnable {
private final String id;
MyRunnable(String id) {
this.id = id;
}
@Override
public void run()
{
System.out.println("Thread '"+id+"' started");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread '"+id+"' stopped");
}
}
}
这个工作很顺利,但缺点是在每个循环结束时我需要等待最后一个线程完成。
例如:当只有3个线程正在运行时...
为了解决这个问题,我做了以下操作,但这样做是否“安全”/正确?
顺便问一下:有没有办法知道队列中有多少任务/线程?
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}
while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
编辑:
我尝试启动1个或100万个任务,因此(我认为)不能将它们全部放入队列中...这就是为什么我使用全局执行器,以便始终可以在队列中拥有一些线程(为此,我不能关闭执行器,否则它将无法使用)。
最新代码版本:
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest ); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executorPool.execute(worker);
}
while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2))
{
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
break;
}
TimeUnit.MILLISECONDS.sleep(2000);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
executorPool.shutdown();
// Wait until all threads are finish
while (!executorPool.isTerminated()) {
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
感谢您的提前帮助。