我有以下的代码,其中大部分时间都会触发 hiveContext.sql()
。我的任务是在处理完所有hive表分区后,创建几个表并向其中插入值。
因此,我首先执行 show partitions
并使用其输出结果在for循环中调用一些方法,这些方法会创建表(如果不存在)并使用 hiveContext.sql
向其中插入数据。
现在,我们无法在executor中执行 hiveContext
,因此我必须在 driver 程序的 for 循环中执行它,并按顺序一个接一个地运行。当我在 YARN 集群中提交此 Spark 作业时,几乎每次都会因为找不到 shuffle 而导致我的 executor 丢失。
现在这种情况是由于 YARN 在记忆超载时杀死了我的 executor。我不明白为什么,因为每个 hive 分区的数据集非常小,但仍然会导致 YARN 杀死我的 executor。
以下代码是否会并行执行所有操作,并同时尝试容纳所有 hive 分区的数据集到内存中?
public static void main(String[] args) throws IOException {
SparkConf conf = new SparkConf();
SparkContext sc = new SparkContext(conf);
HiveContext hc = new HiveContext(sc);
DataFrame partitionFrame = hiveContext.sql(" show partitions dbdata partition(date="2015-08-05")");
Row[] rowArr = partitionFrame.collect();
for(Row row : rowArr) {
String[] splitArr = row.getString(0).split("/");
String server = splitArr[0].split("=")[1];
String date = splitArr[1].split("=")[1];
String csvPath = "hdfs:///user/db/ext/"+server+".csv";
if(fs.exists(new Path(csvPath))) {
hiveContext.sql("ADD FILE " + csvPath);
}
createInsertIntoTableABC(hc,entity, date);
createInsertIntoTableDEF(hc,entity, date);
createInsertIntoTableGHI(hc,entity,date);
createInsertIntoTableJKL(hc,entity, date);
createInsertIntoTableMNO(hc,entity,date);
}
}