Manually creating a PySpark dataframe

I am trying to manually create a PySpark dataframe from some given data:
row_in = [(1566429545575348), (40.353977), (-111.701859)]
rdd = sc.parallelize(row_in)
schema = StructType(
    [
        StructField("time_epocs", DecimalType(), True),
        StructField("lat", DecimalType(), True),
        StructField("long", DecimalType(), True),
    ]
)
df_in_test = spark.createDataFrame(rdd, schema)

When I try to display the dataframe, I get an error, so I am not sure how to go about this.

However, the Spark documentation seems a bit convoluted to me, and I got similar errors when I tried to follow those instructions.

Does anyone know how to do this?


Your code should work if row_in = [(1566429545575348, 40.353977, -111.701859)]. - pault
Even with row_in = [(1566429545575348, 40.353977, -111.701859)], this still doesn't work. - Josh
The real problem is that (1) is an int, not a tuple. When there is only one element, you need a trailing comma to create a tuple: (1,). - Steven
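
Putting those comments together, a minimal sketch of a working version of the question's code (switching from DecimalType to LongType/DoubleType is my assumption, so that the schema matches the raw Python values; DecimalType would require decimal.Decimal objects and enough precision for the epoch):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

spark = SparkSession.builder.getOrCreate()

# One tuple = one row, so all three values go into a single tuple.
row_in = [(1566429545575348, 40.353977, -111.701859)]

schema = StructType(
    [
        StructField("time_epocs", LongType(), True),
        StructField("lat", DoubleType(), True),
        StructField("long", DoubleType(), True),
    ]
)

df_in_test = spark.createDataFrame(row_in, schema)
df_in_test.show()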
6 Answers


Simple dataframe creation:

df = spark.createDataFrame(
    [
        (1, "foo"),  # create your data here, be consistent in the types.
        (2, "bar"),
    ],
    ["id", "label"]  # add your column names here
)

df.printSchema()
root
 |-- id: long (nullable = true)
 |-- label: string (nullable = true)

df.show()
+---+-----+                                                                     
| id|label|
+---+-----+
|  1|  foo|
|  2|  bar|
+---+-----+

According to the official documentation:
  • When schema is a list of column names, the type of each column will be inferred from the data. (As in the example above ↑)
  • When schema is pyspark.sql.types.DataType or a datatype string, it must match the real data. (As in the examples below ↓)
# Example with a datatype string
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],  
    "id int, label string",  # add column names and types here
)

# Example with pyspark.sql.types
from pyspark.sql import types as T
df = spark.createDataFrame(
    [
        (1, "foo"),  # Add your data here
        (2, "bar"),
    ],
    T.StructType(  # Define the whole schema within a StructType
        [
            T.StructField("id", T.IntegerType(), True),
            T.StructField("label", T.StringType(), True),
        ]
    ),
)


df.printSchema()
root
 |-- id: integer (nullable = true)  # type is forced to Int
 |-- label: string (nullable = true)

Additionally, you can create your dataframe from a Pandas dataframe; the schema will be inferred from the Pandas dataframe's types:
import pandas as pd
import numpy as np


pdf = pd.DataFrame(
    {
        "col1": [np.random.randint(10) for x in range(10)],
        "col2": [np.random.randint(100) for x in range(10)],
    }
)


df = spark.createDataFrame(pdf)

df.show()
+----+----+
|col1|col2|
+----+----+
|   6|   4|
|   1|  39|
|   7|   4|
|   7|  95|
|   6|   3|
|   7|  28|
|   2|  26|
|   0|   4|
|   4|  32|
+----+----+
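
If you'd rather not rely on inference from the Pandas dtypes, createDataFrame also accepts an explicit schema together with the Pandas dataframe; a short sketch (the DDL string is one way to spell it):

# Hypothetical variation on the example above: same pdf, explicit types.
df = spark.createDataFrame(pdf, schema="col1 long, col2 long")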


This answer demonstrates how to create a PySpark DataFrame with createDataFrame, create_df, and toDF.

df = spark.createDataFrame([("joe", 34), ("luisa", 22)], ["first_name", "age"])

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       joe| 34|
|     luisa| 22|
+----------+---+

You can also pass createDataFrame an RDD and a schema to construct DataFrames with more precision:

from pyspark.sql import Row
from pyspark.sql.types import *

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])

schema = StructType([
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), False)])

df = spark.createDataFrame(rdd, schema)

df.show()

+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+

create_df is a function I developed in the Quinn project; it combines the advantages of being concise and fully descriptive:

from pyspark.sql.types import *
from quinn.extensions import *

df = spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    [("name", StringType(), True), ("blah", StringType(), True)]
)

df.show()

+----+----+
|name|blah|
+----+----+
|jose|   a|
|  li|   b|
| sam|   c|
+----+----+

The toDF method doesn't offer any advantage over the other approaches:

from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31)])
df = rdd.toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+
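
Note that toDF can also take column names when the RDD contains plain tuples; a quick sketch (sample values made up):

df = spark.sparkContext.parallelize(
    [("Allie", 2), ("Sara", 33), ("Grace", 31)]
).toDF(["name", "age"])  # column names supplied, types inferred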

Following on from @Steven's answer, to elaborate/add a bit more:
from pyspark.sql.types import StructType, StructField, FloatType, StringType

field = [
    StructField("MULTIPLIER", FloatType(), True),
    StructField("DESCRIPTION", StringType(), True),
]
schema = StructType(field)
multiplier_df = spark.createDataFrame(sc.emptyRDD(), schema)

This creates an empty dataframe.

Now we can simply add a row to it:

l = [(2.3, "this is a sample description")]
rdd = sc.parallelize(l)
multiplier_df_temp = spark.createDataFrame(rdd, schema)
multiplier_df = multiplier_df.union(multiplier_df_temp)

Is that unclosed parenthesis part of the syntax? - Luis Bosquez
Why do you need to union multiplier_df_temp with the empty dataframe? You already created the row with the correct schema; the union is useless. - Steven
This approach should be avoided since it's overly complicated and unnecessary. - Powers
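
Following Steven's comment, a minimal sketch of the one-step version (reusing the schema defined above):

# The row can be created directly with the right schema -
# no empty dataframe and no union needed.
multiplier_df = spark.createDataFrame(
    [(2.3, "this is a sample description")], schema
)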



from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        (1, "foo"),
        (2, "bar"),
    ],
    StructType(
        [
            StructField("id", IntegerType(), False),
            StructField("txt", StringType(), False),
        ]
    ),
)
print(df.dtypes)
df.show()

This is the only solution I've seen that shows how to create the spark variable; every other solution assumes you already have it. Thank you! - Rob

Extending @Steven's answer:
data = [(i, 'foo') for i in range(1000)] # random data

columns = ['id', 'txt']    # add your column labels here

df = spark.createDataFrame(data, columns)

Note: when schema is a list of column names, the type of each column will be inferred from the data.
If you want to define the schema explicitly, do the following:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([StructField("id", IntegerType(), True), StructField("txt", StringType(), True)])
df1 = spark.createDataFrame(data, schema)

Output (note that df's id column was inferred as bigint, while df1 got the explicit IntegerType):

>>> df1
DataFrame[id: int, txt: string]
>>> df
DataFrame[id: bigint, txt: string]


For beginners, here is a complete example that imports data from a file:

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    ShortType,
    StringType,
    StructType,
    StructField,
    TimestampType,
)

import os

here = os.path.abspath(os.path.dirname(__file__))


spark = SparkSession.builder.getOrCreate()
schema = StructType(
    [
        StructField("id", ShortType(), nullable=False),
        StructField("string", StringType(), nullable=False),
        StructField("datetime", TimestampType(), nullable=False),
    ]
)

# read file or construct rows manually
df = spark.read.csv(os.path.join(here, "data.csv"), schema=schema, header=True)
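
For the "construct rows manually" branch mentioned in the comment, a small sketch against the same schema (the sample values are made up):

from datetime import datetime

df = spark.createDataFrame(
    [(1, "foo", datetime(2021, 1, 1, 12, 0))],  # id, string, datetime
    schema=schema,
)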
