使用Parquet文件元数据创建Hive表

7
我将一个DataFrame写入为parquet文件。我想使用parquet文件的元数据通过Hive读取该文件。
写入parquet文件的输出:
_common_metadata  part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  _SUCCESS
_metadata         part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet  part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet

Hive表

CREATE  TABLE testhive
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  '/home/gz_files/result';



FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified

我如何从parquet文件中推断元数据?

如果我打开_common_metadata,我会看到以下内容:

PAR1LHroot
%TSN%
%TS%
%Etype%
)org.apache.spark.sql.parquet.row.metadata▒{"type":"struct","fields":[{"name":"TSN","type":"string","nullable":true,"metadata":{}},{"name":"TS","type":"string","nullable":true,"metadata":{}},{"name":"Etype","type":"string","nullable":true,"metadata":{}}]}

或者如何解析元数据文件?

你试过使用更新的Hive语法了吗?https://cwiki.apache.org/confluence/display/Hive/Parquet - Reactormonk
1
如果我添加列名,它就能工作。但是,Parquet在元信息中具有模式。 - WoodChopper
6个回答

12

这是我想出来的一种方法,可以获取parquet文件中的元数据,以便创建Hive表。

首先启动spark-shell(或将其全部编译成jar并使用spark-submit运行,但使用shell要容易得多)

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.DataFrame


val df=sqlContext.parquetFile("/path/to/_common_metadata")

def creatingTableDDL(tableName:String, df:DataFrame): String={
  val cols = df.dtypes
  var ddl1 = "CREATE EXTERNAL TABLE "+tableName + " ("
  //looks at the datatypes and columns names and puts them into a string
  val colCreate = (for (c <-cols) yield(c._1+" "+c._2.replace("Type",""))).mkString(", ")
  ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
  ddl1
}

val test_tableDDL=creatingTableDDL("test_table",df,"test_db")

它将为您提供Hive将在Parquet中存储每个列时使用的数据类型。例如:CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'


1
今天测试时刚学到,如果找到的数据类型是“integer”,Spark 会将其读取为“integerType”,因此当我替换“Type”时,它变成了“integer”。Hive 不喜欢“integer”,所以你需要在 DDL 中将其更改为“int”,但这只是一个小改动,你可以自己解决 =) - James Tobin
非常好,这应该被添加到Spark SQL中! - Thomas Decaux
你确定代码可以编译吗?我的意思是,creatingTableDDL("test_table",df,"test_db") 有三个参数,但方法定义只有两个。参数 "test_db" 的目的是什么? - UninformedUser
这是从本地副本复制的与数据库指定相关的编程内容;因此,如果您复制并运行上述示例而没有进行任何更改,它将无法正常工作,但是creatingTableDDL("test_table",df)会起作用。 - James Tobin

11

我只是想在詹姆斯·托宾的回答上进行详细阐述。有一个StructField类,它提供了Hive的数据类型,而不需要进行字符串替换。

// Tested on Spark 1.6.0.

import org.apache.spark.sql.DataFrame

def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
    val columns = dataFrame.schema.map { field =>
        "  " + field.name + " " + field.dataType.simpleString.toUpperCase
    }

    s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
}

这解决了IntegerType问题。

scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]

scala> print(dataFrameToDDL(dataFrame, "t"))
CREATE TABLE t (
  x INT,
  y STRING
)

这应该适用于任何DataFrame,而不仅仅是Parquet格式的DataFrame。(例如,我正在使用它与JDBC DataFrame一起使用。)

作为额外的奖励,如果您的目标数据定义语言(DDL)支持可空列,则可以通过检查StructField.nullable来扩展函数。


2
这个回答比我提供的那个更加“真实”。我自己选择采用这个回答。 - James Tobin
如果表被分区了怎么办?我有一个数据库,其中一些表被分区,而另一些则没有。 - Faisal Ahmed Siddiqui

2
我希望您能扩展詹姆斯的答案, 以下代码适用于包括ARRAY、MAP和STRUCT在内的所有数据类型。 已在SPARK 2.2中进行了测试。
val df=sqlContext.parquetFile("parquetFilePath")
val schema = df.schema
var columns = schema.fields
var ddl1 = "CREATE EXTERNAL TABLE " tableName + " ("
val cols=(for(column <- columns) yield column.name+" "+column.dataType.sql).mkString(",")
ddl1=ddl1+cols+" ) STORED AS PARQUET LOCATION '/tmp/hive_test1/'"
spark.sql(ddl1)

1

更新后(可用)链接:https://docs.cloudera.com/runtime/7.2.15/impala-sql-reference/topics/impala-create-table.html - Ryan Jendoubi

1

对Victor的小改进(在field.name上添加引号)并修改为将表绑定到本地parquet文件(在spark 1.6.1上进行了测试)。

def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
    val columns = dataFrame.schema.map { field =>
      "  `" + field.name + "` " + field.dataType.simpleString.toUpperCase
    }
    s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '"+absFilePath+"'"
  }

还要注意:

  • 需要使用HiveContext,因为SQLContext不支持创建外部表。
  • parquet文件夹的路径必须是绝对路径。

0

我有同样的问题。不过从实际角度来看,实现可能会很困难,因为Parquet支持模式演化:

http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-0-x/topics/impala_parquet.html#parquet_schema_evolution_unique_1

例如,您可以向表中添加新列,而不必触及已经存在于表中的数据。只有新的数据文件将具有新的元数据(与先前版本兼容)。
自Spark 1.5.0以来,默认情况下关闭模式合并,因为它是“相对昂贵的操作”http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging。因此,推断最近的模式可能并不像听起来那么简单。尽管可以通过解析输出等快速脏方法来实现。
$ parquet-tools schema /home/gz_files/result/000000_0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接