您的数据存在问题,整数0300无法按照所需格式加载,对我而言它被加载为了192,因此您需要首先将其作为字符串加载,并在加载时使用模式指定数据类型。请参考
文档。例如,对于一个.csv文件:
from pyspark.sql import DataFrameReader
from pyspark.sql.types import *
schema = StructType([StructField("Year", StringType(), True), StructField("month", StringType(), True), StructField("date", StringType(), True), StructField("hhmm", StringType(), True)])
dd = DataFrameReader.csv(path='your/data/path', schema=schema)
然后您需要修复数据格式并将其转换为时间戳:
from pyspark.sql import functions as F
dd = spark.createDataFrame([('2019','2','13','1030'),('2018','2','14','1000'),('2029','12','13','300')],["Year","month","date","hhmm"])
dd = (dd.withColumn('month', F.when(F.length(F.col('month')) == 1, F.concat(F.lit('0'), F.col('month'))).otherwise(F.col('month')))
.withColumn('date', F.when(F.length(F.col('date')) == 1, F.concat(F.lit('0'), F.col('date'))).otherwise(F.col('date')))
.withColumn('hhmm', F.when(F.length(F.col('hhmm')) == 1, F.concat(F.lit('000'), F.col('hhmm')))
.when(F.length(F.col('hhmm')) == 2, F.concat(F.lit('00'), F.col('hhmm')))
.when(F.length(F.col('hhmm')) == 3, F.concat(F.lit('0'), F.col('hhmm')))
.otherwise(F.col('hhmm')))
.withColumn('time', F.to_timestamp(F.concat(*dd.columns), format='yyyyMMddHHmm'))
)
dd.show()
+----+-----+----+----+-------------------+
|Year|month|date|hhmm| time|
+----+-----+----+----+-------------------+
|2019| 02| 13|1030|2019-02-13 10:30:00|
|2018| 02| 14|1000|2018-02-14 10:00:00|
|2029| 12| 13|0300|2029-12-13 03:00:00|
+----+-----+----+----+-------------------+
concat()
进行字符串连接,然后使用unix_timestamp()
和from_unixtime()
将其转换为timestamp
。 - samkartfrom_unixtime(unix_timestamp(col('concatedcol'), 'yyyy-MM-dd HHmm'), 'yyyy-MM-dd HH:mm:ss')
。我正在使用您上面提到的代码格式。 - samkartdt.select(to_timestamp(dt.datetime1,'yyyy-MM-dd HH:mm').alias('datetime1')).collect()```
- premon