如何加速Levenshtein距离的计算

11

我正在尝试运行一项模拟,以测试随机二进制字符串之间的平均Levenshtein距离

我的程序是用Python编写的,但我正在使用这个C扩展。最相关且占用大部分时间的函数计算两个字符串之间的Levenshtein距离,代码如下。

lev_edit_distance(size_t len1, const lev_byte *string1,
                  size_t len2, const lev_byte *string2,
                  int xcost)
{
  size_t i;
  size_t *row;  /* we only need to keep one row of costs */
  size_t *end;
  size_t half;

  /* strip common prefix */
  while (len1 > 0 && len2 > 0 && *string1 == *string2) {
    len1--;
    len2--;
    string1++;
    string2++;
  }

  /* strip common suffix */
  while (len1 > 0 && len2 > 0 && string1[len1-1] == string2[len2-1]) {
    len1--;
    len2--;
  }

  /* catch trivial cases */
  if (len1 == 0)
    return len2;
  if (len2 == 0)
    return len1;

  /* make the inner cycle (i.e. string2) the longer one */
  if (len1 > len2) {
    size_t nx = len1;
    const lev_byte *sx = string1;
    len1 = len2;
    len2 = nx;
    string1 = string2;
    string2 = sx;
  }
  /* check len1 == 1 separately */
  if (len1 == 1) {
    if (xcost)
      return len2 + 1 - 2*(memchr(string2, *string1, len2) != NULL);
    else
      return len2 - (memchr(string2, *string1, len2) != NULL);
  }
  len1++;
  len2++;
  half = len1 >> 1;
  /* initalize first row */
  row = (size_t*)malloc(len2*sizeof(size_t));
  if (!row)
    return (size_t)(-1);
  end = row + len2 - 1;
  for (i = 0; i < len2 - (xcost ? 0 : half); i++)
    row[i] = i;

  /* go through the matrix and compute the costs.  yes, this is an extremely
   * obfuscated version, but also extremely memory-conservative and relatively
   * fast.  */
  if (xcost) {
    for (i = 1; i < len1; i++) {
      size_t *p = row + 1;
      const lev_byte char1 = string1[i - 1];
      const lev_byte *char2p = string2;
      size_t D = i;
      size_t x = i;
      while (p <= end) {
        if (char1 == *(char2p++))
          x = --D;
        else
          x++;
        D = *p;
        D++;
        if (x > D)
          x = D;
        *(p++) = x;
      }
    }
  }
  else {
    /* in this case we don't have to scan two corner triangles (of size len1/2)
     * in the matrix because no best path can go throught them. note this
     * breaks when len1 == len2 == 2 so the memchr() special case above is
     * necessary */
    row[0] = len1 - half - 1;
    for (i = 1; i < len1; i++) {
      size_t *p;
      const lev_byte char1 = string1[i - 1];
      const lev_byte *char2p;
      size_t D, x;
      /* skip the upper triangle */
      if (i >= len1 - half) {
        size_t offset = i - (len1 - half);
        size_t c3;

        char2p = string2 + offset;
        p = row + offset;
        c3 = *(p++) + (char1 != *(char2p++));
        x = *p;
        x++;
        D = x;
        if (x > c3)
          x = c3;
        *(p++) = x;
      }
      else {
        p = row + 1;
        char2p = string2;
        D = x = i;
      }
      /* skip the lower triangle */
      if (i <= half + 1)
        end = row + len2 + i - half - 2;
      /* main */
      while (p <= end) {
        size_t c3 = --D + (char1 != *(char2p++));
        x++;
        if (x > c3)
          x = c3;
        D = *p;
        D++;
        if (x > D)
          x = D;
        *(p++) = x;
      }
      /* lower triangle sentinel */
      if (i <= half) {
        size_t c3 = --D + (char1 != *char2p);
        x++;
        if (x > c3)
          x = c3;
        *p = x;
      }
    }
  }

  i = *end;
  free(row);
  return i;
}

这个能加速吗?

我将在一台搭载AMD FX(tm)-8350八核处理器的32位Ubuntu上运行代码。

以下是调用它的Python代码:

from Levenshtein import distance
import random
for i in xrange(16):
    sum = 0
    for j in xrange(1000):
        str1 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
        str2 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
        sum += distance(str1,str2)
    print i,sum/(1000*2**i)

