MongoSpark保存时出现重复键错误E11000。

4
我有一个Spark应用程序,将RDD数据写入MongoDB中,遇到了MongoBulkWriteException异常。之前我是使用MongoDB标准驱动的bulkWrite()方法,但现在我开始使用MongoSpark驱动的write()方法。
首先声明,我使用的是Apache Spark 1.6.0和MongoDB 3.2.11。
以下是异常跟踪信息:
com.mongodb.MongoBulkWriteException: Bulk write operation error on server
 10.1.101.146:27017. Write errors: [BulkWriteError{index=0, code=11000, 
message='E11000 duplicate key error collection: collection-test 
index: _id_ dup key: { : "636253651-2017-03-07" }', details={ }}]

生成它的代码是:
JavaRDD<Document> rddInsertRecords = rddGrouped.map(new Function<Tuple2<String, BasicRecord>, Document>() {
private static final long serialVersionUID = 1L;
    @Override
    public Document call(Tuple2<String, BasicRecord> tuple2) throws Exception {
          Document json = tuple2._2.toBSONDocument();
          return json;
      }
});
MongoSpark.save(rddInsertRecords, WriteConfig.create(sc.getConf()));

我有一个使用旧代码的备选方案,但我想使用MongoSpark编写。

我在MongoDB的JIRA中看到了这个问题(https://jira.mongodb.org/browse/SERVER-14322),但我不确定如何绕过这个问题。

更新:我忘了提到故障不是第一次发生(即mongodb上没有数据,集合为空)。它在第二次运行作业时失败。从技术上讲,驱动程序应该执行upsert,对吗?

1个回答

2
Spark Connector不知道如何upsert RDD ,其中T可以是任何类型 - 它如何获取id值?
然而,Datasets/DataFrames具有包含模式信息的架构,指示哪个字段是_id字段,并且可以自动用于upserts。这是在SPARK-66中完成的。Datasets/DataFrames的另一个好处是它们更有效率,应该为您的Spark作业提供性能提升。
如果您必须使用RDD,则可以通过MongoConnector类以编程方式访问MongoDB集合并创建upsert操作。

明白了。目前我无法将RDD切换为Dataset,所以我想我会使用MongoConnector方法。感谢您的澄清。 - cabreracanal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接