需要帮助使用Hadoop MapReduce实现这个算法

6
我有一个算法,它将遍历一个大型数据集,读取一些文本文件,并在这些行中搜索特定的术语。我已经用Java实现了它,但我不想发布代码,因为这样看起来像是在寻找别人来帮我实现,但事实上我真的需要很多帮助!这并不是我项目计划的一部分,但数据集证明非常庞大,所以老师告诉我必须这么做。
编辑:(我之前的版本没有澄清)我拥有的数据集位于Hadoop集群上,我应该进行MapReduce实现。
我正在阅读关于MapReduce的内容,认为我应该先进行标准实现,然后再使用MapReduce会更容易。但并没有发生,因为算法相当愚蠢而且没有什么特别的,而MapReduce……我无法理解它。
以下是我的算法伪代码简述。
LIST termList   (there is method that creates this list from lucene index)
FOLDER topFolder

INPUT topFolder
IF it is folder and not empty
    list files (there are 30 sub folders inside)
    FOR EACH sub folder
        GET file "CheckedFile.txt"
        analyze(CheckedFile)
    ENDFOR
END IF


Method ANALYZE(CheckedFile)

read CheckedFile
WHILE CheckedFile has next line
    GET line
    FOR(loops through termList)
            GET third word from line
          IF third word = term from list
        append whole line to string buffer
    ENDIF
ENDFOR
END WHILE
OUTPUT string buffer to file

此外,正如您所看到的,每次调用“分析”时,都必须创建新文件。我理解mapreduce很难写入多个输出???
我了解mapreduce的直觉,并且我的示例似乎非常适合mapreduce,但是当涉及到实际操作时,显然我不够了解,现在卡住了!
请帮帮忙。
2个回答

3

您可以使用一个空的Reducer,将作业分区以便每个文件运行单个Mapper。每个Mapper都将在输出文件夹中创建自己的输出文件。


嗨!谢谢你的回答!!!但我不确定我理解:/ 你能给我更多信息吗?你有类似的例子吗? - Julia

2

使用一些不错的Java 6并发特性,特别是Future、Callable和ExecutorService,可以轻松实现Map Reduce。

我创建了一个Callable,它将按照您指定的方式分析文件。

public class FileAnalyser implements Callable<String> {

  private Scanner scanner;
  private List<String> termList;

  public FileAnalyser(String filename, List<String> termList) throws FileNotFoundException {
    this.termList = termList;
    scanner = new Scanner(new File(filename));
  }

  @Override
  public String call() throws Exception {
    StringBuilder buffer = new StringBuilder();
    while (scanner.hasNextLine()) {
      String line = scanner.nextLine();
      String[] tokens = line.split(" ");
      if ((tokens.length >= 3) && (inTermList(tokens[2])))
        buffer.append(line);
    }
    return buffer.toString();
  }

  private boolean inTermList(String term) {
    return termList.contains(term);
  }
}

我们需要为每个找到的文件创建一个新的可调用对象,并将其提交给执行器服务。提交的结果是一个Future,我们可以稍后使用它来获取文件解析的结果。
public class Analayser {

  private static final int THREAD_COUNT = 10;

  public static void main(String[] args) {

    //All callables will be submitted to this executor service
    //Play around with THREAD_COUNT for optimum performance
    ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);

    //Store all futures in this list so we can refer to them easily
    List<Future<String>> futureList = new ArrayList<Future<String>>();

    //Some random term list, I don't know what you're using.
    List<String> termList = new ArrayList<String>();
    termList.add("terma");
    termList.add("termb");

    //For each file you find, create a new FileAnalyser callable and submit
    //this to the executor service. Add the future to the list
    //so we can check back on the result later
    for each filename in all files {
      try {
        Callable<String> worker = new FileAnalyser(filename, termList);
        Future<String> future = executor.submit(worker);
        futureList.add(future);
      }
      catch (FileNotFoundException fnfe) {
        //If the file doesn't exist at this point we can probably ignore,
        //but I'll leave that for you to decide.
        System.err.println("Unable to create future for " + filename);
        fnfe.printStackTrace(System.err);
      }
    }

    //You may want to wait at this point, until all threads have finished
    //You could maybe loop through each future until allDone() holds true
    //for each of them.

    //Loop over all finished futures and do something with the result
    //from each
    for (Future<String> current : futureList) {
      String result = current.get();
      //Do something with the result from this future
    }
  }
}

我的这个例子还不够完整,也不够高效。我没有考虑样本大小,如果它非常大,您可以循环遍历futureList,删除已完成的元素,类似于:

while (futureList.size() > 0) {
      for (Future<String> current : futureList) {
        if (current.isDone()) {
          String result = current.get();
          //Do something with result
          futureList.remove(current);
          break; //We have modified the list during iteration, best break out of for-loop
        }
      }
}

或者你可以实现一个生产者-消费者类型的设置,其中生产者向执行器服务提交可调用对象并生成一个future,而消费者则获取future的结果并丢弃它。

这可能需要生产者和消费者本身成为线程,并使用同步列表添加/删除futures。

如有任何问题,请提出。


你好!非常感谢提供的解决方案!!很抱歉,我可能没有清楚地说明问题,尽管我尝试过。我的错误,我只在标题中提到了Hadoop,但我的数据集存储在运行Hadoop的集群上,因此我应该根据Hadoop MapReduce框架进行实现...我现在会编辑我的帖子。我正在分析的数据集有6GB:/ 太大了,并发处理能处理吗??? - Julia
哎呀,我在这里是个新手:D为了稍微挽回一点面子,我在100个文件上运行了我的代码,每个文件大小约为61MB,总共约6GB。我不太确定你的文件解析器是如何工作的,所以略去了详细信息,只是扫描了每一行并返回了一个空字符串。有点牵强。性能还不错,线程池大小为100,因此所有100个文件都被解析,而不需要由执行器服务排队。总运行时间为17分钟,在我的Atom处理器上。很抱歉我不能正确回答你的问题。我没有使用Hadoop的经验,但在阅读了SquareCog的答案后,我觉得他说得有道理。 - Karl Walsh
嗨!非常感谢你的帮助,因为我无法应对hadoop MR的脑力和时间。我将有几个类似的算法要实现,所以我必须尝试以我能够胜任的方式来做。在任何地方都找不到hadoop的帮助:/ 所以我采用了你的代码,在我的Intel 2Ghz上,使用线程池42花费了大约20分钟来解析并将结果输出到新文件中,但仅使用了200Mb的数据(42个文件)。再次感谢你的帮助。我需要对解析器进行一些修改,它必须进行更严格的匹配,而不是纯粹的“包含”术语,所以当我运行它时,我会让你知道结果 :) - Julia

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接