如何高效地在矩阵中找到给定大小的相同值矩形区域?

9
我的问题很简单,但我还没有找到高效的实现方法。
假设有一个像这样的矩阵A:
0 0 0 0 0 0 0
4 4 2 2 2 0 0
4 4 2 2 2 0 0
0 0 2 2 2 1 1
0 0 0 0 0 1 1

现在我想找到这个矩阵中所有给定大小的矩形区域的起始位置。一个区域是A的子集,其中所有数字都相同。
假设宽度为2,高度为3。有3个具有此大小的区域:
2 2   2 2   0 0
2 2   2 2   0 0
2 2   2 2   0 0

该函数调用的结果将是一个起始位置列表(x、y从0开始)的列表,表示那些区域。
List((2,1),(3,1),(5,0))

以下是我的当前实现。这里称为“区域”的是“表面”。
case class Dimension2D(width: Int, height: Int)
case class Position2D(x: Int, y: Int)

def findFlatSurfaces(matrix: Array[Array[Int]], surfaceSize: Dimension2D): List[Position2D] = {

    val matrixWidth = matrix.length
    val matrixHeight = matrix(0).length
    var resultPositions: List[Position2D] = Nil

    for (y <- 0 to matrixHeight - surfaceSize.height) {
        var x = 0
        while (x <= matrixWidth - surfaceSize.width) {
            val topLeft = matrix(x)(y)
            val topRight = matrix(x + surfaceSize.width - 1)(y)
            val bottomLeft = matrix(x)(y + surfaceSize.height - 1)
            val bottomRight = matrix(x + surfaceSize.width - 1)(y + surfaceSize.height - 1)
            // investigate further if corners are equal
            if (topLeft == bottomLeft && topLeft == topRight && topLeft == bottomRight) {
                breakable {
                    for (sx <- x until x + surfaceSize.width;
                         sy <- y until y + surfaceSize.height) {
                        if (matrix(sx)(sy) != topLeft) {
                            x = if (x == sx) sx + 1 else sx 
                            break
                        }
                    }
                    // found one!       
                    resultPositions ::= Position2D(x, y)
                    x += 1
                }
            } else if (topRight != bottomRight) {
                // can skip x a bit as there won't be a valid match in current row in this area
                x += surfaceSize.width 
            } else {
                x += 1
            }
        }   
    }
    return resultPositions
}

我已经尝试过对其进行一些优化,但我相信还有更好的解决方案。是否存在matlab函数可供移植?我也想知道这个问题是否有自己的名称,因为我不确定该搜索什么。
谢谢你考虑这个问题!期待看到您的提议或解决方案 :)
编辑:我的应用程序中矩阵的维度大约在300x300到3000x3000之间。此外,该算法将仅针对同一矩阵调用一次。原因是矩阵之后将始终更改(大约为其的1-20%)。
结果: 我实现了Kevin、Nikita和Daniel的算法,并在我的应用环境中对它们进行了基准测试,即没有孤立的合成基准测试,而是特别注意将所有算法集成到它们最有效的方式中,这对于Kevin的方法尤其重要,因为它使用泛型(请参见下文)。
首先是原始结果,使用Scala 2.8和jdk 1.6.0_23。作为解决特定应用程序问题的一部分,算法被执行了数百次。“Duration”表示需要的总时间,直到应用程序算法完成(当然不包括jvm启动等)。我的机器是一台2.8GHz Core 2 Duo,有2个核心和2gig内存,JVM分配了-Xmx800M。
重要提示:我认为我的基准测试设置对于像Daniel的并行化算法这样的算法并不公平。这是因为应用程序已经在计算多线程。因此,这里的结果可能仅显示等效于单线程速度。
矩阵大小233x587:
                  duration | JVM memory | avg CPU utilization
original O(n^4) | 3000s      30M          100%  
original/-server| 840s       270M         100%
Nikita O(n^2)   | 5-6s       34M          70-80%
Nikita/-server  | 1-2s       300M         100%
Kevin/-server   | 7400s      800M         96-98%
Kevin/-server** | 4900s      800M         96-99%
Daniel/-server  | 240s       360M         96-99%

使用@specialized,避免类型擦除,使泛型更快
矩阵大小为2000x3000:
                  duration | JVM memory | avg CPU utilization
