使用Pyspark从REST API获取数据并生成Spark Dataframe

6
我正在构建一个数据管道,它可以从JSON格式的REST API中消费数据并将其推送到Spark Dataframe中。Spark版本:2.4.4。
但是遇到了错误:
df = SQLContext.jsonRDD(rdd) 
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'

代码:

from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json
spark = SparkSession \
.builder \
.appName("DataCleansing") \
.getOrCreate()


def convert_single_object_per_line(json_list):
json_string = ""
for line in json_list:
json_string += json.dumps(line) + "\n"
return json_string

def parse_dataframe(json_data):
r = convert_single_object_per_line(json_data)
mylist = []
for line in r.splitlines():
mylist.append(line)
rdd = spark.sparkContext.parallelize(mylist)
df = SQLContext.jsonRDD(rdd)
return df

url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)

 

是否有更好的方式使用Pyspark查询RestApi并将数据带入Spark Dataframe,我不确定自己是否遗漏了什么。


有一个适用于Spark的REST API数据源,请查看此链接 - https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest - Srinivas
你使用了“REST API数据源”来实现并行请求吗? - FelipePerezR
更新的导入: 从 urllib.request 导入 urlopen - McKenzie
1个回答

6

查看Spark Rest API 数据源。这个库的一个优点是它将使用多个执行器来获取数据rest api并为您创建数据框。

在您的代码中,您正在将所有数据提取到驱动程序中并创建DataFrame,如果您有非常大的数据,则可能会失败并出现堆空间不足的情况。

url = "https://mylink"
options = { 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}

# Now we create the Dataframe which contains the result from the call to the API
df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()


评论不适合进行长时间的讨论;此对话已被移至聊天室 - Samuel Liew
5
太好了。现在谈话在哪里? - ss301
1
请注意,RestDataSource库依赖于scalaj_http包,该包已被弃用,详细信息请参见https://github.com/scalaj/scalaj-http。 - undefined
是的,我明白了。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接