将Pandas DataFrame转换为Spark DataFrame

8
我之前提过一个问题,关于如何将scipy稀疏矩阵转换为pyspark.sql.dataframe.DataFrame,在阅读了给出的答案和这篇文章后有所进展。最终我写出了以下代码,可以将 scipy.sparse.csc_matrix 转换为 pandas dataframe:
df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
df.columns = header

然后我尝试使用建议的语法将pandas dataframe 转换为spark dataframe:

spark_df = sqlContext.createDataFrame(df)

然而,我收到了以下错误:
ValueError: cannot create an RDD from type: <type 'list'>

我认为这与sqlContext无关,因为我能够将另一个大小大致相同的pandas数据框转换为spark数据框,没有任何问题。有什么想法吗?

你正在运行哪个版本?在我看来没问题。 - maxymoo
2
在将其转换为Spark DF之前,请尝试使用“print df”命令。您可能会对“list”类型有一些线索。 - mrsrinivas
打印数据框的一部分后(100K行,5300列),我注意到的唯一特征是每个列的数据类型都是“float64”,因此每个数字都表示为带有多个尾随零的小数。然而,只有前10列需要是浮点数。但我不确定这是否是导致错误的原因。 - Dirigo
如果您能在此处打印Pandas数据框的示例输出,则将有助于我们解决问题。 - Ankit Kumar Namdeo
2个回答

2

我不确定这个问题是否仍然与当前版本的pySpark相关,但是这是我在发布这个问题几周后想出的解决方案。代码相当丑陋,可能效率也不高,但由于这个问题一直受到关注,我在这里发布它。

原始答案翻译成中文为"最初的回答"。

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark import SparkConf
from py4j.protocol import Py4JJavaError

myConf = SparkConf(loadDefaults=True)
sc = SparkContext(conf=myConf)
hc = HiveContext(sc)


def chunks(lst, k):
    """Yield k chunks of close to equal size"""
    n = len(lst) / k
    for i in range(0, len(lst), n):
        yield lst[i: i + n]


def reconstruct_rdd(lst, num_parts):
    partitions = chunks(lst, num_parts)
    for part in range(0, num_parts - 1):
        print "Partition ", part, " started..."
        partition = next(partitions)    # partition is a list of lists
        if part == 0:
            prime_rdd = sc.parallelize(partition)
        else:
            second_rdd = sc.parallelize(partition)
            prime_rdd = prime_rdd.union(second_rdd)
        print "Partition ", part, " complete!"
    return prime_rdd


def build_col_name_list(len_cols):
    name_lst = []
    for i in range(1, len_cols):
        idx = "_" + str(i)
        name_lst.append(idx)
    return name_lst


def set_spark_df_header(header, sdf):
    oldColumns = build_col_name_lst(len(sdf.columns))
    newColumns = header
    sdf = reduce(lambda sdf, idx: sdf.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sdf)
    return sdf


def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
    try:
        sdf = hc.createDataFrame(pdf)
    except ValueError:
        lst = pdf.values.tolist()   #Need to convert to list of list to parallelize
        try:
            rdd = sc.parallelize(lst)
        except Py4JJavaError:
            rdd = reconstruct_rdd(lst, num_of_parts)
            sdf = hc.createDataFrame(rdd)
            sdf = set_spark_df_header(sdf_header, sdf)
    return sdf

0

to_sparse(fill_value=0) 基本上已经过时了。只需使用标准变量

sqlContext.createDataFrame(pd.DataFrame(csc_mat.todense()))

只要类型兼容,你就没问题了。

1
OP 询问的是 Spark,而不是 Sparse。 - JohnE

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接