在Spark中广播烦扰对象(用于最近邻)?

7

由于Spark的mllib没有最近邻功能,因此我正在尝试使用Annoy进行近似最近邻。我尝试广播Annoy对象并将其传递给工作节点,但它没有按预期运行。

下面是用于复现问题的代码(在PySpark中运行)。问题在于使用Annoy时与不使用Spark时看到的差异。

from annoy import AnnoyIndex
import random
random.seed(42)

f = 40
t = AnnoyIndex(f)  # Length of item vector that will be indexed
allvectors = []
for i in xrange(20):
    v = [random.gauss(0, 1) for z in xrange(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10) # 10 trees

# Use Annoy with Spark
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
x = sparkvectors.map(lambda x: bct.value.get_nns_by_vector(vector=x[1], n=5))
print "Five closest neighbors for first vector with Spark:",
print x.first()

# Use Annoy without Spark
print "Five closest neighbors for first vector without Spark:",
print(t.get_nns_by_vector(vector=allvectors[0][1], n=5))

输出结果如下:

使用Spark得到第一个向量的五个最近邻居:无

不使用Spark得到第一个向量的五个最近邻居:[0, 13, 12, 6, 4]

2个回答

10
我从未使用过Annoy,但我很确定包描述解释了这里正在发生的事情:
它还创建大型只读基于文件的数据结构,这些数据结构被映射到内存中,以便许多进程可以共享相同的数据。
由于它在使用内存映射索引时,当您将其序列化并传递给工作进程时,所有数据都会丢失。
可以尝试像这样做:
from pyspark import SparkFiles

t.save("index.ann")
sc.addPyFile("index.ann")

def find_neighbors(iter):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))
    return (t.get_nns_by_vector(vector=x[1], n=5) for x in iter)

sparkvectors.mapPartitions(find_neighbors).first()
## [0, 13, 12, 6, 4]

这个树会被加载到驱动程序内存还是执行器内存中?我正在尝试在Spark上使用annoy tree。树的大小为3GB(实际上是将树保存在HDFS上)。当我尝试查找最近的邻居时,我只需加载树并在每个查询的循环中获取结果。 - sau
嗨Sau,树的大小很大,我不确定你是否成功使用了上面的代码? - user48135
如何在Spark数据框[ID:int,features:vectors]上创建AnnoyIndex并构建模型? - abdkumar

5

以防其他人像我一样在这里跟进,你需要在mapPartitions函数中导入Annoy,否则你仍然会遇到pickling错误。这是基于以上内容的完整示例:

from annoy import AnnoyIndex

from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark import SparkConf

import random
random.seed(42)

f = 1024
t = AnnoyIndex(f)
allvectors = []
for i in range(100):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)
    allvectors.append((i, v))

t.build(10)
t.save("index.ann")

def find_neighbors(i):
    from annoy import AnnoyIndex
    ai = AnnoyIndex(f)
    ai.load(SparkFiles.get("index.ann"))
    return (ai.get_nns_by_vector(vector=x[1], n=5) for x in i)

with SparkContext(conf=SparkConf().setAppName("myannoy")) as sc:
  sc.addFile("index.ann")
  sparkvectors = sc.parallelize(allvectors)
  sparkvectors.mapPartitions(find_neighbors).first()

1
你能解释一下它为什么会这样吗?所有的导入都需要在被mapPartitions调用的函数内部吗? - Monit Gehlot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接