PySpark 2.4.0,使用读取流从 Kafka 读取 Avro - Python

8

我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。

spark-avro外部模块可以提供读取avro文件的解决方案:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

然而,我需要读取流式的avro消息。该库文档建议使用from_avro()函数,但这仅适用于Scala和Java。

是否有其他模块支持从Kafka读取流式的avro消息?

1个回答

15

您可以包含spark-avro软件包,例如使用--packages(调整版本以匹配spark安装):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

并提供自己的包装器:

from pyspark.sql.column import Column, _to_java_column 

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema)) 


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col))) 

示例用法(源自官方测试套件):

from pyspark.sql.functions import col, struct


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)

+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows

avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)

+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows

这里需要注意的是,我在使用spark-submit导入包时遇到了一个问题,即$spark-submit job.py --packages org.apache.spark:spark-avro_2.11:2.4.0无法正常工作。相反,应该像这样编写$spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 job.py。 - Panagiotis Fytas
这在使用Confluent Schema Registry的Avro时无法工作。对此,这个答案似乎更好 https://dev59.com/elUM5IYBdhLWcg3wa_kF#55786881 - OneCricketeer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接