Java线程池的使用

7

我正在尝试编写一个多线程的网络爬虫。

我的主入口类具有以下代码:

ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null)
         return;
exec.execute(new URLCrawler(this, url));
}

URLCrawler会获取指定的URL,解析HTML并从中提取链接,然后将未见过的链接排入前沿队列。前沿队列是一组未被爬取的URL。问题在于如何编写get()方法。如果队列为空,则应等待任何URLCrawlers完成后再尝试。只有在队列为空且没有当前活动的URLCrawler时,它才应返回null。
我的第一个想法是使用AtomicInteger来计算当前工作的URLCrawlers数量,并使用辅助对象进行notifyAll()/wait()调用。每个爬虫在启动时都会增加当前工作的URLCrawlers数量,在退出时会减少它,并通知已完成的对象。
但我读到notify()/notifyAll()和wait()是一些已过时的线程通信方法。
在这种工作模式下,我应该使用什么?这类似于M个生产者和N个消费者,问题是如何处理生产者的枯竭。
6个回答

3

我不确定我理解你的设计,但这可能需要使用 Semaphore


3

一种选择是将“frontier”作为阻塞队列,因此任何试图从中“获取”的线程都将被阻塞。只要任何其他URLCrawler将对象放入该队列中,任何其他线程都会自动收到通知(已出队的对象)。


是的,那是一个稳态的解决方案。但是当没有URLCrawlers队列任何URL时,该怎么处理呢?使用阻塞队列,前沿将无限制地阻塞。 - Anton Kazennikov
在这种情况下,您可以在前沿对象上拥有一个crawlerDone()方法,每当一个UrlCrawler完成工作时就会调用它。除了您建议的计数器方法之外,通过这种方法,您可以测试(在前沿方法中)是否所有爬虫都已完成。如果是真的,则get()可以返回null而不会阻塞。 - naikus
前沿可以是一个固定容量的阻塞队列。该容量的一个好选择是numberOfCrawlers。 - Ovidiu Lupas
@Loop: 如果爬虫程序排队超过一个URL(这似乎很可能),就会出现死锁。如果您使用无限制的阻塞队列,那么当您检测到活动已结束时,您必须使用“特殊消息”来从队列中强制阻塞线程退出(然后您仍然需要解决这个问题)。总之,我认为阻塞队列在这里不会有所帮助... - Enno Shioji

