并发检查集合是否为空

4

我有这段代码:

private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue();
@Override
public void run(){
  while(!intervals.isEmpty()){
    //remove one interval
    //do calculations
    //add some intervals
  }
}

此代码同时由特定数量的线程执行。正如您所看到的,循环应该继续直到集合中没有更多的间隔为止,但是有一个问题。在每次迭代的开始处,会从集合中删除一个间隔,并且在结束时可能会将一些间隔重新添加回同一集合中。
问题是,当一个线程在循环内部时,集合可能变为空,因此试图进入循环的其他线程将无法这样做并会过早完成其工作,即使第一个线程完成迭代后集合可能被填充。 我希望线程计数保持不变(或不超过某个数字n)直到所有工作真正完成。
这意味着当前没有线程在循环中工作,并且集合中没有剩余元素。有哪些可能的方法可以实现这一点?欢迎任何想法。
解决此问题的一种方法是在我的特定情况下为每个线程提供原始集合的不同部分。但是,当一个线程完成其工作后,程序将不再使用它,即使它可以帮助其他线程进行计算,因此我不喜欢这种解决方案,因为在我的问题中利用机器的所有内核非常重要。
这是我能想出的最简单的最小工作示例。可能太冗长了。
public class Test{   
   private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue();
   private int threadNumber;
   private Thread[] threads;
   private double result;

   public Test(int threadNumber){
      intervals.add(new Interval(0, 1));
      this.threadNumber = threadNumber;
      threads = new Thread[threadNumber];
   }

   public double find(){
      for(int i = 0; i < threadNumber; i++){
         threads[i] = new Thread(new Finder());
         threads[i].start();
      }
      try{
         for(int i = 0; i < threadNumber; i++){
            threads[i].join();
         }
      }
      catch(InterruptedException e){
         System.err.println(e);
      }
      return result;
   }   

   private class Finder implements Runnable{   
      @Override
      public void run(){
         while(!intervals.isEmpty()){
            Interval interval = intervals.poll();
            if(interval.high - interval.low > 1e-6){    
               double middle = (interval.high + interval.low) / 2;
               boolean something = true;
               if(something){
                  intervals.add(new Interval(interval.low + 0.1, middle - 0.1));
                  intervals.add(new Interval(middle + 0.1, interval.high - 0.1));
               }
               else{
                  intervals.add(new Interval(interval.low + 0.1, interval.high - 0.1));
               }
            }
         }
      }
   }

   private class Interval{
      double low;
      double high;
      public Interval(double low, double high){
         this.low = low;
         this.high = high;
      }
   }
}

您可能需要了解的程序相关信息:在每次迭代后,时间间隔应该要么消失(因为它太小),要么变小,要么分裂成两个更小的间隔。在没有剩余间隔时,工作完成。此外,我应该能够通过某个数字n限制执行此工作的线程数量。实际程序通过将间隔分成几部分并使用一些规则丢弃不能包含最大值的那些部分来查找某些函数的最大值,但这对我的问题并不是真正相关的。

2
什么是“间隔”(intervals)? - Jean Logeart
1
如果您想得到好的答案,请发布更多的代码(最好是一个最小的可工作示例)。 - Mick Mnemonic
我已经在代码中添加了间隔集合的定义。 - user1242967
1
你说“在每次迭代的开始,一个区间会从集合中移除,在结束时,一个或多个区间会被添加回同一集合”,这似乎意味着如果没有线程在循环中,集合永远不会变为空。然而,你的代码表明你希望程序在集合为空且循环中没有线程时终止。我建议你澄清这一点。 - jadhachem
实际代码过于数学密集,我不想用程序中不重要的部分来困扰您的思维,但我确实无法快速想到更好的例子。 - user1242967
显示剩余2条评论
6个回答

2
您可以使用原子标志,例如:
private ConcurrentLinkedQueue<Interval> intervals = new ConcurrentLinkedQueue<>();
private AtomicBoolean inUse = new AtomicBoolean();

