将Spark RDD序列化并读取到Python中

5
我正在尝试通过将Spark RDD进行pickle序列化,并直接将序列化文件读入Python来实现目标。
a = sc.parallelize(['1','2','3','4','5'])
a.saveAsPickleFile('test_pkl')

接下来,我将test_pkl文件复制到本地,但我该如何直接在Python中读取它们呢?当我试着使用普通的pickle包时,尝试读取'test_pkl'的第一个pickle部分失败了:

pickle.load(open('part-00000','rb'))

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib64/python2.6/pickle.py", line 1370, in load
    return Unpickler(file).load()
  File "/usr/lib64/python2.6/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib64/python2.6/pickle.py", line 970, in load_string
    raise ValueError, "insecure string pickle"
ValueError: insecure string pickle

我认为Spark使用的腌制方法与Python的pickle方法不同(如果我错了请纠正)。是否有办法将Spark中的数据进行腌制,并将这个腌制对象直接从文件中读取到Python中?


1
问题在于它不是一个pickle文件,而是包含了被pickled的对象的SequenceFile,我不知道有没有Python中正在积极开发的SequenceFiles解析器。 - zero323
3个回答

3
可以使用sparkpickle项目来实现。非常简单。
with open("/path/to/file", "rb") as f:
    print(sparkpickle.load(f))

1
更好的方法可能是将每个分区中的数据进行pickle处理,编码后写入文本文件中:
import cPickle
import base64

def partition_to_encoded_pickle_object(partition):
    p = [i for i in partition] # convert the RDD partition to a list
    p = cPickle.dumps(p, protocol=2) # pickle the list
    return [base64.b64encode(p)] # base64 encode the list, and return it in an iterable

my_rdd.mapPartitions(partition_to_encoded_pickle_object).saveAsTextFile("your/hdfs/path/")

当您将文件下载到本地目录后,可以使用以下代码段来读取它:

# you first need to download the file, this step is not shown
# afterwards, you can use 
path = "your/local/path/to/downloaded/files/"
data = []
for part in os.listdir(path):
    if part[0] != "_": # this prevents system generated files from getting read - e.g. "_SUCCESS"
        data += cPickle.loads(base64.b64decode((open(part,'rb').read())))

这里唯一的问题是加载部分需要将所有数据加载到data中的内存中,这可能并不总是可行的。 - TayTay
@Tgsmith61591 正确 - 如果您正在单台计算机上读取数据,则通常无法从群集中读取所有数据。为了解决这个问题,您可能希望从文件中仅筛选/缩小/提取所需的数据,例如 data += some_filter_fx(cPickle.loads(base64.b64decode((open(part,'rb').read())))) - mgoldwasser

1
问题在于格式不是pickle文件,它是一个序列化的对象的SequenceFile。序列文件可以在Hadoop和Spark环境中打开,但不适用于Python,并使用基于JVM的序列化来序列化,在这种情况下是一个字符串列表。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接