original O(n^4) | too long   100M         100%  
Nikita O(n^2)   | 150s       760M         70%
Nikita/-server  | 295s (!)   780M         100%
Kevin/-server   | too long, didn't try

首先,有关内存的一点说明。-server JVM选项在更多优化和更快执行的优势下使用了相当多的内存。正如您从第二个表中可以看出,尼基塔的算法使用-server选项较慢,这显然是由于达到内存限制所致。我认为,即使对于小矩阵,这也会减慢凯文的算法,因为函数式方法本质上使用了更多的内存。为了消除内存因素,我还用50x50矩阵试过一次,然后凯文的需要5秒钟,而尼基塔的只需0秒钟(嗯,几乎为0)。无论如何,它仍然比较缓慢,而不仅仅是因为内存。

从数字上可以看出,显然我会使用尼基塔的算法,因为它非常快,而这在我的情况下绝对必要。正如丹尼尔指出的那样,它也很容易并行化。唯一的缺点是它不是真正的Scala方式。

目前,凯文的算法可能总体上有点过于复杂,因此速度比较慢,但我相信还有更多的优化可能性(请参见他答案中的最后评论)。

为了直接将尼基塔的算法转换为函数式风格,丹尼尔提出了一个解决方案,它已经相当快了,正如他在答案中所说,如果他可以使用scanRight,这个解决方案甚至会更快。

下一步是什么?

在技术方面:等待Scala 2.9,ScalaCL,并进行合成基准测试以获得原始速度。

我在所有这些中的目标是拥有功能性代码,但前提是不要牺牲太多速度。

答案选择:

至于选择答案,我想把尼基塔和丹尼尔的算法都标记为答案,但我必须选择一个。我的问题标题包括“最有效”,一个是命令式的最快,另一个是函数式风格的最快。尽管这个问题被标记为Scala,但我选择了尼基塔的命令式算法,因为2秒与240秒的差别对我来说仍然太大了。我相信差距仍然可以降低一点,您有什么想法吗?

非常非常感谢你们所有人!虽然我现在不会使用函数式算法,但我对Scala有了许多新的见解,我认为我慢慢地理解了所有的函数疯狂和它的潜力。(当然,即使没有做太多的函数式编程,Scala比Java更令人愉快……这就是学习它的另一个原因)


有一些算法可以找到区域(例如在画图程序中):泛洪填充 区域提取。但它们不会强制使用矩形模式。Kevin的答案对于这种用例看起来非常好。 - rds
@neo,只是出于好奇,你需要处理的矩阵有多大? - The Archetypal Paul
@Paul,我需要一个最有效的算法来处理大约300x300到3000x3000的范围。我对ScalaCL也很感兴趣,但不幸的是我的图形卡太旧了,无法支持它... - letmaik
我删掉了我的回答,因为我注意到算法中存在严重的缺陷。我会让Kevin来回答这个问题。 :-) - Daniel C. Sobral
@daniel,这必须是特化。在“pass”列中,这些方法被调用时使用了 T=(Int, Int) - Kevin Wright
显示剩余4条评论
3个回答

8

首先,几个辅助函数:

//count the number of elements matching the head
def runLength[T](xs:List[T]) = xs.takeWhile(_ == xs.head).size

//pair each element with the number of subsequent occurrences
def runLengths[T](row:List[T]) : List[(T,Int)] = row match {
  case Nil => Nil
  case h :: t => (h, runLength(row)) :: runLengths(t)
}
//should be optimised for tail-call, but easier to understand this way

//sample input: 1,1,2,2,2,3,4,4,4,4,5,5,6
//output: (1,2), (1,1), (2,3), (2,2), (2,1), (3,1), (4,4), (4,3), (4,2), (4,1), (5,2), (5,1), (6,1)

然后可以将其应用于网格中的每一行:

val grid = List(
  List(0,0,0,0),
  List(0,1,1,0),
  List(0,1,1,0),
  List(0,0,0,0))

val stage1 = grid map runLengths
// returns stage1: List[List[(Int, Int)]] =
// 0,4  0,3  0,2  0,1
// 0,1  1,2  1,1  0,1
// 0,1  1,2  1,1  0,1
// 0,4  0,3  0,2  0,1

然后,在完成水平方向的操作之后,我们现在对列执行完全相同的操作。这使用Scala标准集合库中可用的transpose方法来交换行<->列,就像数学矩阵运算一样。完成后,我们也会进行反转置。

