我是使用pyspark在Amazon的EMR集群中运行代码。下面是我让它正常工作的步骤:
1)在集群创建时添加此引导操作(用于创建本地主机elasticsearch服务器):
s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb
2) 我运行这些命令来向elasticsearch数据库中填充一些数据:
curl -XPUT "http://localhost:9200/movies/movie/1" -d' {
"title": "The Godfather",
"director": "Francis Ford Coppola",
"year": 1972
}'
如果您希望,还可以运行其他curl命令,例如:
curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}
3) 我使用以下参数初始化了pyspark:
pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar
我之前已经下载了elasticsearch python客户端
4) 我运行以下代码:
from pyspark import SparkConf
from pyspark.sql import SQLContext
q ="""{
"query": {
"match_all": {}
}
}"""
es_read_conf = {
"es.nodes" : "localhost",
"es.port" : "9200",
"es.resource" : "movies/movie",
"es.query" : q
}
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_read_conf)
sqlContext.createDataFrame(es_rdd).collect()
最终我从命令中获得了成功的结果。