如何在Java中实现多线程广度优先搜索?

5

我已经以一种正常的方式完成了广度优先搜索。 现在我正在尝试以多线程的方式完成它。 我有一个队列,这个队列在线程之间共享。 当我从队列中删除节点(FIFI队列)时,我使用synchronize(LockObject)进行同步。 我的目标是:当一个线程找到解决方案时,所有其他线程都会立即停止。


抱歉,您的问题是什么?如何停止其他线程? - Luis
那么每个线程都会走不同的路径?这是你想要的吗? - Hunter McMillen
每个线程都会取一个节点,并判断它是否是目标节点,如果不是,则将4个子节点添加到同一队列中。这是我的解决方案。如果有更好的方法,请随意分享。谢谢。 - Kassem
4个回答

2
我猜你正在遍历一棵树进行BFS。
创建一个线程池。对于节点中未探索的每个子节点,从线程池中获取一个线程(可以使用Semaphore)。标记子节点为“已探索”,并以BFS方式探索其子节点。当找到解决方案或完成所有节点的探索时,释放信号量。
^ 我以前从未做过这个,所以可能会漏掉一些东西。

实际上我正在构建树并同时进行检查。 - Kassem
实际上我是边构建树边前进,我正在尝试解决硬币问题,即找到给定金额的最佳组合。最佳组合意味着硬币数量较少但价值更高。 - Kassem
我这么做的:从队列中开始用一个根节点,取出该节点并展开它(展开意味着在其中添加4个子节点,每个子节点代表一种硬币类型;例如:1美分,5美分,10美分,20美分)。之后,我会取出队列中的第一个节点并检查其总价值是否为我想要的(比如21美分)。如果不是,则我会给此节点添加4个子节点并将其加入队列中(通过给节点添加子节点,我指的是创建我创建的coin类型对象,并在该对象内部有一个表示父硬币的coin类型变量)。 - Kassem
1
请注意,如果由于某种原因有两个解决方案具有正确的金额但硬币数量不同,并且两者都开始执行(因为第一个尚未完成),则无法保证您将获得更短的解决方案,除非您单独管理它。请参见下面的完整答案。 - Luis

2
我已经成功地实现了它。我的做法是获取第一层的所有节点,例如4个节点。接着我使用了2个线程。每个线程处理2个节点并生成他们的子节点。当一个节点找到解决方案时,它必须报告所在的层数并限制搜索的层数,以便其他线程不会超过该层。
只有报告方法需要同步。
我已经完成了硬币找零问题的代码。以下是我编写的代码,供他人使用:
主类(CoinsProblemBFS.java)
package coinsproblembfs;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Scanner;

/**
 *
 * @author Kassem M. Bagher
 */
public class CoinsProblemBFS
{

    private static List<Item> MoneyList = new ArrayList<Item>();
    private static Queue<Item> q = new LinkedList<Item>();
    private static LinkedList<Item> tmpQ;
    public static Object lockLevelLimitation = new Object();
    public static int searchLevelLimit = 1000;
    public static Item lastFoundNode = null;
    private static int numberOfThreads = 2;

    private static void InitializeQueu(Item Root)
    {
        for (int x = 0; x < MoneyList.size(); x++)
        {
            Item t = new Item();
            t.value = MoneyList.get(x).value;
            t.Totalvalue = MoneyList.get(x).Totalvalue;
            t.Title = MoneyList.get(x).Title;
            t.parent = Root;
            t.level = 1;
            q.add(t);
        }
    }

    private static int[] calculateQueueLimit(int numberOfItems, int numberOfThreads)
    {
        int total = 0;
        int[] queueLimit = new int[numberOfThreads];

        for (int x = 0; x < numberOfItems; x++)
        {
            if (total < numberOfItems)
            {
                queueLimit[x % numberOfThreads] += 1;
                total++;
            }
            else
            {
                break;
            }
        }
        return queueLimit;
    }

    private static void initializeMoneyList(int numberOfItems, Item Root)
    {
        for (int x = 0; x < numberOfItems; x++)
        {
            Scanner input = new Scanner(System.in);
            Item t = new Item();
            System.out.print("Enter the Title and Value for item " + (x + 1) + ": ");
            String tmp = input.nextLine();
            t.Title = tmp.split(" ")[0];
            t.value = Double.parseDouble(tmp.split(" ")[1]);
            t.Totalvalue = t.value;
            t.parent = Root;
            MoneyList.add(t);
        }
    }