val stage2 = (stage1.transpose map runLengths).transpose
// returns stage2: List[List[((Int, Int), Int)]] =
// (0,4),1  (0,3),1  (0,2),1  (0,1),4
// (0,1),2  (1,2),2  (1,1),2  (0,1),3
// (0,1),1  (1,2),1  (1,1),1  (0,1),2
// (0,4),1  (0,3),1  (0,2),1  (0,1),1

这是什么意思?以一个元素为例:(1,2),2,它表示该单元格包含值1,向右扫描,该行中有2个相邻的单元格包含值1。向下扫描,有两个相邻的单元格具有相同的属性,即包含值1并且具有相同数量的等值向右边。整理后更加清晰,将形式为((a,b),c)的嵌套元组转换为(a,(b,c)):
val stage3 = stage2 map { _.map {case ((a,b),c) => a->(b,c) } }
//returns stage3: List[List[(Int, (Int, Int))]] =
//  0,(4,1)  0,(3,1)  0,(2,1)  0,(1,4)
//  0,(1,2)  1,(2,2)  1,(1,2)  0,(1,3)
//  0,(1,1)  1,(2,1)  1,(1,1)  0,(1,2)
//  0,(4,1)  0,(3,1)  0,(2,1)  0,(1,1)

“1,(2,2)”指的是一个包含值为“1”的单元格,并位于一个2x2的相同值单元格矩形的左上角。

从这里开始,很容易发现相同大小的矩形。但是如果您还想排除作为大型矩形子集的区域,则需要更多的工作。

更新:如所写,该算法对于像(0,0)处的单元格并不适用,该单元格属于两个不同的矩形(1x4和4x1)。如果需要,也可以使用相同的技术解决此问题。(使用map/transpose/map/transpose进行一次传递,然后使用transpose/map/transpose/map进行第二次传递,最后将结果压缩并展平)

如果输入可能包含包含相同值单元格的相邻矩形,例如:

0 0 0 0 0 0 0 0
0 0 1 1 1 0 0 0
0 0 1 1 1 0 0 0
0 0 1 1 1 1 1 0
0 0 1 1 1 1 1 0
0 0 1 1 1 1 1 0
0 0 0 0 0 0 0 0

将所有内容整合在一起,并稍加整理:

将所有内容整合在一起,并稍加整理:

type Grid[T] = List[List[T]]

def runLengths[T](row:List[T]) : List[(T,Int)] = row match {
  case Nil => Nil
  case h :: t => (h -> row.takeWhile(_ == h).size) :: runLengths(t)
}

def findRectangles[T](grid: Grid[T]) = {
  val step1 = (grid map runLengths)
  val step2 = (step1.transpose map runLengths).transpose
  step2 map { _ map { case ((a,b),c) => (a,(b,c)) } }
}

更新2

请戴好帽子,这是一个大的更新...

在写入新功能代码之前,我们首先对一些方法进行重构,将它们变成 Ops 类型,并添加隐式类型转换,以便它们可以像底层集合类型的定义方法一样使用:

import annotation.tailrec

class RowOps[T](row: List[T]) {
  def withRunLengths[U](func: (T,Int)=>U) : List[U] = {
    @tailrec def recurse(row:List[T], acc:List[U]): List[U] = row match {
      case Nil => acc
      case head :: tail =>
        recurse(
          tail,
          func(head, row.takeWhile(head==).size) :: acc)
    }
    recurse(row, Nil).reverse
  }

  def mapRange(start: Int, len: Int)(func: T=>T) =
    row.splitAt(start) match {
      case (l,r) => l ::: r.take(len).map(func) ::: r.drop(len)
    }
}

implicit def rowToOps[T](row: List[T]) = new RowOps(row)

这将为列表添加一个withRunLengths方法。 这里的一个显著区别是,该方法不返回(value, length)对的列表,而是接受一个函数作为参数,用于创建每个这样的对应输出值。 这将在以后很方便...

type Grid[T] = List[List[T]]

class GridOps[T](grid: Grid[T]) {
  def deepZip[U](other: Grid[U]) = (grid zip other) map { case (g,o) => g zip o}
  def deepMap[U](f: (T)=>U) = grid map { _ map f}
  def mapCols[U](f: List[T]=>List[U]) = (grid.transpose map f).transpose
  def height = grid.size
  def width = grid.head.size
  def coords = List.tabulate(height,width){ case (y,x) => (x,y) }
  def zipWithCoords = deepZip(coords)
  def deepMapRange(x: Int, y: Int, w: Int, h: Int)(func: T=>T) =
    grid mapRange (y,h){ _.mapRange(x,w)(func) }
}