2
问题有点老,但我认为我已经找到了一些简单有效的解决方案:
请按照以下方式扩展ThreadPoolExecutor类。新功能是保持活动任务计数(不幸的是,提供的getActiveCount()不可靠)。如果taskCount.get() == 0并且没有更多排队的任务,则意味着没有任何事情要做,执行程序关闭。您有退出标准。此外,如果您创建了执行程序,但未提交任何任务,则不会阻止:
public class CrawlingThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger taskCount = new AtomicInteger();

    public CrawlingThreadPoolExecutor() {
        super(8, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {

        super.beforeExecute(t, r);
        taskCount.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        super.afterExecute(r, t);
        taskCount.decrementAndGet();
        if (getQueue().isEmpty() && taskCount.get() == 0) {
            shutdown();
        }
    }
}

还有一件事情需要做,那就是以一种方式实现你的Runnable,使其保留对使用的Executor的引用,以便能够提交新任务。这里是一个模拟示例:

public class MockFetcher implements Runnable {

    private final String url;
    private final Executor e;

    public MockFetcher(final Executor e, final String url) {
        this.e = e;
        this.url = url;
    }

    @Override
    public void run() {
        final List<String> newUrls = new ArrayList<>();
        // Parse doc and build url list, and then:
        for (final String newUrl : newUrls) {
            e.execute(new MockFetcher(this.e, newUrl));
        }
    }
}

2

我认为在这种情况下使用wait/notify是合理的。无法想到任何使用j.u.c直接完成此操作的简单方法。
在一个名为Coordinator的类中:

private final int numOfCrawlers;
private int waiting;

public boolean shouldTryAgain(){
    synchronized(this){
        waiting++;
        if(waiting>=numOfCrawlers){
            //Everybody is waiting, terminate
            return false;
        }else{
            wait();//spurious wake up is okay
            //waked up for whatever reason. Try again
            waiting--;
            return true;
        }
    }

public void hasEnqueued(){
    synchronized(this){
        notifyAll();
    }
} 

那么,接下来,
ExecutorService exec = Executors.newFixedThreadPool(numberOfCrawlers);
while(true){
    URL url = frontier.get();
    if(url == null){
        if(!coordinator.shouldTryAgain()){
            //all threads are waiting. No possibility of new jobs.
            return;
        }else{
            //Possible that there are other jobs. Try again
            continue;
        }
    }
    exec.execute(new URLCrawler(this, url));
}//while(true)

2

我认为对于您的用例来说,一个基本的构建块是“锁存器”(latch),类似于CountDownLatch,但与CountDownLatch不同的是,它允许增加计数。

这样一个锁存器的接口可能如下:

public interface Latch {
    public void countDown();
    public void countUp();
    public void await() throws InterruptedException;
    public int getCount();
}

计数器的合法值为0及以上。await()方法可以让您阻塞直到计数器下降至零。

如果您有这样一个门闩,您的用例可以相对容易地描述出来。我还怀疑队列(frontier)可以在这个解决方案中被消除(执行器已经提供了一个,因此它有些冗余)。我会将您的主要例程重写为:

ExecutorService executor = Executors.newFixedThreadPool(numberOfCrawlers);
Latch latch = ...; // instantiate a latch
URL[] initialUrls = ...;
for (URL url: initialUrls) {
    executor.execute(new URLCrawler(this, url, latch));
}
// now wait for all crawling tasks to finish
latch.await();

您的URLCrawler将如下使用锁定机制:

您的URLCrawler将以以下方式使用latch:

public class URLCrawler implements Runnable {
    private final Latch latch;

    public URLCrawler(..., Latch l) {
        ...
        latch = l;
        latch.countUp(); // increment the count as early as possible
    }

    public void run() {
        try {
            List<URL> secondaryUrls = crawl();
            for (URL url: secondaryUrls) {
                // submit new tasks directly
                executor.execute(new URLCrawler(..., latch));
            }
        } finally {
            // as a last step, decrement the count
            latch.countDown();
        }
    }
}

关于锁的实现,可以有多种可能的实现方式,从基于wait()和notifyAll()的实现,到使用Lock和Condition的实现,再到使用AbstractQueuedSynchronizer的实现。我认为所有这些实现都相当简单。需要注意的是,基于wait()-notifyAll()的版本和基于Lock-Condition的版本将基于互斥,而AQS版本将利用CAS(比较并交换),因此在某些情况下可能会更好地扩展。


你的自定义锁看起来很像信号量... 为什么不使用信号量呢? - assylias
是的,肯定有相似之处。从普通信号量中缺少的一件事是await()方法,在信号量术语中可以阻塞直到所有许可证都被释放。可能可以通过组合信号量和倒计时门闩来创建它。 - sjlee

0

我想建议一个AdaptiveExecuter。根据特征值,您可以选择序列化或并行化线程以执行。在下面的示例中,PUID是我想使用的字符串/对象,以做出决策。您可以更改逻辑以适应您的代码。某些代码部分已被注释以允许进一步实验。

class AdaptiveExecutor实现了Executor { final Queue tasks = new LinkedBlockingQueue(); Runnable active ; //ExecutorService threadExecutor=Executors.newCachedThreadPool(); static ExecutorService threadExecutor=Executors.newFixedThreadPool(4);

AdaptiveExecutor() {
    System.out.println("Initial Queue Size=" + tasks.size());
}

public void execute(final Runnable r) {
    /* if immediate start is needed do either of below two
    new Thread(r).start();

    try {
        threadExecutor.execute(r);
    } catch(RejectedExecutionException rEE ) {
        System.out.println("Thread Rejected " + new Thread(r).getName());
    }

    */


    tasks.offer(r); // otherwise, queue them up
    scheduleNext(new Thread(r)); // and kick next thread either serial or parallel.
    /*
    tasks.offer(new Runnable() {
        public void run() {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        }
    });
    */
    if ((active == null)&& !tasks.isEmpty()) {
        active = tasks.poll();
        try {
            threadExecutor.submit(active);
        } catch (RejectedExecutionException rEE) {
            System.out.println("Thread Rejected " + new Thread(r).getName());
        }
    }

    /*
    if ((active == null)&& !tasks.isEmpty()) {
        scheduleNext();
    } else tasks.offer(r);
    */
    //tasks.offer(r);

    //System.out.println("Queue Size=" + tasks.size());

}

private void serialize(Thread th) {
    try {
        Thread activeThread = new Thread(active);

        th.wait(200);
        threadExecutor.submit(th);
    } catch (InterruptedException iEx) {

    }
    /*
    active=tasks.poll();
    System.out.println("active thread is " +  active.toString() );
    threadExecutor.execute(active);
    */
}

private void parallalize() {
    if(null!=active)
        threadExecutor.submit(active);
}

protected void scheduleNext(Thread r) {
    //System.out.println("scheduleNext called") ;
    if(false==compareKeys(r,new Thread(active)))
        parallalize();
    else serialize(r);
}

private boolean compareKeys(Thread r, Thread active) {
    // TODO: obtain names of threads. If they contain same PUID, serialize them.
    if(null==active)
        return true; // first thread should be serialized
    else return false;  //rest all go parallel, unless logic controlls it
}

}


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接