Hadoop矩阵乘法

5
我正在运行在http://www.norstad.org/matrix-multiply/index.html找到的MapReduce矩阵乘法程序。 我发现当输入矩阵中有0时,此实现无法正常工作。但是我不明白为什么,以及如何修改程序使其正常工作? MapReduce操作完成,但输出始终是所有元素为0的矩阵。
我的输入矩阵A和B是:
Matrix A     Matrix B
0 0 0        6 7 4 
0 1 6        9 1 3 
7 8 9        7 6 2  

输出矩阵:

0 0 0
0 0 0
0 0 0

以下内容摘自该作业的日志文件:
矩阵B的映射输出:
##### Map setup: matrixA = false for hdfs://localhost/user/hadoop-user/B
strategy = 4
R1 = 4
I = 3
K = 3
J = 3
IB = 3
KB = 3
JB = 3
##### Map input: (0,0) 6
##### Map output: (0,0,0,1) (0,0,6) 
##### Map input: (0,1) 7
##### Map output: (0,0,0,1) (0,1,7) 
##### Map input: (0,2) 4
##### Map output: (0,0,0,1) (0,2,4) 
##### Map input: (1,0) 9
##### Map output: (0,0,0,1) (1,0,9) 
##### Map input: (1,1) 1
##### Map output: (0,0,0,1) (1,1,1) 
##### Map input: (1,2) 3
##### Map output: (0,0,0,1) (1,2,3) 
##### Map input: (2,0) 7
##### Map output: (0,0,0,1) (2,0,7) 
##### Map input: (2,1) 6
##### Map output: (0,0,0,1) (2,1,6) 
##### Map input: (2,2) 2
##### Map output: (0,0,0,1) (2,2,2) 

矩阵A的映射输出:

##### Map setup: matrixA = true for hdfs://localhost/user/hadoop-user/A
strategy = 4
R1 = 4
I = 3
K = 3
J = 3
IB = 3
KB = 3
JB = 3
##### Map input: (1,1) 1
##### Map output: (0,0,0,0) (1,1,1) 
##### Map input: (1,2) 6
##### Map output: (0,0,0,0) (1,2,6) 
##### Map input: (2,0) 7
##### Map output: (0,0,0,0) (2,0,7) 
##### Map input: (2,1) 8
##### Map output: (0,0,0,0) (2,1,8) 
##### Map input: (2,2) 9
##### Map output: (0,0,0,0) (2,2,9) 

注意,矩阵A的Map不会从输入文件中读取零值。 注意:两个输入文件都以序列文件的形式存储在HDFS中,输出也是序列文件。有人能解释一下问题出在哪里吗?谢谢! Mapper类的代码如下:
/** The job 1 mapper class. */

private static class Job1Mapper 
    extends Mapper<IndexPair, IntWritable, Key, Value>
{
    private Path path;
    private boolean matrixA;
    private Key key = new Key();
    private Value value = new Value();

    public void setup (Context context) {
        init(context);
        FileSplit split = (FileSplit)context.getInputSplit();
        path = split.getPath();
        matrixA = path.toString().startsWith(inputPathA);
        if (DEBUG) {
            System.out.println("##### Map setup: matrixA = " + matrixA + " for " + path);
            System.out.println("   strategy = " + strategy);
            System.out.println("   R1 = " + R1);
            System.out.println("   I = " + I);
            System.out.println("   K = " + K);
            System.out.println("   J = " + J);
            System.out.println("   IB = " + IB);
            System.out.println("   KB = " + KB);
            System.out.println("   JB = " + JB);
        }
    }

    private void printMapInput (IndexPair indexPair, IntWritable el) {
        System.out.println("##### Map input: (" + indexPair.index1 + "," + 
            indexPair.index2 + ") " + el.get());
    }

    private void printMapOutput (Key key, Value value) {
        System.out.println("##### Map output: (" + key.index1 + "," + 
            key.index2 + "," + key.index3 + "," + key.m + ") (" + 
            value.index1 + "," + value.index2 + "," + value.v + ") ");
    }

    private void badIndex (int index, int dim, String msg) {
        System.err.println("Invalid " + msg + " in " + path + ": " + index + " " + dim);
        System.exit(1);
    }

    public void map (IndexPair indexPair, IntWritable el, Context context)
        throws IOException, InterruptedException 
    {
        if (DEBUG) printMapInput(indexPair, el);
        int i = 0;
        int k = 0;
        int j = 0;
        if (matrixA) {
            i = indexPair.index1;
            if (i < 0 || i >= I) badIndex(i, I, "A row index");
            k = indexPair.index2;
            if (k < 0 || k >= K) badIndex(k, K, "A column index");
        } else {
            k = indexPair.index1;
            if (k < 0 || k >= K) badIndex(k, K, "B row index");
            j = indexPair.index2;
            if (j < 0 || j >= J) badIndex(j, J, "B column index");
        }
        value.v = el.get();
                if (matrixA) {
                    key.index1 = i/IB;
                    key.index3 = k/KB;
                    key.m = 0;
                    value.index1 = i % IB;
                    value.index2 = k % KB;
                    for (int jb = 0; jb < NJB; jb++) {
                        key.index2 = jb;
                        context.write(key, value);
                        if (DEBUG) printMapOutput(key, value);
                    }
                } else {
                    key.index2 = j/JB;
                    key.index3 = k/KB;
                    key.m = 1;
                    value.index1 = k % KB;
                    value.index2 = j % JB;
                    for (int ib = 0; ib < NIB; ib++) {
                        key.index1 = ib;
                        context.write(key, value);
                        if (DEBUG) printMapOutput(key, value);
                    }
        }
    }
}

