Spring Data MongoDB Lookup with Pipeline Aggregation

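How can I convert the following MongoDB query into a query my Java Spring application can use? I can't find a way to use the provided lookup method with a pipeline.
Below is the query I'm trying to convert. I should also point out that I'm not using $unwind, because I want deliveryZipCodeTimings to stay a grouped collection in the returned object.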
db.getCollection('fulfillmentChannel').aggregate([
    {
        $match: {
            "dayOfWeek": "SOME_VARIABLE_STRING_1"
        }
    },
    {
        $lookup: {
            from: "deliveryZipCodeTiming",
            let: { location_id: "$fulfillmentLocationId" },
            pipeline: [{
                $match: {
                    $expr: {
                        $and: [
                            {$eq: ["$fulfillmentLocationId", "$$location_id"]},
                            {$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
                        ]
                    }
                }
            },
            { 
                $project: { _id: 0, zipCode: 1, cutoffTime: 1 } 
            }],
            as: "deliveryZipCodeTimings"
        }
    },
    {
        $match: {
            "deliveryZipCodeTimings": {$ne: []}
        }
    }
])
5 Answers


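Building on the information from @dnickless, I managed to solve this. I'm posting the complete solution to help anyone who runs into the same problem in the future.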

I'm using mongodb-driver:3.6.4.

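First, I had to create a custom aggregation operation class so that I could pass a custom JSON MongoDB query into the aggregation. This lets me use pipeline inside $lookup, which the driver version I'm using does not support.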

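import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class CustomProjectAggregationOperation implements AggregationOperation {
    private final String jsonOperation;

    public CustomProjectAggregationOperation(String jsonOperation) {
        this.jsonOperation = jsonOperation;
    }

    @Override
    public Document toDocument(AggregationOperationContext aggregationOperationContext) {
        // Parse the raw JSON stage, then let Spring map field names through the context.
        return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
    }
}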

Now that we can pass custom JSON queries into our Spring MongoDB implementation, all that's left is to plug these values into a TypedAggregation query.

public List<FulfillmentChannel> getFulfillmentChannels(
    String SOME_VARIABLE_STRING_1, 
    String SOME_VARIABLE_STRING_2) {

    AggregationOperation match = Aggregation.match(
            Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
    AggregationOperation match2 = Aggregation.match(
            Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));
    String query =
            "{ $lookup: { " +
                    "from: 'deliveryZipCodeTiming'," +
                    "let: { location_id: '$fulfillmentLocationId' }," +
                    "pipeline: [{" +
                    "$match: {$expr: {$and: [" +
                    "{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
                    "{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
                    "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
                    "as: 'deliveryZipCodeTimings'}}";

    TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
            FulfillmentChannel.class,
            match,
            new CustomProjectAggregationOperation(query),
            match2
    );

    AggregationResults<FulfillmentChannel> results = 
        mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
    return results.getMappedResults();
}
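The query above splices SOME_VARIABLE_STRING_2 into the JSON between single quotes, so a value containing a quote or a backslash would break Document.parse or change the query. Below is a minimal JDK-only sketch of a hypothetical helper, JsonLiterals.jsonQuote, that turns a value into an escaped, double-quoted JSON string literal; it assumes the driver's JSON parser accepts double-quoted strings next to the single-quoted ones used above, as standard JSON requires. Escaping alone does not stop a value beginning with $ from being read as a field path inside $expr; wrapping it as { $literal: ... }, or building the stage as a Document instead of a string, avoids that.

```java
/** Hypothetical helper for embedding Java strings in hand-written JSON stages. */
final class JsonLiterals {

    private JsonLiterals() {
    }

    /** Returns value as a double-quoted JSON string literal with special characters escaped. */
    static String jsonQuote(String value) {
        if (value == null) {
            return "null"; // a JSON null literal, not the quoted text "null"
        }
        StringBuilder sb = new StringBuilder(value.length() + 2).append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                case '\n':
                    sb.append("\\n");
                    break;
                case '\r':
                    sb.append("\\r");
                    break;
                case '\t':
                    sb.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a four-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c); // single quotes need no escaping inside "..."
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

With it, the zip-code line of the concatenated query would read "{ $eq: ['$zipCode', " + JsonLiterals.jsonQuote(SOME_VARIABLE_STRING_2) + "]}]}}}," + and the surrounding single quotes go away.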

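Great answer! It helped a lot with the query I'm working on. However, I found a small problem when dates are part of the match. When a date is used directly in the let or the match, the CustomProjectAggregationOperation constructor should accept a Document rather than a string that has to be parsed later. Parsing the string seems to break the date, giving wrong results or no results at all. - Josh Balcitis
@AlwaysLearning: I tried using TypedAggregation, but my AggregationResult differs from the entity. I get a PropertyReferenceException saying the lookup's ID does not exist on the entity. - Farhan
I'm not sure why nobody has pointed to "ExposedFields". When Spring Data MongoDB maps each stage of the pipeline, it looks up the fields exposed by the previous stage, and with a custom aggregation operation this fails with java.lang.IllegalArgumentException: Invalid reference. I had to implement FieldsExposingAggregationOperation and FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation and provide an implementation of getFields: @Override public ExposedFields getFields() { return ExposedFields.synthetic(Fields.fields(this.exposedField)); } - Mohit Sharma
Is there a problem with the new field created by the lookup, deliveryZipCodeTimings? I'm trying to access it and sort on it, but I get an "Invalid reference" error. Can anyone help? - Ritesh Singh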


I'd like to add my solution, which partly overlaps with the ones posted earlier.

Mongo driver v3.x

For Mongo driver v3.x I went with the following solution:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    public JsonOperation(String json) {
        Object root = JSON.parse(json);

        documents = root instanceof BasicDBObject
                    ? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
                    : ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }
}

Assume the aggregation steps are provided in some resource, for example aggregations.json:

[
  {
    $match: {
      "userId": "..."
    }
  },
  {
    $lookup: {
      let: {
        ...
      },
      from: "another_collection",
      pipeline: [
        ...
      ],
      as: "things"
    }
  },
  {
    $sort: {
      "date": 1
    }
  }
]

The class above can then be used like this:

import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

Collection<ResultDao> results = mongoTemplate.aggregate(
        newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))),
        "some_collection",
        ResultDao.class).getMappedResults();
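resourceToString is not defined in the answer; its (name, charset) shape matches Apache Commons IO's IOUtils.resourceToString. If that library is not on the classpath, the following is a minimal JDK-only stand-in (the class name ClasspathResources and the extra anchor parameter are hypothetical; readAllBytes needs Java 9+). It relies on Class.getResourceAsStream, which treats a name starting with "/" as absolute and otherwise resolves it relative to the anchor class's package.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;

/** Hypothetical JDK-only replacement for the resourceToString(...) helper used above. */
final class ClasspathResources {

    private ClasspathResources() {
    }

    /**
     * Reads a classpath resource fully and decodes it with the given charset.
     * A name starting with "/" is absolute; otherwise it is resolved relative
     * to the package of the anchor class.
     */
    static String resourceToString(Class<?> anchor, String name, Charset charset) {
        try (InputStream in = anchor.getResourceAsStream(name)) {
            if (in == null) {
                throw new IllegalArgumentException("Resource not found: " + name);
            }
            return new String(in.readAllBytes(), charset); // readAllBytes: Java 9+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the call above this would be resourceToString(JsonOperation.class, "/aggregations.json", StandardCharsets.UTF_8), with the file at the root of the classpath (for example src/main/resources in a Maven or Gradle layout).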

Mongo driver v4.x

In Mongo driver v4 the JSON class was removed, so I rewrote the class as follows:

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    private static final String DUMMY_KEY = "dummy";

    public JsonOperation(String json) {
        documents = parseJson(json);
    }

    static final List<Document> parseJson(String json) {
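        // Trim first so leading whitespace or newlines (common in resource files) don't hide the '['.
        return (json.trim().startsWith("["))
                    ? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
                    : Collections.singletonList(Document.parse(json));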
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }

    @Override
    public String getOperator() {
        return documents.iterator().next().keySet().iterator().next();
    }
}

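Because of the string manipulation, though, the implementation is now a bit ugly. If you have a better idea for parsing an array of objects more elegantly, please edit this post or leave a comment. Ideally the Mongo core would offer a method that parses a JSON object or list (returning BasicDBObject/BasicDBList or Document/List<Document>).

Also note that in toPipelineStages() I skipped mapping the Document instances through the aggregation context, because that wasn't necessary in my case. If you need it, the method would look like this: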

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}



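The drivers almost always lag behind the language features MongoDB currently offers, so some of the latest and greatest features aren't nicely accessible through the API yet. I'm afraid this is one of them, and you'll need to use strings. Something like this (untested):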

AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
Aggregation.newAggregation(match, (DBObject) JSON.parse(query), match2);

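Thanks @dnickless, but I don't see any Aggregation.newAggregation overload that takes a DBObject as a parameter. Do you know how I can pass a DBObject to Aggregation.newAggregation? - Always Learning
@AlwaysLearning: You're right... let me google it... Without Spring, it can be done like this: https://stackoverflow.com/questions/36352110/new-aggregation-feature-with-mongo-3-2-driver-using-java - dnickless
I think you can do it by implementing your own AggregationOperation class based on a JSON string: https://dev59.com/-1kS5IYBdhLWcg3w1JlK - dnickless
The solution at the link above gives you maximum flexibility and will keep working in the future. A cleaner route, though, would be to take the code from here: https://github.com/spring-projects/spring-data-mongodb/blob/master/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/aggregation/LookupOperation.java, extend it to support what you need, and then open a pull request. - dnickless
Thanks @dnickless, your suggestions helped a lot in getting to the final solution. Upvoted. - Always Learning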


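I ran into some JSON parsing exceptions with the approach explained in the accepted answer, so I dug into the default MongoDB Java driver (version 3) Document class to build the aggregation query, and found that any aggregation query can be built as follows:

Replace each element of the mongo shell query like this: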

  1. Curly brace ({) -> new Document()
  2. Parameter names stay the same
  3. Colon (:) -> comma (,)
  4. Comma (,) -> .append()
  5. Square bracket ([) -> Arrays.asList()
import java.util.Arrays;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

AggregationOperation customLookupOperation = new AggregationOperation() {
    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Mirrors { $lookup: { from, let, pipeline: [ {$match}, {$project} ], as } }
        return new Document(
                "$lookup",
                new Document("from", "deliveryZipCodeTiming")
                        .append("let", new Document("location_id", "$fulfillmentLocationId"))
                        .append("pipeline", Arrays.<Object>asList(
                                new Document("$match", new Document("$expr", new Document("$and",
                                        Arrays.<Object>asList(
                                                new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
                                                new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
                                        )))),
                                new Document("$project", new Document("_id", 0)
                                        .append("zipCode", 1)
                                        .append("cutoffTime", 1))
                        ))
                        .append("as", "deliveryZipCodeTimings")
        );
    }
};

Finally, you can use the custom operation in the aggregation pipeline:
Aggregation aggregation = Aggregation.newAggregation(matchOperation, customLookupOperation, matchOperation2);


For those who want a simple solution and don't want to bother with raw JSON queries, here is a wrapper:

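import java.util.List;

import lombok.RequiredArgsConstructor;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;
import org.springframework.data.mongodb.core.aggregation.LookupOperation;

@RequiredArgsConstructor
public class PipelineLookUpWrapper implements AggregationOperation {

    private final LookupOperation lookup;
    private final Aggregation pipelineAggregation;

    @Override
    public Document toDocument(AggregationOperationContext context) {
        return lookup.toDocument(context);
    }

    @Override
    public String getOperator() {
        return lookup.getOperator();
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        List<Document> lookUpPipelineStages = lookup.toPipelineStages(context);

        // Append a "pipeline" array, rendered from the wrapped aggregation, to the $lookup body.
        Document lookUp = (Document) lookUpPipelineStages.iterator().next().get(getOperator());
        lookUp.append("pipeline", pipelineAggregation.getPipeline().getOperations()
                .stream()
                .flatMap(operation -> operation.toPipelineStages(context).stream())
                .toList()); // Stream.toList() requires Java 16+
        return lookUpPipelineStages;
    }
}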

Usage:

// Combining localField/foreignField with a pipeline in $lookup requires MongoDB 5.0+.
var originalLookUp = Aggregation.lookup("from", "localField", "foreignField", "as");
Aggregation pipelineAggregation = Aggregation.newAggregation(Aggregation.match(new Criteria()), Aggregation.skip(1));
AggregationOperation lookUpWithPipeline = new PipelineLookUpWrapper(originalLookUp, pipelineAggregation);

Content provided by Stack Overflow.