implicit def gridToOps[T](grid: Grid[T]) = new GridOps(grid)

这里不应该有任何意外。 deepXXX 方法避免编写形式为list map { _ map { ... } }的结构体。 tabulate可能对你来说也是新的,但它的用途希望从使用中变得明显。
使用这些方法,定义查找整个网格上水平和垂直运行长度的函数变得非常简单:
def withRowRunLengths[T,U](grid: Grid[T])(func: (T,Int)=>U) =
  grid map { _.withRunLengths(func) }

def withColRunLengths[T,U](grid: Grid[T])(func: (T,Int)=>U) =
  grid mapCols { _.withRunLengths(func) }

为什么要使用2个参数块而不是一个?我会简单解释一下。

我本可以将它们定义为GridOps中的方法,但它们似乎不适用于通用目的。如果您有不同意见,请随意提出 :)

接下来,定义一些输入...

def parseIntGrid(rows: String*): Grid[Int] =
  rows.toList map { _ map {_.toString.toInt} toList }

val input: Grid[Int] = parseIntGrid("0000","0110","0110","0000")

...另一种有用的助手类型...

case class Rect(w: Int, h: Int)
object Rect { def empty = Rect(0,0) }

作为元组的替代品,这真的有助于调试。深度嵌套的括号对眼睛不友好。(抱歉Lisp的粉丝!)
...并使用上面定义的函数:
val stage1w = withRowRunLengths(input) {
  case (cell,width) => (cell,width)
}

val stage2w = withColRunLengths(stage1w) {
  case ((cell,width),height) => Rect(width,height)
}


val stage1t = withColRunLengths(input) {
 case (cell,height) => (cell,height)
}

val stage2t = withRowRunLengths(stage1t) {
  case ((cell,height),width) => Rect(width,height)
}

所有上述块都应该是一行代码,但我为了在StackOverflow上重新格式化。此阶段的输出只是矩形的网格,我有意省略了矩形实际组成的值的任何提及。这完全没问题,很容易从它在网格中的坐标找到它的值,并且我们将在很短的时间内重新组合数据。还记得我解释过RowOps#withRunLengths需要一个函数作为参数吗?这就是它被使用的地方。case (cell,width) => (cell,width)实际上是一个函数,它接受单元格值和运行长度(称其为cellwidth),然后返回(cell,width)元组。这也是我在定义函数时使用了两个参数块的原因,所以第二个参数可以通过{大括号}传递,使整个过程看起来像DSL。这里还有一个非常重要的原则,即类型推断器按顺序对参数块进行处理,因此对于这个(还记得吗?):
def withRowRunLengths[T,U](grid: Grid[T])(func: (T,Int)=>U)

类型T将由提供的网格确定。编译器因此知道作为第二个参数提供的函数的输入类型 - 在这种情况下是Int,因为第一个参数是Grid[Int],这就是为什么我能够编写case (cell,width) => (cell,width)而不必在任何地方明确说明cellwidth是整数。在第二种用法中,提供了一个Grid[(Int,Int)],这适用于闭包case ((cell,width),height) => Rect(width,height),同样可以正常工作。

如果该闭包包含任何对于网格底层类型无效的内容,则编译器会发出警告,这就是类型安全的全部意义所在...

计算出所有可能的矩形后,剩下的就是收集数据并以更实用的格式呈现它们。因为此阶段的嵌套可能会变得非常混乱,所以我定义了另一种数据类型:

case class Cell[T](
  value: T,
  coords: (Int,Int) = (0,0),
  widest: Rect = Rect.empty,
  tallest: Rect = Rect.empty
)

这里没有什么特别的,只是一个带有命名/默认参数的case类。我也很高兴我有先见之明地在上面定义了Rect.empty :)

现在将4个数据集(输入值、坐标、最宽矩形、最高矩形)混合在一起,逐渐折叠到单元格中,轻轻搅拌,即可食用:

val cellsWithCoords = input.zipWithCoords deepMap {
  case (v,(x,y)) => Cell(value=v, coords=(x,y))
}