1
你是否对整个程序以及该函数进行了性能分析,以确定 CPU 周期的消耗位置?如果没有,那么你只是在猜测。 - sizzzzlerz
@packersfan16,我真的非常怀疑for循环是瓶颈。它只被执行了16000次,这可能比i = 15迭代中的单个编辑距离计算还要小。 - Lucas Wiman
@technosaurus 看起来很有趣。我对使用哪个编译器很灵活。我目前正在使用gcc 4.7.2。 - marshall
如果您要为16个线程(每个有2个线程的8个核心)展开bitops,那么每个周期将是1024位或128个字符。如果需要示例,请参见任何libc或内核的字符串部分...尽管Linux内核在大多数架构上使用汇编语言,但该代码是编写成可嵌入C的。 - technosaurus
@marshall:不好意思,我一直想自己实现这个功能,但从未有机会去做。 - Fred Foo
显示剩余15条评论
4个回答

3
你可以尝试并行运行。在开始时生成一个大的随机数列表,在循环中,每次启动 8 个线程来处理列表的一个块,并将其最终结果添加到总和变量中。或者一次生成 8 个列表,再每次同时处理 8 个列表。
OpenMP 建议的问题在于“由于有大量的数据依赖关系,这个算法不易并行化” - 维基百科。
from threading import Thread

sum = 0

def calc_distance(offset) :
    sum += distance(randoms[offset][0], randoms[offset][1]) #use whatever addressing scheme is best

threads = []
for i in xrange(8) :
    t = new Thread(target=calc_distance, args=(i))
    t.start()
    threads.append(t)

later....

for t in threads :
     t.join()

如果能够有Levenshtein距离内核(或可编码),我认为这种方法后期将很好地移植到OpenCL中。

这只是一个快速的回忆帖子,所以可能还有一些需要解决的问题。


谢谢。我听说多进程是Python并行处理的推荐模块。线程有什么优势吗? - marshall
多进程使用进程而不是线程。由于Python中的全局解释器锁定,许多人更喜欢使用多进程而不是线程化代码。您可以尝试两种方法并进行比较,它们都使用“Process”类具有类似的语法。在这里,多进程可能实际上具有轻微的优势 http://docs.python.org/2/library/multiprocessing.html。 - beiller

1
你可以从这个网站学习一些OpenMP的概念和指令:OpenMP初学者指南
你需要一个兼容OpenMP的编译器。这里有一个列表,列出了兼容的编译器。编译代码时,你需要使用-fopenmp选项。
我只添加了编译器指令#pragma omp parallel for到你的代码中,告诉编译器以下代码块可以并行运行。通过将while循环改为for循环或在整个函数中应用OpenMP模式,您可以看到更多的性能提升。在这些代码块之前,您可以使用函数omp_set_num_threads()调整要用于执行for循环的线程数来调整性能。由于您将在8核处理器上运行,因此可以从8开始尝试。
lev_edit_distance(size_t len1, const lev_byte *string1,
              size_t len2, const lev_byte *string2,
              int xcost)
{
  size_t i;
  size_t *row;  /* we only need to keep one row of costs */
  size_t *end;
  size_t half;

 // Set the number of threads the OpenMP framework will use to parallelize the for loops
 omp_set_num_threads(8);

  /* strip common prefix */
  while (len1 > 0 && len2 > 0 && *string1 == *string2) {
    len1--;
    len2--;
    string1++;
    string2++;
  }

  /* strip common suffix */
  while (len1 > 0 && len2 > 0 && string1[len1-1] == string2[len2-1]) {
    len1--;
    len2--;
  }

  /* catch trivial cases */
  if (len1 == 0)
    return len2;
  if (len2 == 0)
    return len1;

  /* make the inner cycle (i.e. string2) the longer one */
  if (len1 > len2) {
    size_t nx = len1;
    const lev_byte *sx = string1;
    len1 = len2;
    len2 = nx;
    string1 = string2;
    string2 = sx;
  }
  /* check len1 == 1 separately */
  if (len1 == 1) {
    if (xcost)
      return len2 + 1 - 2*(memchr(string2, *string1, len2) != NULL);
    else
      return len2 - (memchr(string2, *string1, len2) != NULL);
  }
  len1++;
  len2++;
  half = len1 >> 1;
  /* initalize first row */
  row = (size_t*)malloc(len2*sizeof(size_t));
  if (!row)
    return (size_t)(-1);
  end = row + len2 - 1;

  #pragma omp parallel for
  for (i = 0; i < len2 - (xcost ? 0 : half); i++)
    row[i] = i;

  /* go through the matrix and compute the costs.  yes, this is an extremely
   * obfuscated version, but also extremely memory-conservative and relatively
   * fast.  */
  if (xcost) {
   #pragma omp parallel for
   for (i = 1; i < len1; i++) {
      size_t *p = row + 1;
      const lev_byte char1 = string1[i - 1];
      const lev_byte *char2p = string2;
      size_t D = i;
      size_t x = i;
      while (p <= end) {
        if (char1 == *(char2p++))
          x = --D;
        else
          x++;
        D = *p;
        D++;
        if (x > D)
          x = D;
        *(p++) = x;
      }
    }
  }
  else {
    /* in this case we don't have to scan two corner triangles (of size len1/2)
     * in the matrix because no best path can go throught them. note this
     * breaks when len1 == len2 == 2 so the memchr() special case above is
     * necessary */
    row[0] = len1 - half - 1;
    #pragma omp parallel for
    for (i = 1; i < len1; i++) {
      size_t *p;
      const lev_byte char1 = string1[i - 1];
      const lev_byte *char2p;
      size_t D, x;
      /* skip the upper triangle */
      if (i >= len1 - half) {
        size_t offset = i - (len1 - half);
        size_t c3;

        char2p = string2 + offset;
        p = row + offset;
        c3 = *(p++) + (char1 != *(char2p++));
        x = *p;
        x++;
        D = x;
        if (x > c3)
          x = c3;
        *(p++) = x;
      }
      else {
        p = row + 1;
        char2p = string2;
        D = x = i;
      }
      /* skip the lower triangle */
      if (i <= half + 1)
        end = row + len2 + i - half - 2;
      /* main */
      while (p <= end) {
        size_t c3 = --D + (char1 != *(char2p++));
        x++;
        if (x > c3)
          x = c3;
        D = *p;
        D++;
        if (x > D)
          x = D;
        *(p++) = x;
      }
      /* lower triangle sentinel */
       if (i <= half) {
        size_t c3 = --D + (char1 != *char2p);
        x++;
        if (x > c3)
          x = c3;
        *p = x;
      }
    }
  }

  i = *end;
  free(row);
  return i;
}

