在Spark中将多个小文件合并成几个较大的文件

8

我通过Spark使用Hive。我的Spark代码中有一个插入分区表的查询。输入的数据量为200多GB。当Spark写入分区表时,会生成非常小的文件(文件大小仅为kb级别)。因此,现在输出的分区表文件夹中有5000多个小型kb文件。我想将它们合并成几个大的MB文件,可能是几个200MB的文件。我尝试使用Hive合并设置,但它们似乎不起作用。

'val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")

 val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")

val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")

val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")

val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")

val result7F = hiveContext.sql("SET hive.merge.sparkfiles = true")

val result7G = hiveContext.sql("set hive.aux.jars.path=c:\\Applications\\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")

val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")'

以上的Hive设置适用于MapReduce Hive执行,并生成指定大小的文件。在Spark或Scala中是否有这样的选项?


你现在每天都会得到很多文件,但你只想要一些吗?或者你想将多天的文件合并成一个文件吗? - maxymoo
@maxymoo,我每天会得到许多小文件(以KB为单位的文件),但我只想要几个大文件(以MB为单位的文件)。 - dheee
嗨@user3267086,你能解决这个小文件的问题吗?我尝试使用hc.sql("bla bla").coalesce(10)方法,但它似乎没有作用,我仍然看到大约20 MB大小的200个小文件。 - Umesh K
3个回答

9

我曾经遇到过同样的问题。解决办法是使用 DISTRIBUTE BY 子句并指定分区列。这可以确保每个分区的数据都被发送到单个reducer上。在你的情况下,以下是一个示例:

INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table DISTRIBUTE BY date

嘿 @Jussi Kujala,非常感谢,这似乎对我有用。但是我有一个问题,如果我的表按多个列进行分区...在那种情况下这个方法是否可行呢? - Hemakshi Sachdev
“DISTRIBUTE BY” 语法是从哪里来的?我在 Spark 的文档中找不到它。 - botchniaque

1
你可以尝试使用 DataFrame.coalesce 方法;该方法返回指定数量分区的 DataFrame(每个分区作为插入时的一个文件)。因此,根据要插入的记录数和每条记录的典型大小,您可以估计需要将分区合并到多少个分区中,以便得到约为 200MB 的文件。

嗨@zweiterlinde,我尝试使用hc.sql("bla bla").coalesce(10)方法,但它没有起作用,我仍然看到大约20 MB的200个小文件。 - Umesh K
1
我需要更长的代码示例才能真正发表评论,但在我的玩具实验中,调用 df.write.parquetFile(...) 会导致许多部分文件,而 df.coalesce(1).write.parquetFile(...) 则只有一个。 - zweiterlinde

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接