    private static void printPath(Item item)
    {
        System.out.println("\nSolution Found in Thread:" + item.winnerThreadName + "\nExecution Time: " + item.searchTime + " ms, " + (item.searchTime / 1000) + " s");
        while (item != null)
        {
            for (Item listItem : MoneyList)
            {
                if (listItem.Title.equals(item.Title))
                {
                    listItem.counter++;
                }
            }
            item = item.parent;
        }
        for (Item listItem : MoneyList)
        {
            System.out.println(listItem.Title + " x " + listItem.counter);
        }

    }

    public static void main(String[] args) throws InterruptedException
    {
        Item Root = new Item();
        Root.Title = "Root Node";
        Scanner input = new Scanner(System.in);
        System.out.print("Number of Items: ");
        int numberOfItems = input.nextInt();
        input.nextLine();

        initializeMoneyList(numberOfItems, Root);

        
        System.out.print("Enter the Amount of Money: ");
        double searchValue = input.nextDouble();
        int searchLimit = (int) Math.ceil((searchValue / MoneyList.get(MoneyList.size() - 1).value));
        System.out.print("Number of Threads (Muste be less than the number of items): ");
        numberOfThreads = input.nextInt();

        if (numberOfThreads > numberOfItems)
        {
            System.exit(1);
        }

        InitializeQueu(Root);

        int[] queueLimit = calculateQueueLimit(numberOfItems, numberOfThreads);
        List<Thread> threadList = new ArrayList<Thread>();

        for (int x = 0; x < numberOfThreads; x++)
        {
            tmpQ = new LinkedList<Item>();
            for (int y = 0; y < queueLimit[x]; y++)
            {
                tmpQ.add(q.remove());
            }
            BFS tmpThreadObject = new BFS(MoneyList, searchValue, tmpQ);
            Thread t = new Thread(tmpThreadObject);
            t.setName((x + 1) + "");
            threadList.add(t);
        }

        for (Thread t : threadList)
        {
            t.start();
        }

        boolean finish = false;

        while (!finish)
        {
            Thread.sleep(250);
            for (Thread t : threadList)
            {
                if (t.isAlive())
                {
                    finish = false;
                    break;
                }
                else
                {
                    finish = true;
                }
            }
        }
        printPath(lastFoundNode);

    }
}

项目类(Item.java)

package coinsproblembfs;

/**
 *
 * @author Kassem
 */
public class Item
{
    String Title = "";
    double value = 0;
    int level = 0;
    double Totalvalue = 0;
    int counter = 0;
    Item parent = null;
    long searchTime = 0;
    String winnerThreadName="";
}

线程类(BFS.java)

package coinsproblembfs;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 *
 * @author Kassem M. Bagher
 */
public class BFS implements Runnable
{

    private LinkedList<Item> q;
    private List<Item> MoneyList;
    private double searchValue = 0;
    private long start = 0, end = 0;

    public BFS(List<Item> monyList, double searchValue, LinkedList<Item> queue)
    {
        q = new LinkedList<Item>();
        MoneyList = new ArrayList<Item>();
        this.searchValue = searchValue;
        for (int x = 0; x < queue.size(); x++)
        {
            q.addLast(queue.get(x));
        }
        for (int x = 0; x < monyList.size(); x++)
        {
            MoneyList.add(monyList.get(x));
        }
    }

    private synchronized void printPath(Item item)
    {

        while (item != null)
        {
            for (Item listItem : MoneyList)
            {
                if (listItem.Title.equals(item.Title))
                {
                    listItem.counter++;
                }
            }
            item = item.parent;
        }
        for (Item listItem : MoneyList)
        {
            System.out.println(listItem.Title + " x " + listItem.counter);
        }
    }

