如何使用pyspark从Spark中获取批量行数据

8

我有一个由超过60亿行数据组成的Spark RDD,我想用它来训练深度学习模型,使用train_on_batch。我无法将所有行都放入内存中,因此我想每次获取大约10K行数据,然后批处理成64或128个数据块(取决于模型大小)。我目前正在使用rdd.sample(),但我认为这并不能保证我能够获取到所有行。是否有更好的方法来分区数据,以使其更易于管理,从而编写生成器函数以获取批处理数据?我的代码如下:

data_df = spark.read.parquet(PARQUET_FILE)
print(f'RDD Count: {data_df.count()}') # 6B+
data_sample = data_df.sample(True, 0.0000015).take(6400) 
sample_df = data_sample.toPandas()

def get_batch():
  for row in sample_df.itertuples():
    # TODO: put together a batch size of BATCH_SIZE
    yield row

for i in range(10):
    print(next(get_batch()))

1
提醒一下,你的命名有误,那不是RDD,而是数据框DataFrame。 - Luiz Fernando Lobo
我不相信你在使用pandas迭代spark时没有任何收获,最好还是用Python分块读取。 - Luiz Fernando Lobo
我更改了命名以反映它是一个数据框。我需要将数据作为 Pandas 数据框提供给现有模型。 - csteel
我想友好地询问一下为什么这个问题被投票负分。我花了很多时间在谷歌上搜索如何做到这一点。如果答案很明显或者有一个已记录/已理解的机制,我会感激提供一个链接,并为浪费时间道歉。 - csteel
2个回答

6

试试这个:

 from pyspark.sql import functions as F
 sample_dict = {}

 # Read the parquet file
 df = spark.read.parquet("parquet file")

 # add the partition_number as a column
 df = df.withColumn('partition_num', F.spark_partition_id())
 df.persist()

 total_partition = [int(row.partition_num) for row in 
 df.select('partition_num').distinct().collect()]

 for each_df in total_partition:
     sample_dict[each_df] = df.where(df.partition_num == each_df) 

1
这对我很有效,特别是因为我有一个大的parquet文件。 - Abhishek Kumar

3

我不相信 Spark 可以偏移或分页你的数据。

但是你可以添加一个索引,然后在其上进行分页,首先:

    from pyspark.sql.functions import lit
    data_df = spark.read.parquet(PARQUET_FILE)
    count = data_df.count()
    chunk_size = 10000

    # Just adding a column for the ids
    df_new_schema = data_df.withColumn('pres_id', lit(1))
    
    # Adding the ids to the rdd 
    rdd_with_index = data_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row) + [rowId+1]))
    
    # Creating a dataframe with index
    df_with_index = spark.createDataFrame(rdd_with_index,schema=df_new_schema.schema)
    
    # Iterating into the chunks
    for page_num in range(0, count+1, chunk_size):
        initial_page = page_num*chunk_size
        final_page = initial_page + chunk_size 
        where_query = ('pres_id > {0} and pres_id <= {1}').format(initial_page,final_page)
        chunk_df = df_with_index.where(where_query).toPandas()
        train_on_batch(chunk_df) # <== Your function here        

这不是最佳选择,因为使用pandas dataframe会严重影响Spark的性能,但可以解决你的问题。
如果这会影响到你的函数,请不要忘记删除id

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接