val cellsWithWidest = cellsWithCoords deepZip stage2w deepMap {
  case (cell,rect) => cell.copy(widest=rect)
}

val cellsWithWidestAndTallest = cellsWithWidest deepZip stage2t deepMap {
  case (cell,rect) => cell.copy(tallest=rect)
}

val results = (cellsWithWidestAndTallest deepMap {
  case Cell(value, coords, widest, tallest) =>
    List((widest, value, coords), (tallest, value, coords))
  }
).flatten.flatten

最后一个阶段很有趣,它将每个单元格转换为大小为2的列表,其中包含(矩形、值、坐标)元组 (大小为2,因为我们对于每个单元格都有最宽和最高的矩形)。然后调用两次“flatten”函数,将结果从List[List[List[_]]] 转换为一个单一的List。现在不需要再保留2D结构,因为必要的坐标已经嵌入到结果中。
大功告成!
现在你可以自由处理这个List了。下一步可能是某种形式的排序和去重...

嗯...不知道是不是Scala插件的问题,但它不让我编译,因为出现了这个错误:"too many arguments for method map2dRowFirst: (grid: HelpersTest.this.Grid[T])(func: (List[T]) => List[T])List[List[T]]"。我还没有理解map2dRowFirst中第二个参数列表的作用,我的意思是一般情况下,而不仅仅是在你的代码中。你能给我解释一下吗? - letmaik
@Kevin现在它抱怨说“类型不匹配;找到:(List [Nothing]) => List [(Nothing,Int)]需要:(List [Any]) => List [Any]”对于最后一行中的runLengths参数,这似乎是正确的,因为List [(T,Int)]不是List [T]。 - letmaik
@neo 现在全部修复了。可惜我把我的 REPL 会话留在了另一台电脑上,我想回去检查一下我漏掉了什么。 - Kevin Wright
@Kevin,我不知道“-server”标志,似乎真的可以加速所有算法(我的原始算法,Nikita的算法和你的算法)。唯一的“缺点”是内存消耗可能会成为我的2GB内存工作站的问题,但我们会看看的。 - letmaik
@Kevin 谢谢! :) 我正在集成它,我认为你在 "grid mapRange(y,h)...." 之间的 deepMapRange 中漏掉了一个点,至少 Eclipse 在没有点的情况下会抱怨,但不知道为什么中缀符号在这里不起作用,也许是因为柯里化? - letmaik
显示剩余18条评论

5
你可以相对容易地使用 O(n^2) 实现它。 首先,进行一些预处理。对于矩阵中的每个单元格,计算下方有多少连续的单元格具有相同的数字。 对于你的示例,结果矩阵a(想不出更好的名称:/)看起来像这样。
0 0 0 0 0 2 2
1 1 2 2 2 1 1
0 0 1 1 1 0 0
1 1 0 0 0 1 1
0 0 0 0 0 0 0

这个可以轻松地在O(n^2)时间内生成。

现在,对于每一行i,让我们找到所有顶部在第i行(底部在第i + height - 1行)的矩形。
以下是当i = 1时的示例:

0 0 0 0 0 0 0
-------------
4 4 2 2 2 0 0
4 4 2 2 2 0 0
0 0 2 2 2 1 1
-------------
0 0 0 0 0 1 1

现在,主要思想是:
int current_width = 0;
for (int j = 0; j < matrix.width; ++j) {
    if (a[i][j] < height - 1) {
        // this column has different numbers in it, no game
        current_width = 0;
        continue;
    }

    if (current_width > 0) {
        // this column should consist of the same numbers as the one before
        if (matrix[i][j] != matrix[i][j - 1]) {
            current_width = 1; // start streak anew, from the current column
            continue;
        }
    }

    ++current_width;
    if (current_width >= width) {
        // we've found a rectangle!
    }
}

在上面的例子中(i = 1),每次迭代后的current_width将是0, 0, 1, 2, 3, 0, 0
现在,我们需要遍历所有可能的i,并且我们已经有了一个解决方案。