您还可以对在for循环中进行操作的变量执行reduction操作,以提供简单的并行计算,如求和、乘法等。

int main()
{
    int i = 0,
        j = 0,
        sum = 0;
    char str1[30]; // Change size to fit your specifications
    char str2[30];

    #pragma omp parallel for
    for(i=0;i<16;i++)
    {
        sum = 0;
            // Could do a reduction on sum across all threads
        for(j=0;j<1000;j++)
        {
            // Calls will have to be changed
            // I don't know much Python so I'll leave that to the experts 
            str1 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
            str2 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
            sum += distance(str1,str2)
        }
        printf("%d %d",i,(sum/(1000*2*i)));
    }
}

谢谢。我将在AMD FX(tm)-8350八核处理器上的32位Ubuntu上运行代码。编译器是gcc版本4.7.2。 - marshall
我应该说一下,Python代码只是将str1和str2设置为长度为2 ** i的随机二进制字符串。 - marshall

1

我的建议:

1)进行非常小的优化:为了避免内存管理开销,一次性分配row。或者您可以尝试使用realloc(),或者您可以在静态变量中跟踪row的大小(并使row也成为静态)。然而,这只能节省很少的空间,即使实施起来成本很低。

2)您正在尝试计算平均值。同样,在C语言中进行平均值计算。这应该在调用时节省一些时间。再次强调,这是一个小改变,但它很便宜。

3)由于您不关心实际计算,只关心结果,因此假设您有三台PC,每台PC都是四核机器。然后在每台PC上运行程序的四个实例,并且循环十二次较短。您将在十二分之一的时间内获得十二个结果:对这些结果取平均值即可。

选项#3除了循环之外不需要任何修改,您可能希望将其作为命令行参数,以便在可变数量的计算机上部署程序。实际上,您可能希望输出结果及其“权重”,以最小化在将结果相加时出错的机会。
for j in xrange(N):
    str1 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
    str2 = bin(random.getrandbits(2**i))[2:].zfill(2**i)
    sum += distance(str1,str2)
print N,i,sum/(N*2**i)

但是,如果您对通用的Levenshtein统计感兴趣,我不确定仅使用0和1符号进行计算是否适合您的目的。从字符串01010101中,您可以通过翻转八个字符或删除第一个并在末尾添加零来获得10101010,这两种情况具有不同的成本。如果您拥有字母表中的所有字母,则第二种可能性变得不太可能,并且这应该会改变平均成本方案中的某些内容。还是我漏掉了什么?


谢谢。目前我只对二进制字符串感兴趣。 - marshall

0

一两年前,有人进行了大量的研究并进行了运行时测试。

他提出了this,基本上使用了解决方案树来加速处理。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接