如果出现语法错误,那只是因为我复制粘贴了代码并在这里进行了修改,所以我可能会忽略一些东西。
我需要帮助理解Map函数的逻辑,它不能从输入文件中读取0,有人能告诉我为什么吗?

1
这是超过700行的代码。你认为有人会帮你调试吗? - Thomas Jungblut
我修改了我的帖子,只包括映射器类代码。我需要帮助理解逻辑,也就是说,我无法解释为什么Map函数没有读取输入文件中的0。 - Chaos
@ThomasJungblut - 我以为你会推广 Hama 用于矩阵乘法 :) - Praveen Sripati
@PraveenSripati,使用Hama(~0.2.0或类似版本)在MapReduce和HBase中有古老的矩阵乘法代码。但由于这不是迭代过程,我认为BSP并不比MapReduce更适合这种情况 ;) - Thomas Jungblut
你们知道有没有其他版本的矩阵乘法可以使用吗? - Chaos
显示剩余2条评论
1个回答

3
在您提供的网站中的TestMatrixMultiply.java文件中,该文件包含将矩阵编码成期望的IndexPair-backed文件格式的代码。其中有一个名为writeMatrix的函数:
public static void writeMatrix (int[][] matrix, int rowDim, int colDim, String pathStr)
    throws IOException
{
    Path path = new Path(pathStr);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, 
        MatrixMultiply.IndexPair.class, IntWritable.class, 
        SequenceFile.CompressionType.NONE);
    MatrixMultiply.IndexPair indexPair = new MatrixMultiply.IndexPair();
    IntWritable el = new IntWritable();
    for (int i = 0; i < rowDim; i++) {
        for (int j = 0; j < colDim; j++) {
            int v = matrix[i][j];
            if (v != 0) { // !!! well, that would be why we aren't writing 0s
                indexPair.index1 = i;
                indexPair.index2 = j;
                el.set(v);
                writer.append(indexPair, el);
            }
        }
    }
    writer.close();
}

在内部的第二个for循环中插入了注释。

您的映射器读取不到0,因为输入文件中没有0。

该代码被设计成假定所有缺失值都为0,并执行额外的检查以避免发出0,以尝试最小化网络流量。

以下内容大多数是错误的,因为我误解了算法
(上面的部分仍然有用)

从链接页面可以看出,您正在使用策略3。策略3依赖于分区器行为和排序顺序。不幸的是,分区器是错误的!这与未打印0无关。这里的分区器就是完全错误的,您得到的矩阵中充满了0,因为它乘以先前初始化为0并从未用正确的块数据覆盖的数据。这在单机操作中是隐藏的,因为分区器是一个空操作,但在大多数集群中会出现问题。

分区器将中间键(kb,jb,ib)映射到减速器r如下:

r =(jb * KB + kb)mod R

它需要保证同一块的所有键都分配给同一个减速器。不幸的是,它保证除非KB%numReducers == 0,否则这将不会发生:

map (key, value)
   if from matrix A with key=(i,k) and value=a(i,k)
      for 0 <= jb < NJB
         emit (k/KB, jb, i/IB), (i mod IB, k mod KB, a(k,j)) // compare this...
   if from matrix B with key=(k,j) and value=b(k,j)
       emit (k/KB, j/JB, -1), (k mod KB, j mod KB, b(k,j))  // ...to this

对于矩阵A,正在迭代关键字jb。对于矩阵B,正在计算关键字jb。由于赋值给reducer是轮流进行的,这保证了A矩阵关键字不会被分配到与B矩阵关键字相同的reducer。因此,该算法失败了,因为它假设了一些关于关键字分配和排序的事情是可以证明是不正确的。只有当所有关键字都分配给一个reducer时,它才正确地按关键字顺序排序,但分区器是错误的!
为实现策略3,必须修改分区器以使用kb%numReducers。这不是一个非常好的分区器,但由于需要将非相同的关键字按特定顺序排序到同一个reducer中,因此这是唯一可行的分区器。
您实际上在问题中放置的代码与存在错误的位置无关。

非常感谢您的回复。明天我会去实验室检查一下是否有效,并且我会告诉您 :) - Chaos
1
@shailesh,不要花太多时间在这上面 - 我刚意识到我错了。关键参数是按块分配的,A矩阵块需要分配给每个相应的B矩阵块,因此迭代是适当的;这是一对一的关系,而不是范围关系。我现在怀疑这更多地与行初始化被跳过有关,因为没有在预期位置上找到块,但我将不得不再试一次。我的有关分区器的答案是错误的,尽管0检测是正确的。尝试删除非零检查并找出会发生什么? - Adam Norberg
另外,你能给我推荐一些关于Hadoop的好资料吗?我想更深入地了解MapReduce、分区器等内部工作原理。 - Chaos
@shailesh,我希望我能帮忙,但除了官方教程和相关文档之外,我自己也不知道有什么特别好的。我刚刚花了比我想承认的时间在Hadoop上实现了一些并不那么复杂的东西,但似乎让我深入了解了Hadoop可以轻松隐藏的几乎每一个讨厌的小错误。 - Adam Norberg
我只是觉得我不喜欢策略4的实现方式,这正是你正在使用的(我之前误读了)。尝试将策略设置为2或3,看看是否会自动开始更好地工作。我唯一能推荐的就是做和我一样的事情-计算每个位置在什么情况下被写入,然后从中找出它们不被写入的情况,然后弄清楚为什么会发生这种情况。 - Adam Norberg
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接