    private void addChildren(Item node, LinkedList<Item> q, boolean initialized)
    {
        for (int x = 0; x < MoneyList.size(); x++)
        {
            Item t = new Item();
            t.value = MoneyList.get(x).value;
            if (initialized)
            {
                t.Totalvalue = 0;
                t.level = 0;
            }
            else
            {
                t.parent = node;
                t.Totalvalue = MoneyList.get(x).Totalvalue;
                if (t.parent == null)
                {
                    t.level = 0;
                }
                else
                {
                    t.level = t.parent.level + 1;
                }
            }
            t.Title = MoneyList.get(x).Title;
            q.addLast(t);
        }
    }

    @Override
    public void run()
    {
        start = System.currentTimeMillis();
        try
        {
            while (!q.isEmpty())
            {
                Item node = null;
                node = (Item) q.removeFirst();
                node.Totalvalue = node.value + node.parent.Totalvalue;
                if (node.level < CoinsProblemBFS.searchLevelLimit)
                {
                    if (node.Totalvalue == searchValue)
                    {
                        synchronized (CoinsProblemBFS.lockLevelLimitation)
                        {
                            CoinsProblemBFS.searchLevelLimit = node.level;
                            CoinsProblemBFS.lastFoundNode = node;
                            end = System.currentTimeMillis();
                            CoinsProblemBFS.lastFoundNode.searchTime = (end - start);
                            CoinsProblemBFS.lastFoundNode.winnerThreadName=Thread.currentThread().getName();
                        }
                    }
                    else
                    {
                        if (node.level + 1 < CoinsProblemBFS.searchLevelLimit)
                        {
                            addChildren(node, q, false);
                        }
                    }
                }
            }
        } catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

示例输入:

Number of Items: 4
Enter the Title and Value for item 1: one 1
Enter the Title and Value for item 2: five 5
Enter the Title and Value for item 3: ten 10
Enter the Title and Value for item 4: twenty 20
Enter the Amount of Money: 150
Number of Threads (Muste be less than the number of items): 2

2
假设您想要迭代地解决这个问题(请参阅底部的注释,了解可能有更好的封闭解决方案的原因),这对于练习多线程来说并不是一个很好的问题。问题在于,如果您不依赖于先前的结果,多线程就非常好用,但在这里,您希望使用最少的硬币数量。
正如您所指出的那样,广度优先的解决方案保证一旦到达所需金额,您将不会在单线程环境中有任何更少硬币的解决方案。然而,在多线程环境中,一旦开始计算解决方案,您无法保证它会在其他解决方案之前完成。让我们以21为例:它可以是20分硬币和1分硬币,或者四个5分硬币和1分硬币;如果两者同时计算,您无法保证第一个(正确的)解决方案会先完成。实际上,这种情况发生的可能性很小,但是当您使用多线程时,您希望解决方案在理论上可行,因为多线程总是在演示中失败,即使它们在整个宇宙的寿命内都不应该失败。
现在您有两种可能的解决方案:一种是在每个级别的开始引入瓶颈;您不会启动该级别,直到前一个级别完成。另一种是一旦达到解决方案,继续使用低于当前结果的所有计算(这意味着您不能清除其他计算)。可能需要进行所有同步才能获得更好的性能,因此最好采用单线程,但让我们继续。
对于第一种解决方案,自然形式是迭代增加级别。您可以使用happymeal提供的解决方案,并使用Semaphore。另一种选择是使用Java提供的新类。
CoinSet getCoinSet(int desiredAmount) throws InterruptedException {
  // Use whatever number of threads you prefer or another member of Executors.
  final ExecutorService executor = Executors.newFixedThreadPool(10); 
  ResultContainer container = new ResultContainer();
  container.getNext().add(new Producer(desiredAmount, new CoinSet(), container));
  while (container.getResult() == null) {
    executor.invokeAll(container.setNext(new Vector<Producer>()));
  }
  return container.getResult();
} 

public class Producer implements Callable<CoinSet> {
  private final int desiredAmount;
  private final CoinSet data;
  private final ResultContainer container;

  public Producer(int desiredAmount, CoinSet data, ResultContainer container) {
    this.desiredAmount = desiredAmount;
    this.data = data;
    this.container = container;
  }

  public CoinSet call() {
    if (data.getSum() == desiredAmount) {
      container.setResult(data);
      return data;
    } else {
      Collection<CoinSet> nextSets = data.addCoins();
      for (CoinSet nextSet : nextSets) {
        container.getNext().add(new Producer(desiredAmount, nextSet, container));
      }
      return null;
    }
  }
}

// Probably it is better to split this class, but you then need to pass too many parameters
// The only really needed part is to create a wrapper around getNext, since invokeAll is
// undefined if you modify the list of tasks.
public class ResultContainer {
  // I use Vector because it is synchronized.

  private Vector<Producer> next = new Vector<Producer>();
  private CoinSet result = null;

  // Note I return the existing value.
  public Vector<Producer> setNext(Vector<Producer> newValue) {
    Vector<Producer> current = next;
    next = newValue;
    return current;
  }

  public Vector<Producer> getNext() {
    return next;
  }

  public synchronized void setResult(CoinSet newValue) {
    result = newValue;
  }

  public synchronized CoinSet getResult() {
   return result;
  }
}

这仍然存在一个问题,即现有任务会被执行;但是解决这个问题很简单:将线程执行器传递给每个生产者(或容器)。然后,在找到结果时调用executor.shutdownNow。正在执行的线程不会被中断,但每个线程中的操作都是微不足道的,因此它将快速完成;尚未启动的可运行项将不会启动。
第二个选项意味着您必须让所有当前任务完成,除非您跟踪了每个级别运行的任务数。您不再需要跟踪级别,也不需要while循环。相反,您只需调用
executor.submit(new Producer(new CoinSet(), desiredAmount, container)).get();

然后,调用方法非常相似(假设您在生产者中有执行器):

public CoinSet call() {
  if (container.getResult() != null && data.getCount() < container.getResult().getCount()) {
    if (data.getSum() == desiredAmount)) {
      container.setResult(data);
      return data;
    } else {
      Collection<CoinSet> nextSets = data.addCoins();
      for (CoinSet nextSet : nextSets) {
        executor.submit(new Producer(desiredAmount, nextSet, container));
      }
      return null;
    }
  }
}

你还需要修改 container.setResult,因为你不能保证在if语句和设置值之间它没有被其他线程设置过了(线程真的很烦人,不是吗?)

public synchronized void setResult(CoinSet newValue) {
  if (newValue.getCount() < result.getCount()) {
    result = newValue;
  }
}

在之前的所有答案中,CoinSet.getSum()返回集合中硬币的总和,CoinSet.getCount()返回硬币的数量,CoinSet.addCoins()返回一个CoinSet的集合,其中每个元素都是当前CoinSet加上每个可能不同价值的一枚硬币。
注意:对于具有1、5、10和20价值的硬币问题,最简单的解决方案是将金额除以最大硬币。然后取模并使用下一个更大的值,依此类推。这是您需要的最少硬币数量。当以下属性为真时,该规则适用(AFAICT):对于所有连续的硬币值对(即在这种情况下,1-5、5-10、10-20),您可以使用较大的元素和必要的任何硬币以较少的硬币数达到较低元素的整数倍。您只需要证明它到一对中的最小公共倍数(之后它会重复)。

请问,与硬币有关的问题是什么? - Avi
@Avi:请查看happymeal回答中的评论。 - meriton

2
我从你对happymeal答案的评论中了解到,你正在尝试通过添加1分、5分、10分和20分硬币来达到特定金额。
由于每个硬币面额都是下一个更大硬币面额的倍数,因此可以按照以下方式在恒定时间内解决问题:
int[] coinCount(int amount) {
    int[] coinValue = {20, 10, 5, 1};
    int[] coinCount = new int[coinValue.length];

    for (int i = 0; i < coinValue.length; i++) {
        coinCount[i] = amount / coinValue[i];
        amount -= coinCount[i] * coinValue[i];
    }
    return coinCount;
}

重点提示:在使用多线程之前,尝试优化您的算法,因为算法改进可以产生更大的改进。
请注意保留HTML标签。

1
完全同意,正确的解决方案不是使用多线程,而是进行优化(我甚至在我的笔记中有这个)。但这更像是一个教科书式的练习,而不是其他什么。事实上,如果你没有进行优化就采用广度优先搜索,当你到达8个硬币时,你将有4^8(2^16,约32E3)种解决方案,每个int占4字节,需要256KB内存。 - Luis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接