@Override
public void run() {
    while (!intervals.isEmpty() && inUse.compareAndSet(false, true)) {
        // work
        inUse.set(false);
    }
}

更新:

问题已经更新,因此我会给出更好的解决方案。这是一种更“经典”的解决方案,使用阻塞队列;

private BlockingQueue<Interval> intervals = new ArrayBlockingQueue<Object>();
private volatile boolean finished = false;

@Override
public void run() {
    try {
        while (!finished) {
            Interval next = intervals.take();
            // put work there
            // after you decide work is finished just set finished = true
            intervals.put(interval); // anyway, return interval to queue
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

更新2

现在看来,更好的解决方案是重新编写代码,并将范围划分为每个线程的子范围。


你确定当一个线程决定不再添加间隔并设置停止处理的标志时,其他线程仍在工作并可能产生新的间隔进行处理,这个方法仍然有效吗? - Piotrek Bzdyl
1
我认为它不起作用。假设队列中只有一个间隔,我们有两个线程。第一个线程检查间隔(isEmpty为false),将inUse设置为true,从队列中删除一个元素并开始昂贵的计算(尚未生成新的间隔)。第二个线程检查条件并发现队列为空,因此决定退出循环。 - Łukasz Dembiński
1
检查条件!intervals.isEmpty() && inUse.compareAndSet(false, true)不是线程安全的,因为它包含两个非同步读取。 - Mick Mnemonic
问题更新后,我看到逻辑存在问题。好的,我会尝试提出更好的解决方案。 - Denis Borovikov
好的,我明白了。现在看来线程在没有协调的情况下无法决定停止。我认为在你的情况下最好的解决方案是将范围分成子范围,供每个线程使用。 - Denis Borovikov
显示剩余7条评论

2

CompletableFuture类也是这些任务的一个有趣解决方案。它会自动将工作负载分配到多个工作线程上。

static CompletableFuture<Integer> fibonacci(int n) {
  if(n < 2) return CompletableFuture.completedFuture(n);
  else {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println(Thread.currentThread());
      CompletableFuture<Integer> f1 = fibonacci(n - 1);
      CompletableFuture<Integer> f2 = fibonacci(n - 2);
      return f1.thenCombineAsync(f2, (a, b) -> a + b);
    }).thenComposeAsync(f -> f);
  }
}

public static void main(String[] args) throws Exception {
  int fib = fibonacci(10).get();
  System.out.println(fib);
}

使用CompletableFuture可以获得+1分。更简单的方法是:在fibonacci(int)方法的else部分只需添加return fibonacci(n - 1).thenCombineAsync(fibonacci(n - 2), Integer::sum);即可。 - isnot2bad
但是我想让整个else部分在不同的线程中运行。如果按照您的建议,只有addition会并行运行。"System.out.println(Thread.currentThread())"语句始终会打印主线程。顺便说一下,还有第二个区别。您的代码是深度优先遍历递归调用的“树”,而我的是广度优先遍历。 - Tesseract
我已经测试过了。程序可以编译。thenComposeAsync(f -> f)可以将CompletableFuture<CompletableFuture<Integer>>转化为CompletableFuture<Integer> - Tesseract
你说得对。它编译通过了。不知道为什么第一次复制粘贴到我的IDE中时它没有工作。抱歉。 - isnot2bad
@SpiderPig,我认为对于严格递归问题来说,我的解决方案更易读(你可以从“RecursiveAction”实现中了解问题结构),而你使用的“CompletableFuture”实现则更加灵活。如果你使用“CompletableFuture.supplyAsync(Supplier, Executor)”,你可以轻松配置自己的执行器,以拥有所需的线程数。你还可以以多种方式组合生成的“CompletableFuture”。 - Piotrek Bzdyl

1
你的问题看起来像是一个递归问题 - 处理一个任务(区间)可能会产生一些子任务(子区间)。
为此,我会使用 ForkJoinPoolRecursiveTask:
class Interval {
    ...
}

class IntervalAction extends RecursiveAction {
    private Interval interval;

    private IntervalAction(Interval interval) {
        this.interval = interval;
    }

    @Override
    protected void compute() {
        if (...) {
            // we need two sub-tasks
            IntervalAction sub1 = new IntervalAction(new Interval(...));
            IntervalAction sub2 = new IntervalAction(new Interval(...));
            sub1.fork();
            sub2.fork();
            sub1.join();
            sub2.join();
        } else if (...) {
            // we need just one sub-task
            IntervalAction sub3 = new IntervalAction(new Interval(...));
            sub3.fork();
            sub3.join();
        } else {
            // current task doesn't need any sub-tasks, just return
        }
    }
}

public static void compute(Interval initial) {
    ForkJoinPool pool = new ForkJoinPool();
    pool.invoke(new IntervalAction(initial));
    // invoke will return when all the processing is completed
}

这个可能真的可行。我只需要确保最多有n个线程在执行此任务(n甚至可以是1...不确定在这种情况下是否可能实现此解决方案)。我该怎么做呢? - user1242967
请查看 ForkJoinPool javadoc 以及其并行配置。 - Piotrek Bzdyl

1
我遇到了同样的问题,并测试了以下解决方案。
在我的测试示例中,我有一个队列(相当于您的间隔),其中填充了整数。对于测试,每次迭代都会从队列中取出一个数字,将其递增并放回队列,如果新值低于7(任意),则具有与您的间隔生成机制相同的影响。
以下是一个示例工作代码(请注意,我在Java 1.8中开发,并使用Executor framework来处理我的线程池。):
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

public class Test {
    final int numberOfThreads;
    final BlockingQueue<Integer> queue;
    final BlockingQueue<Integer> availableThreadsTokens;
    final BlockingQueue<Integer> sleepingThreadsTokens;
    final ThreadPoolExecutor executor;

    public static void main(String[] args) {
        final Test test = new Test(2); // arbitrary number of thread => 2
        test.launch();
    }

    private Test(int numberOfThreads){
        this.numberOfThreads = numberOfThreads;
        this.queue = new PriorityBlockingQueue<Integer>(); 
        this.availableThreadsTokens = new LinkedBlockingQueue<Integer>(numberOfThreads);
        this.sleepingThreadsTokens = new LinkedBlockingQueue<Integer>(numberOfThreads);
        this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numberOfThreads);
    }

    public void launch() {

        // put some elements in queue at the beginning
        queue.add(1);
        queue.add(2);
        queue.add(3);

        for(int i = 0; i < numberOfThreads; i++){
            availableThreadsTokens.add(1);
        }

        System.out.println("Start");

        boolean algorithmIsFinished = false;

        while(!algorithmIsFinished){
            if(sleepingThreadsTokens.size() != numberOfThreads){
                try {
                    availableThreadsTokens.take();
                } catch (final InterruptedException e) {
                    e.printStackTrace();
                    // some treatment should be put there in case of failure
                    break;
                }
                if(!queue.isEmpty()){ // Continuation condition
                    sleepingThreadsTokens.drainTo(availableThreadsTokens);
                    executor.submit(new Loop(queue.poll(), queue, availableThreadsTokens));
                }
                else{
                    sleepingThreadsTokens.add(1);
                }
            }
            else{
                algorithmIsFinished = true;
            }
        }

        executor.shutdown();
        System.out.println("Finished");
    }

    public static class Loop implements Runnable{
        int element;
        final BlockingQueue<Integer> queue;
        final BlockingQueue<Integer> availableThreadsTokens;

        public Loop(Integer element, BlockingQueue<Integer> queue, BlockingQueue<Integer> availableThreadsTokens){
            this.element = element;
            this.queue = queue;
            this.availableThreadsTokens = availableThreadsTokens;
        }

        @Override
        public void run(){
            System.out.println("taking element "+element);
            for(Long l = (long) 0; l < 500000000L; l++){
            }
            for(Long l = (long) 0; l < 500000000L; l++){
            }
            for(Long l = (long) 0; l < 500000000L; l++){
            }
            if(element < 7){
                this.queue.add(element+1);
                System.out.println("Inserted element"+(element + 1));
            }
            else{
                System.out.println("no insertion");
            }
            this.availableThreadsTokens.offer(1);
        }
    }   
}

