I'm building a data pipeline that consumes JSON data from a REST API and loads it into a Spark DataFrame. Spark version: 2.4.4.
However, I get the following error:
df = SQLContext.jsonRDD(rdd)
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'
Code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json

spark = SparkSession \
    .builder \
    .appName("DataCleansing") \
    .getOrCreate()

def convert_single_object_per_line(json_list):
    json_string = ""
    for line in json_list:
        json_string += json.dumps(line) + "\n"
    return json_string

def parse_dataframe(json_data):
    r = convert_single_object_per_line(json_data)
    mylist = []
    for line in r.splitlines():
        mylist.append(line)
    rdd = spark.sparkContext.parallelize(mylist)
    df = SQLContext.jsonRDD(rdd)
    return df

url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)
Is there a better way to query a REST API from PySpark and load the result into a Spark DataFrame? I'm not sure whether I'm missing something.
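
One possible direction, as a minimal sketch rather than a definitive fix: as the error shows, `SQLContext` in Spark 2.x has no `jsonRDD`, but `spark.read.json` accepts an RDD of JSON strings (one object per string). The sketch below assumes Python 3 (hence `urllib.request` instead of `urllib`), assumes the endpoint returns either a JSON array or a single JSON object, and uses the hypothetical helper names `fetch_json` and `to_json_lines`, which are not part of the original code.

```python
import json
from urllib.request import urlopen  # assumption: Python 3


def fetch_json(url):
    """Download the response body and parse it as JSON (hypothetical helper)."""
    with urlopen(url) as response:
        # Decode the raw bytes before parsing; UTF-8 is assumed here.
        return json.loads(response.read().decode("utf-8"))


def to_json_lines(records):
    """Serialize each record to its own JSON string (hypothetical helper)."""
    if isinstance(records, dict):
        # Assumption: a single object is treated as a one-row result.
        records = [records]
    return [json.dumps(record) for record in records]


def parse_dataframe(spark, records):
    """Build a DataFrame from parsed JSON records using an existing SparkSession."""
    rdd = spark.sparkContext.parallelize(to_json_lines(records))
    # DataFrameReader.json accepts an RDD of JSON strings,
    # which stands in for the missing SQLContext.jsonRDD.
    return spark.read.json(rdd)
```

With the `spark` session and `url` from the code above, this would be called as `df = parse_dataframe(spark, fetch_json(url))`. The download still happens on the driver, so this only suits responses that fit in driver memory.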