使用elasticsearch-spark connector从spark中读取ES:所有字段都返回

4

我在spark-shell中使用elasticsearch-spark连接器进行了一些实验。调用Spark:

] $SPARK_HOME/bin/spark-shell --master local[2] --jars ~/spark/jars/elasticsearch-spark-20_2.11-5.1.2.jar

在Scala Shell中:

scala> import org.elasticsearch.spark._
scala> val es_rdd = sc.esRDD("myindex/mytype",query="myquery")

它运行良好,结果包含了我查询中指定的好记录。唯一的问题是,即使在查询中只指定了字段的子集,我仍然会得到所有字段。例如:

myquery = """{"query":..., "fields":["a","b"], "size":10}"""

返回所有字段,而不仅仅是a和b(顺便说一下,我注意到size参数也没有被考虑在内:结果包含超过10条记录)。也许重要的是要添加字段是嵌套的,a和b实际上是doc.a和doc.b。

这是连接器中的错误还是我的语法有误?


你正在使用哪个版本的Spark和ES连接器? - eliasah
Spark 2.0.1和elasticsearch-spark-20_2.11-5.1.2 - Patrick
好的,请给我一分钟写答案。 - eliasah
3个回答

4

Spark Elasticsearch连接器使用fields,因此您无法应用投影。

如果您希望对映射进行细粒度控制,应该使用DataFrame,它基本上是RDD加上模式。

pushdown谓词也应启用以将Spark SQL转换为Elasticsearch查询DSL。

现在是一个半完整的示例:

myQuery = """{"query":..., """
val df = spark.read.format("org.elasticsearch.spark.sql")
                     .option("query", myQuery)
                     .option("pushdown", "true")
                     .load("myindex/mytype")
                     .limit(10) // instead of size
                     .select("a","b") // instead of fields

1
我删掉了read后面的括号,现在它完美地工作了。再次感谢@eliasah!这确实有所帮助! - Patrick
关于这个问题,最后一个问题:如何使用这个语法指定节点?我已经在这里阅读了文档(https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext),但是我没有找到答案。 - Patrick
你看错了文档,应该参考https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html。 - eliasah
这是es.nodes,节点地址应该包含端口,如果没有单独指定的话。 - eliasah
你知道我怎么从ES中提取_id字段吗?如果我尝试将其指定为我的select子句中的一个字段,它将无法识别。 - Patrick
显示剩余2条评论

2

那么尝试调用以下代码:

scala> val es_rdd = sc.esRDD("myindex/mytype",query="myquery", Map[String, String] ("es.read.field.include"->"a,b"))

0

你想限制从elasticsearch _search HTTP API返回的字段吗?(我猜是为了提高下载速度。)

首先,使用HTTP代理查看elastic4hadoop插件正在执行的操作(我在MacOS上使用Apache Zeppelin和Charles代理)。这将帮助您了解推送下来的工作原理。

有几种解决方案可以实现此目的:

1. 数据框架和推送下来

您指定字段,插件将“转发”到ES(这里是_source参数):

POST ../events/_search?search_type=scan&scroll=5m&size=50&_source=client&preference=_shards%3A3%3B_local

(-) 对于嵌套字段不完全有效。

(+) 简单,直接,易于阅读

2. RDD和查询字段

使用JavaEsSpark.esRDD,您可以在JSON查询中指定字段,就像您所做的那样。这仅适用于RDD(对于DataFrame,字段未发送)。

(-) 没有数据框架 -> 没有Spark的方式 (+) 更灵活,更可控

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接