Pyspark解析固定宽度的文本文件。

20
尝试解析一个定宽文本文件。 我的文本文件看起来像下面这样,我需要一行id、日期、字符串和整数:
00101292017you1234
00201302017 me5678

我可以使用sc.textFile(路径)将文本文件读取为RDD。 我可以使用解析后的RDD和模式创建DataFrame。 这两个步骤之间是解析。

4个回答

23

Spark的substr函数可以处理固定宽度的列,例如:

df = spark.read.text("/tmp/sample.txt")
df.select(
    df.value.substr(1,3).alias('id'),
    df.value.substr(4,8).alias('date'),
    df.value.substr(12,3).alias('string'),
    df.value.substr(15,4).cast('integer').alias('integer')
).show()

将导致:

+---+--------+------+-------+
| id|    date|string|integer|
+---+--------+------+-------+
|001|01292017|   you|   1234|
|002|01302017|    me|   5678|
+---+--------+------+-------+

将列拆分后,您可以重新格式化并像普通的Spark DataFrame一样使用它们。


df = sqlContext.read.text("blah.txt") 我必须使用sqlContext。然后它就可以工作了。我想我需要学习一些关于上下文的知识。但除此之外,你已经回答了我的问题。

- Chris Hamson
1
spark代表版本>=2.0的Spark会话。如果您使用的是Spark 1.6或更低版本,则需要使用sqlContext,但在访问数据方面它的行为基本相同。 - Mariusz
有没有一种方法可以从字典中获取列名和宽度的值? 我想使用一个字典来编程映射我的列。 - Sorin D.

4

有人询问如何基于一个模式来实现。根据以上回复,这里提供一个简单的示例:

x= '''    1 123121234 joe
    2 234234234jill
    3 345345345jane
    4abcde12345jack'''

schema = [
          ("id",1,5),
          ("ssn",6,10),
          ("name",16,4)
]
          
with open("personfixed.csv", "w") as f:
  f.write(x)

df = spark.read.text("personfixed.csv")
df.show()

df2 = df
for colinfo in schema:
  df2 = df2.withColumn(colinfo[0], df2.value.substr(colinfo[1],colinfo[2]))

df2.show()

这是输出结果:
+-------------------+
|              value|
+-------------------+
|    1 123121234 joe|
|    2 234234234jill|
|    3 345345345jane|
|    4abcde12345jack|
+-------------------+

+-------------------+-----+----------+----+
|              value|   id|       ssn|name|
+-------------------+-----+----------+----+
|    1 123121234 joe|    1| 123121234| joe|
|    2 234234234jill|    2| 234234234|jill|
|    3 345345345jane|    3| 345345345|jane|
|    4abcde12345jack|    4|abcde12345|jack|
+-------------------+-----+----------+----+

3

这里是一个单行代码:

df = spark.read.text("/folder/file.txt")

df.select(*map(lambda x: trim(df.value.substr(col_idx[x]['idx'], col_idx[x]['len'])).alias(x), col_idx))

其中col_idx类似于以下内容:

col_idx = {col1: {'idx': 1, 'len': 2}, col2: {'idx': 3, 'len': 1}}

当你有很多列需要处理时,这样会更加实用,并且使用select比使用多个withcolumn更加高效(参见The hidden cost of Spark withColumn)。


0
df = spark.read.text("fixedwidth")

df.withColumn("id",df.value.substr(1,5)).withColumn("name",df.value.substr(6,11)).drop('value').show()

结果是

+-----+------+
|   id|  name|
+-----+------+
|23465|ramasg|
|54334|hjsgfd|
|87687|dgftre|
|45365|ghfduh|
+-----+------+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接