我运行了这段代码进行检查,看起来它能正常工作。但是肯定有一些可以改进的地方:
  • sleepingThreadsTokens 不必是 BlockingQueue,因为只有主线程访问它。我使用了这个接口,因为它允许一个漂亮的 sleepingThreadsTokens.drainTo(availableThreadsTokens);
  • 我不确定队列是否必须是阻塞的,因为只有主线程从中取出元素,并且不等待元素(它仅等待令牌)。
  • ...

思路是主线程检查终止条件,为此它必须知道当前有多少线程正在工作(以便它不会因为队列为空而过早停止算法)。为此,创建了两个特定的队列:availableThreadsTokens 和 sleepingThreadsTokens。availableThreadsTokens 中的每个元素都表示完成迭代的线程,并等待再次给出。sleepingThreadsTokens 中的每个元素都表示可用于执行新迭代的线程,但队列为空,所以它没有任务并“睡眠”。因此,每个时刻 availableThreadsTokens.size() + sleepingThreadsTokens.size() = numberOfThreads - threadExcecutingIteration。

请注意,availableThreadsTokens 和 sleepingThreadsTokens 上的元素仅表示线程活动,它们不是线程,也不设计特定的线程。

终止情况:假设我们有N个线程(任意,固定数量)。 这N个线程正在等待工作(availableThreadsTokens中有N个令牌),队列中只剩下1个元素,并且这个元素的处理不会生成任何其他元素。主进程获取第一个令牌,发现队列不为空,取出该元素并将线程发送到工作状态。接下来的N-1个令牌逐一被消耗,并且由于队列为空,这些令牌逐一移动到sleepingThreadsTokens中。主进程知道循环中有1个正在工作的线程,因为availableThreadsTokens中没有令牌,而sleepingThreadsTokens中只有N-1个令牌,因此它等待(.take())。当线程完成并释放令牌时,主进程获取令牌,发现队列现在为空,并将最后一个令牌放入sleepingThreadsTokens中。 由于现在所有的令牌都在sleepingThreadsTokens中,主进程知道1)所有线程处于非活动状态,2)队列为空(否则,由于线程将执行作业,最后一个令牌就不会被转移到sleepingThreadsTokens中)。
请注意,如果工作线程在所有availableThreadsTokens都移动到sleepingThreadsTokens之前完成处理,则不会产生任何影响。
现在假设最后一个元素的处理将在队列中生成M个新元素,那么主线程将把所有睡眠线程令牌从availableThreadsTokens放回到sleepingThreadsTokens,并重新开始分配它们的处理。即使M < N,我们也会将所有令牌放回,因为我们不知道未来将插入多少元素,所以必须保持所有线程可用。

0
我建议采用主/工作进程的方法。
主进程遍历间隔并将该间隔的计算分配给不同的进程。它还根据需要进行删除/添加。这样,所有核心都得到利用,只有当所有间隔完成时,进程才完成。这也被称为动态工作分配。
以下是一个可能的示例:
public void run(){
  while(!intervals.isEmpty()){
    //remove one interval
    Thread t = new Thread(new Runnable()
    {
        //do calculations
    });
    t.run();
    //add some intervals
  }
}

您提供的可能解决方案被称为静态分配,您是正确的,它将以最慢的处理器完成,但动态方法将利用所有内存。

我不知道是否可能实现这个解决方案,因为从属线程决定将多少个间隔添加到集合中,而不是主线程。 - user1242967

0

我也遇到了这个问题。我解决的方法是使用AtomicInteger来知道队列中有什么。在每次offer()之前增加整数,在每次poll()之后减少整数。CLQ没有真正的isEmpty(),因为它必须查看头/尾节点,而这可以原子地改变(CAS)。

这并不能保证100%的线程可能会在另一个线程减少之后增加,因此您需要在结束线程之前再次检查。这比依赖while(...isEmpty())要好。

除此之外,您可能需要进行同步。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接