如何使用Pyspark和Dataframes查询Elasticsearch索引

12

Elasticsearch的文档仅涵盖将完整索引加载到Spark的过程。

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()

如何使用pyspark执行查询以从Elasticsearch索引返回数据并将其作为DataFrame加载到Spark中?

3个回答

7
以下是我如何做的。
常规环境设置和命令:
export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6
export PYSPARK_DRIVER_PYTHON=ipython2

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar

代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("ESTest")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "filtered": {
      "filter": {
        "exists": {
          "field": "label"
        }
      },
      "query": {
        "match_all": {}
      }
    }
  }
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

你还可以定义数据框架列。更多信息请参考这里
希望对你有所帮助!

这就是我现在一直在做的事情,我希望有一种直接获取筛选后的DataFrame的方法。 - George Lydakis
1
我不确定使用最新的ES-Hadoop Spark连接器API是否可以实现。 - Eyal.Dahari
1
有没有办法使用这个API将数据框写入Elasticsearch? - Elesin Olalekan Fuad
@ElesinOlalekanFuad 是的,有一种方法:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources 请注意,您必须从Scala API翻译到PySpark,但这并不难。 - rjurney

2
我曾遇到类似的问题,需要将地理过滤数据加载到PySpark DataFrame中。我使用的是elasticsearch-spark-20_2.11-5.2.2.jar,Spark版本为2.1.1,ES版本为5.2。��过在创建DataFrame时指定查询选项,我能够将数据加载到DataFrame中。
我的地理查询如下:
q ="""{
  "query": {
        "bool" : {
            "must" : {
                "match_all" : {}
            },
           "filter" : {
                "geo_distance" : {
                    "distance" : "100km",
                    "location" : {
                        "lat" : 35.825,
                        "lon" : -87.99
                    }
                }
            }
        }
    }
}"""

我用以下命令将数据加载到DataFrame中。
spark_df = spark.read.format("es").option("es.query", q).load("index_name")

这方面的API详见此处:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources

是的!请查看https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources。 - rjurney

2

我是使用pyspark在Amazon的EMR集群中运行代码。下面是我让它正常工作的步骤:

1)在集群创建时添加此引导操作(用于创建本地主机elasticsearch服务器):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb

2) 我运行这些命令来向elasticsearch数据库中填充一些数据:

 curl -XPUT "http://localhost:9200/movies/movie/1" -d' {
   "title": "The Godfather",
   "director": "Francis Ford Coppola",
   "year": 1972
  }'

如果您希望,还可以运行其他curl命令,例如:

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}}

3) 我使用以下参数初始化了pyspark:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar

我之前已经下载了elasticsearch python客户端

4) 我运行以下代码:

from pyspark import SparkConf
from pyspark.sql import SQLContext

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "movies/movie",
    "es.query" : q
}

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf)

sqlContext.createDataFrame(es_rdd).collect()

最终我从命令中获得了成功的结果。


您可以直接加载DataFrames:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-data-sources - rjurney

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接