Spark/scala中的SQL查询大小超过了Integer.MAX_VALUE

25

我正在尝试使用Spark在S3事件上创建一个简单的SQL查询。我按照以下方式加载了约30GB的JSON文件:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

然后我尝试将查询结果写入文件:

val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

但是Spark抛出了以下异常:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

请注意,相同的查询在处理少量数据时是没有问题的。这里的问题是什么?


最可能的问题是分区大小超过了限制,请尝试使用.repartition(100)等方法,这应该可以解决它。 - elcomendante
读取数据后,尝试重新分区 val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000)。参考 https://issues.apache.org/jira/browse/SPARK-1476 - undefined_variable
顺便提一下,您可能想考虑使用更新的s3a而不是s3n; 参见例如https://dev59.com/Q1wX5IYBdhLWcg3w0SNf - sgvd
感谢您的回答。查询在30GB的数据上运行成功。现在我正在尝试对大约200GB的数据运行查询,但是出现了以下错误: “无法将RPC 6395111411946395180发送到/x.x.x.x:yyyy:java.nio.channels.ClosedChannelException”还有:“尝试获取执行程序ID为165的执行程序丢失原因,RPC地址为x.x.x.x:yyyyyy,但未收到响应。标记为从节点丢失。”有什么想法吗?我正在使用100个重新分区加载数据。 - eexxoo
2个回答

61
没有Spark的shuffle块可以大于2GB(Integer.MAX_VALUE字节),因此您需要更多/更小的分区。
您应该调整spark.default.parallelism和spark.sql.shuffle.partitions(默认为200),以便分区的数量可以容纳您的数据,而不会达到2GB限制(您可以尝试针对每个分区256MB,因此对于200GB,您将获得800个分区)。成千上万的分区非常常见,因此不要害怕重新分区到建议的1000个分区。
FYI,您可以使用类似rdd.getNumPartitions(即d2.rdd.getNumPartitions)的内容检查RDD的分区数。
有一个跟踪处理各种2GB限制的努力的故事(已经开放了一段时间):https://issues.apache.org/jira/browse/SPARK-6235 有关此错误的更多信息,请参见http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25

3
谢谢您的解释!还请查看https://dev59.com/f1cO5IYBdhLWcg3wgRwG以编辑默认分区数量。 - Raphvanns

0

当我使用Spark核心处理200G的数据时,设置--conf spark.default.parallelism = 2000.repartition(100),但是会出现错误,最终我使用以下设置解决:

val conf = new SparkConf()
         .setAppName(appName)
         .set("spark.rdd.compress", "true")

spark.rdd.compress的描述

希望能对你有所帮助


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接