ElasticSearch 转换为 Spark RDD

10

我在本地机器上测试了ElasticSearch和Spark的集成,使用了一些加载到elasticsearch中的测试数据。

val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf,classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()

使用esRDD.first()可以成功运行代码并返回正确结果。

然而,调用esRDD.collect()会引发异常:

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我认为这与此处提到的问题有关http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html,因此我相应地添加了这一行。

conf.set("spark.serializer", classOf[KryoSerializer].getName)

我需要做其他事情才能让它工作吗? 谢谢


更新: 序列化设置问题已经解决。通过使用

sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)

代替

conf.set("spark.serializer", classOf[KryoSerializer].getName)

现在出现了另一个数据集。 该数据集中有1000个不同的记录。

esRDD.count()

返回1000没问题,但是

esRDD.distinct().count()

返回 5!如果我打印记录

esRDD.foreach(println)

它正确地打印出了前1000条记录。但是,如果我使用collect或take

esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)

它将打印重复记录,实际上只显示了5个唯一的记录,这似乎是整个数据集的随机子集 - 不是前5条记录。 如果我保存RDD并重新读取

esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)

esRDD2表现正常。我不知道是不是有bug,或者是我对collect/take的行为理解有误。还是因为我在本地运行的原因呢? 默认情况下,Spark RDD似乎使用5个分区,就像“spark-output”文件中part-xxxx文件的数量所示。这可能是为什么esRDD.collect()和esRDD.distinct()返回了5个唯一记录,而不是其他随机数字的原因。但这仍然不对。


你能找到第二个问题的源吗?我目前遇到了类似的问题,当es.resource指向单个索引(有5个分片)时,count返回正确的值,但是当查询两个完全相同的索引(总共10个分片)时,它会返回X4倍而不是X2倍。distinct解决了这个问题并产生了正确的结果,但我不明白为什么count不能做到这一点... - Roman
2个回答

1

您应该使用以下代码进行初始化:

val sparkConf = new SparkConf().setAppName("Test").setMaster("local").set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

谢谢,它起作用了。'conf' 指的是 SparkConf 而不是 JobConf,这就是混淆的原因。 - user3931226

0

你可以尝试

val spark = new SparkConf()
    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    .set("es.nodes",localhost)
    .set("es.port","9200")
    .appName("ES")
    .master("local[*]")


val data = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", "?q=firstname:Daniel")") 
  .load("bank/account").rdd

data.first()
data.collect()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接