2
啊...Java!不干净,不干净! - Kevin Wright
2
“Big-O本质上是串行复杂度的度量”,这是不正确的。它是所需操作数量的度量。在固定核心数的情况下,O(n^4)算法仍然是O(n^4)。而且,除非核心数大于或等于n,否则不可变的声明性解决方案无法同时处理每一行,除非我漏掉了什么。 - The Archetypal Paul
2
@Kevin,我不是在谈论计数,而是预测未来。由于普通消费笔记本电脑只有2-4个内核(这个数字最近似乎并没有遵守摩尔定律),因此在10^6次操作和10^12次操作之间的差异不重要的时代不会很快到来。我简直不敢相信自己还要为此争论。(更不用说,在Java或C中,根据我上面提到的,绝对不存在防止并行化的东西。) - Nikita Rybak
2
@Nikita 当问题同时被标记为“scala”和“scala 2.8”时,你会认为它在这里被讨论...我建议你回去阅读我的帖子,在那里我确实描述了它是如何工作的,如果你连它是什么都不知道,你怎么能诋毁它为O(n^4)呢? - Kevin Wright
2
@Nikita 哈哈....我刚刚做了一个更大的基准测试,当我在我的应用程序中使用你的代码(多达几百次)时,总共只需要6-7秒,而我的原始解决方案需要3065秒:D 所以你的代码快了400-500倍...这真的可能吗?当然,这些测量结果并不科学,但仍然可以给人一个想法。 - letmaik
显示剩余24条评论

5

我将扮演恶魔的代言人。我会展示Nikita用函数式风格编写的确切算法。我还会并行化它,只是为了表明它是可行的。

首先,计算一个单元格下面具有相同数字的连续单元格。与Nikita提议的输出相比,我进行了轻微更改,返回所有值加一,以避免在代码的其他部分出现-1

def computeHeights(column: Array[Int]) = (
    column
    .reverse
    .sliding(2)
    .map(pair => pair(0) == pair(1))
    .foldLeft(List(1)) ( 
        (list, flag) => (if (flag) list.head + 1 else 1) :: list
    )
)

我更愿意在反转之前使用.view,但这在当前的Scala版本中不起作用。如果可以的话,它将节省重复的数组创建,这应该会因内存局部性和带宽原因而大大加快代码的速度,如果没有其他原因的话。
现在,同时处理所有列:
import scala.actors.Futures.future

def getGridHeights(grid: Array[Array[Int]]) = (
    grid
    .transpose
    .map(column => future(computeHeights(column)))
    .map(_())
    .toList
    .transpose
)

我不确定并行化开销是否会在这里得到回报,但这是 Stack Overflow 上第一个算法,它实际上有机会成功,因为计算列需要非常大的工作量。以下是另一种编写它的方式,使用即将推出的 2.9 版本功能(它可能适用于 Scala 2.8.1——不确定):

def getGridHeights(grid: Array[Array[Int]]) = (
    grid
    .transpose
    .toParSeq
    .map(computeHeights)
    .toList
    .transpose
)

现在,Nikita算法的核心部分:
def computeWidths(height: Int, row: Array[Int], heightRow: List[Int]) = (
    row
    .sliding(2)
    .zip(heightRow.iterator)
    .toSeq
    .reverse
    .foldLeft(List(1)) { case (widths @ (currWidth :: _), (Array(prev, curr), currHeight)) =>
        (
            if (currHeight >= height && currWidth > 0 && prev == curr) currWidth + 1
            else 1
        ) :: widths
    }
    .toArray
)

在这段代码中,我正在使用模式匹配,但我担心它的速度,因为在所有滑动、压缩和折叠中,有太多的东西在这里被搅动。而且,说到性能,我正在使用Array而不是IndexedSeq,因为Array是JVM中唯一没有被擦除的类型,使用Int时性能更好。然后,还有.toSeq,我对它也不是很满意,因为它会影响内存局部性和带宽。

此外,我是从右到左进行操作,而不是Nikita的从左到右,只是为了找到左上角。

然而,除了我仍然将当前宽度加1与他的代码相比,而且没有在此处打印结果之外,这与Nikita的答案中的代码是相同的。这里有很多没有明确起源的东西--rowheightRowheight...让我们看看这个代码的上下文--并行化!--来获得整体图片。

def getGridWidths(height: Int, grid: Array[Array[Int]]) = (
    grid
    .zip(getGridHeights(grid))
    .map { case (row, heightsRow) => future(computeWidths(height, row, heightsRow)) }
    .map(_())
)

而且是2.9版本:

def getGridWidths(height: Int, grid: Array[Array[Int]]) = (
    grid
    .toParSeq
    .zip(getGridHeights(grid))
    .map { case (row, heightsRow) => computeWidths(height, row, heightsRow) }
)

