Reading a distributed tab-delimited CSV file


Inspired by this question, I wrote some code to store an RDD (read from a Parquet file) whose schema is (photo_id, data) as pairs, delimited by tabs and, just as a detail, base64-encoded, like this:

import base64
import cPickle

def do_pipeline(itr):
   ...
   item_id = x.photo_id

def toTabCSVLine(data):
  return '\t'.join(str(d) for d in data)

serialize_vec_b64pkl = lambda x: (x[0], base64.b64encode(cPickle.dumps(x[1])))

def format(data):
    return toTabCSVLine(serialize_vec_b64pkl(data))

dataset = sqlContext.read.parquet('mydir')
lines = dataset.map(format)
lines.saveAsTextFile('outdir')

So, now the point of interest: how can I read that dataset back and print its deserialized data? I am using Python 2.6.6.
My attempt is here; just to verify that everything can be done, I wrote the following code:
deserialize_vec_b64pkl = lambda x: (x[0], cPickle.loads(base64.b64decode(x[1])))

base64_dataset = sc.textFile('outdir')
collected_base64_dataset = base64_dataset.collect()
print(deserialize_vec_b64pkl(collected_base64_dataset[0].split('\t')))

This calls collect(), which is fine for testing, but in a real-world scenario it would struggle...
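For a lighter check that does not pull the whole dataset to the driver, the split and the deserialization could be pushed to the executors and only one record fetched with take(); a minimal sketch of that idea, reusing deserialize_vec_b64pkl from above:

# Sketch: split and decode on the executors, then fetch a single record
# with take() instead of collecting the whole RDD on the driver.
sample = (base64_dataset
    .map(lambda line: deserialize_vec_b64pkl(line.split('\t')))
    .take(1))
print(sample)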


Edit:

When I tried zero323's suggestion:

foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()

I got this error, which boils down to:

PythonRDD[2] at RDD at PythonRDD.scala:43
16/08/04 18:32:30 WARN TaskSetManager: Lost task 4.0 in stage 0.0 (TID 4, gsta31695.tan.ygrid.yahoo.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/grid/0/tmp/yarn-local/usercache/gsamaras/appcache/application_1470212406507_56888/container_e04_1470212406507_56888_01_000009/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/08/04 18:32:30 ERROR TaskSetManager: Task 12 in stage 0.0 failed 4 times; aborting job
16/08/04 18:32:31 WARN TaskSetManager: Lost task 14.3 in stage 0.0 (TID 38, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 13.3 in stage 0.0 (TID 39, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
16/08/04 18:32:31 WARN TaskSetManager: Lost task 16.3 in stage 0.0 (TID 42, gsta31695.tan.ygrid.yahoo.com): TaskKilled (killed intentionally)
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/homes/gsamaras/code/read_and_print.py in <module>()
     17     print(base64_dataset.map(str.split).map(deserialize_vec_b64pkl))
     18 
---> 19     foo = (base64_dataset.map(str.split).map(deserialize_vec_b64pkl)).collect()
     20     print(foo)

/home/gs/spark/current/python/lib/pyspark.zip/pyspark/rdd.py in collect(self)
    769         """
    770         with SCCallSiteSync(self.context) as css:
--> 771             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    772         return list(_load_from_socket(port, self._jrdd_deserializer))
    773 

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/home/gs/spark/current/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.

Why not base64_dataset.map(str.split).map(deserialize_vec_b64pkl)? - zero323
@zero323 I didn't know we could use str.split; I'm still a newbie, so bear with me, I'm sure I'll understand once it's explained. So what you propose should yield an RDD as a result. To make sure everything works, how can I inspect its first element? I tried to collect() it as you said, but the error occurred (Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.). Understanding the data layout of the resulting RDD might help solve the problem. - gsamaras
@zero323 I am using Python 2; that should be enough, but I can move to Python 3 if needed! - gsamaras
2.x should work as well. I posted an answer with a [mcve]. I hope it helps. - zero323
1 Answer


Let's try a simple example. For convenience, I'll use the handy toolz library, but it is not strictly required here.

import sys
import base64

if sys.version_info < (3, ):
    import cPickle as pickle
else:
    import pickle


from toolz.functoolz import compose

rdd = sc.parallelize([(1, {"foo": "bar"}), (2, {"bar": "foo"})])

Now, your code is not exactly portable. In Python 2, base64.b64encode returns str, while in Python 3 it returns bytes. Let's illustrate that:

  • Python 2

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## str
    
  • Python 3

    type(base64.b64encode(pickle.dumps({"foo": "bar"})))
    ## bytes
    
So let's add decoding to the pipeline:
# Equivalent to 
# def pickle_and_b64(x):
#     return base64.b64encode(pickle.dumps(x)).decode("ascii")

pickle_and_b64 = compose(
    lambda x: x.decode("ascii"),
    base64.b64encode,
    pickle.dumps
)

Note that this doesn't assume any particular shape of the data. Because of that, we'll use mapValues to serialize only the values (the keys are passed through unchanged):
serialized = rdd.mapValues(pickle_and_b64)
serialized.first()
## (1, u'KGRwMApTJ2ZvbycKcDEKUydiYXInCnAyCnMu')

Now we can follow up with a simple format and save:
from tempfile import mkdtemp
import os

outdir = os.path.join(mkdtemp(), "foo")

serialized.map(lambda x: "{0}\t{1}".format(*x)).saveAsTextFile(outdir)

To read it back, we reverse the process:
# Equivalent to
# def b64_and_unpickle(x):
#     return pickle.loads(base64.b64decode(x))

b64_and_unpickle = compose(
    pickle.loads,
    base64.b64decode
)

decoded = (sc.textFile(outdir)
    .map(lambda x: x.split("\t"))  # In Python 3 we could simply use str.split
    .mapValues(b64_and_unpickle))

decoded.first()
## (u'1', {'foo': 'bar'})
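The lambda above already avoids passing str.split directly (which, as noted in the comments below, does not pickle on Python 2.x); an equivalent sketch with a named function, which also casts the keys back to int since textFile yields plain strings (assuming the same sc, outdir and b64_and_unpickle as above):

def split_line(line):
    # A plain function (or the lambda above) pickles fine on Python 2.x,
    # unlike the built-in str.split passed as-is.
    return line.split("\t")

decoded_py2 = (sc.textFile(outdir)
    .map(split_line)
    .mapValues(b64_and_unpickle)
    # textFile yields strings, so cast keys back to int if the original type matters
    .map(lambda kv: (int(kv[0]), kv[1])))

decoded_py2.first()
## expected: (1, {'foo': 'bar'})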

On a side note, if you use Python 2.x: a) str.split won't work as expected, so use a full function instead; b) pickle is a bit more verbose with its error messages when testing. - zero323
2.6?!! I haven't seen that in ages :) I don't even have a version I could test against. Not to mention Spark dropped 2.6 support in its latest releases, and that branch reached end-of-life years ago. As for toolz - no particular reason, just convenience. I'm spoiled and find nested function calls annoying. I added the full functions. - zero323
Oh, I should have written a function, silly me, sorry! Everything works now; I'll go debug my code, thanks! - gsamaras