而且,为了最后的高潮,

def findRectangles(height: Int, width: Int, grid: Array[Array[Int]]) = {
    val gridWidths = getGridWidths(height, grid)
    for {
        y <- gridWidths.indices
        x <- gridWidths(y).indices
        if gridWidths(y)(x) >= width
    } yield (x, y)
}

所以...我毫不怀疑,Nikita算法的命令式版本更快——它只使用Array,这对于原始类型比其他任何类型都要快得多,并且它避免了大量创建临时集合——尽管Scala在这里可能做得更好。此外,没有闭包——尽管它们很有帮助,但它们比没有闭包的代码慢。至少在JVM增长一些帮助它们之前是这样的。
另外,Nikita的代码可以轻松地使用线程并行化——所有的事情都可以!而且几乎没有麻烦。
但我的观点是,Nikita的代码并不特别糟糕,只是因为它在这里和那里使用数组和可变变量。该算法可以干净地转换成更具功能性的风格。
编辑
所以,我决定尝试制作一个有效的函数版本。它不是完全的函数,因为我使用了Iterator,它是可变的,但足够接近。不幸的是,它在Scala 2.8.1上无法工作,因为它缺少scanLeftIterator上的实现。
这里还有两个不幸的事情。首先,我放弃了网格高度的并行化,因为我无法避免至少需要一个transpose,带来的所有集合复制问题。尽管如此,仍然至少需要复制一次数据(请参见toArray调用以了解在哪里)。
另外,由于我正在使用Iterable,我失去了使用并行集合的能力。我确实想知道,如果从一开始就让grid成为并行集合的并行集合,代码是否可以改进。
我不知道这是否比以前的版本更有效。这是一个有趣的问题...
def getGridHeights(grid: Array[Array[Int]]) = (
    grid
    .sliding(2)
    .scanLeft(Array.fill(grid.head.size)(1)) { case (currHeightArray, Array(prevRow, nextRow)) =>
        (prevRow, nextRow, currHeightArray)
        .zipped
        .map { case (x, y, currHeight) =>  if (x == y) currHeight + 1 else 1 }
    }
)

def computeWidths(height: Int, row: Array[Int], heightRow: Array[Int]) = (
    row
    .sliding(2)
    .map { case Array(x, y) => x == y }
    .zip(heightRow.iterator)
    .scanLeft(1) { case (currWidth , (isConsecutive, currHeight)) =>
        if (currHeight >= height && currWidth > 0 && isConsecutive) currWidth + 1
        else 1
    }
    .toArray
)

import scala.actors.Futures.future

def getGridWidths(height: Int, grid: Array[Array[Int]]) = (
    grid
    .iterator
    .zip(getGridHeights(grid))
    .map { case (row, heightsRow) => future(computeWidths(height, row, heightsRow)) }
    .map(_())
    .toArray
)

def findRectangles(height: Int, width: Int, grid: Array[Array[Int]]) = {
    val gridWidths = getGridWidths(height, grid)
    for {
        y <- gridWidths.indices
        x <- gridWidths(y).indices
        if gridWidths(y)(x) >= width
    } yield (x - width + 1, y - height + 1)
}

谢谢!我一直知道在Scala中可以实现不错的算法 :) 我有空的时候会更深入地研究它,也许我甚至会开始理解一些Scala。 - Nikita Rybak
顺便说一下,我这里主要是以函数式风格的清晰代码为主,而不是追求速度。我有点受挫,因为Scala库中有两个问题(其中一个可以说是bug),这也会使代码更快。然而,没有什么能比使用Nikita展示的算法实现while循环更快了。你可能会从并行化中获益,但在Nikita的代码中只需要很少的改动就可以实现相同的效果。 - Daniel C. Sobral
@Daniel,我也喜欢尽可能清晰的代码,这就是为什么我对像你这样快速的功能性解决方案也很感兴趣。我会在一秒钟内为您的代码包括基准测试结果。 - letmaik
我指的不是清晰的代码(因为Nikita的代码也很清晰),而是函数式风格的代码。 - letmaik
@Kevin,我的意图是在“Iterable”上使用“scanRight”。不幸的是,“Iterable”上没有“scanRight”,而且“scanRight”也没有正确实现。 - Daniel C. Sobral
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接