Spring Data MongoDB Lookup with Pipeline Aggregation
Asked Answered
S

5

18

How would I convert the following MongoDB query into a query to be used by my Java Spring application? I can't find a way to use pipeline with the provided lookup method.

Here is the query I am attempting to convert. I also want to note that I didn't use $unwind as I wanted the deliveryZipCodeTimings to stay as a grouped collection in the return object.

db.getCollection('fulfillmentChannel').aggregate([
    {
        $match: {
            "dayOfWeek": "SOME_VARIABLE_STRING_1"
        }
    },
    {
        $lookup: {
            from: "deliveryZipCodeTiming",
            let: { location_id: "$fulfillmentLocationId" },
            pipeline: [{
                $match: {
                    $expr: {
                        $and: [
                            {$eq: ["$fulfillmentLocationId", "$$location_id"]},
                            {$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
                        ]
                    }
                }
            },
            { 
                $project: { _id: 0, zipCode: 1, cutoffTime: 1 } 
            }],
            as: "deliveryZipCodeTimings"
        }
    },
    {
        $match: {
            "deliveryZipCodeTimings": {$ne: []}
        }
    }
])
Selfconfessed answered 29/6, 2018 at 18:36 Comment(0)
S
17

Building upon the info given by @dnickless, I was able to solve this. I'll post the complete solution in the hopes it helps someone else in the future.

I'm using mongodb-driver:3.6.4

First, I had to create a custom aggregation operation class so that I could pass in a custom JSON mongodb query to be used in the aggregation operation. This will allow me to use pipeline within a $lookup which is not supported with the driver version I am using.

public class CustomProjectAggregationOperation implements AggregationOperation {
    private String jsonOperation;

    public CustomProjectAggregationOperation(String jsonOperation) {
        this.jsonOperation = jsonOperation;
    }

    @Override
    public Document toDocument(AggregationOperationContext aggregationOperationContext) {
        return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
    }
}

Now that we have the ability to pass a custom JSON query into our mongodb spring implementation, all that is left is to plug those values into a TypedAggregation query.

public List<FulfillmentChannel> getFulfillmentChannels(
    String SOME_VARIABLE_STRING_1, 
    String SOME_VARIABLE_STRING_2) {

    AggregationOperation match = Aggregation.match(
            Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
    AggregationOperation match2 = Aggregation.match(
            Criteria.where("deliveryZipCodeTimings").ne(Collections.EMPTY_LIST));
    String query =
            "{ $lookup: { " +
                    "from: 'deliveryZipCodeTiming'," +
                    "let: { location_id: '$fulfillmentLocationId' }," +
                    "pipeline: [{" +
                    "$match: {$expr: {$and: [" +
                    "{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
                    "{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
                    "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
                    "as: 'deliveryZipCodeTimings'}}";

    TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
            FulfillmentChannel.class,
            match,
            new CustomProjectAggregationOperation(query),
            match2
    );

    AggregationResults<FulfillmentChannel> results = 
        mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
    return results.getMappedResults();
}
Selfconfessed answered 5/7, 2018 at 21:13 Comment(4)
Great Answer! This helped a lot with a query I was working. I did find a slight issue when using dates as part of the match. When using dates in either the let or in the match directly the CustomProjectAggregationOperation's constructor should just accept a Document instead of a String it will need to parse later. The parsing of the string seems to mess up the dates resulting in either bad results or no results at all.Smetana
@AlwaysLearning: I tried using TypedAggregation, however, my AggregationResult is a different POJO then my entity. I get a PropertyReferenceException saying the lookup id doesnt exist on the entity.Dragon
I am not sure why no one is pointing to "ExposedFields", When Spring data for mongo parses each step in the pipeline it looks for exposed fields from the previous step, in case of custom Aggregation it was failing for me with error java.lang.IllegalArgumentException: Invalid reference. I had to implement FieldsExposingAggregationOperation and FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation and provide implementation of getFields method. @Override public ExposedFields getFields() { return ExposedFields.synthetic(Fields.fields(this.exposedField)); }Apteral
What about the new field created as : deliveryZipCodeTimings after look up. I am trying to access and sort on the new field exposed but it is giving Invalid reference. can someone please help?Gradatim
V
5

I would like to add this my solution which is repeating in some aspect the solutions posted before.

Mongo driver v3.x

For Mongo driver v3.x I came to the following solution:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    public JsonOperation(String json) {
        Object root = JSON.parse(json);

        documents = root instanceof BasicDBObject
                    ? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
                    : ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }
}

and then provided that aggregation steps are given in some resource aggregations.json:

[
  {
    $match: {
      "userId": "..."
    }
  },
  {
    $lookup: {
      let: {
        ...
      },
      from: "another_collection",
      pipeline: [
        ...
      ],
      as: "things"
    }
  },
  {
    $sort: {
      "date": 1
    }
  }
]

one can use above class as follows:

import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

Collection<ResultDao> results = mongoTemplate.aggregate(newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))), "some_collection", ResultDao.class).getMappedResults();

Mongo driver v4.x

As JSON class was removed from Mongo v4, I have rewritten the class as follows:

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    private static final String DUMMY_KEY = "dummy";

    public JsonOperation(String json) {
        documents = parseJson(json);
    }

    static final List<Document> parseJson(String json) {
        return (json.startsWith("["))
                    ? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
                    : Collections.singletonList(Document.parse(json));
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }

    @Override
    public String getOperator() {
        return documents.iterator().next().keySet().iterator().next();
    }
}

but implementation is now a bit ugly because of string manipulations. If somebody has a better idea of how to parse array of objects in a more elegant way, please edit this post or drop a comment. Ideally there should be some method in Mongo core that allows to parse either JSON object or list (returns BasicDBObject/BasicDBList or Document/List<Document>).

Also note that I have skipped the step of transforming Document instances in toPipelineStages() method as it is not necessary in my case:

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}

Vanderbilt answered 28/8, 2020 at 17:16 Comment(0)
A
4

The drivers are pretty much always a little bit behind the current language features that MongoDB provides - hence some of the latest and greatest features are simply not nicely accessible through the API yet. I am afraid this is one of those cases and you'll need to resort to using strings. Kind of like so (untested):

AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne([]));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
Aggregation.newAggregation(match, (DBObject) JSON.parse(query), match2);
Archive answered 29/6, 2018 at 20:48 Comment(5)
Thanks @Archive however I don't see any Aggregation.newAggregation that takes a DBObject as a parameter. Do you know how I could go about passing DBObject into Aggregation.newAggregation?Selfconfessed
@AlwaysLearning: You are right... Let me google a bit... Without Spring it would be possible like this: #36352610Archive
I think you can do it this way (by implementing your own JSON string based AggregationOperation class): #39394172Archive
The solution provided in the above link gives you the greatest level of flexibility also going forward. It would, however, be cleaner to grab the code from this class here: github.com/spring-projects/spring-data-mongodb/blob/master/…, then extend it to make it support what you need and then create a pull request.Archive
Thanks @dnickless, your input helped me towards the final solution. Upvoted.Selfconfessed
B
3

I faced some JSON parsing exceptions when I used the way explained in the accepted answer, so I dig deep the default MongoDB java driver(version 3) Document class to build up aggregation query and found out any aggregation query can be build u as follows,

Replace each of the element in the mongo console query as follows

  1. Curly braces({) -> new Document()
  2. parameter names are same
  3. Colon(:) -> Coma(,)
  4. Coma(,) -> .append()
  5. Square bracket([) -> Arrays.asList()
  AggregationOperation customLookupOperation = new AggregationOperation() {
                @Override
                public Document toDocument(AggregationOperationContext context) {
                    return new Document(
                            "$lookup",
                            new Document("from", "deliveryZipCodeTiming")
                                    .append("let",new Document("location_id", "$fulfillmentLocationId"))
                                    .append("pipeline", Arrays.<Object> asList(
                                            new Document("$match", new Document("$expr", new Document("$and",
                                                    Arrays.<Object>asList(
                                                            new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
                                                            new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
                                                    )))),
                                            new Document("$project", new Document("_id",0).append("zipCode", 1)
                                                    .append("cutoffTime", 1)
)
                                    ))
                                    .append("as", "deliveryZipCodeTimings")
                    );
                }
            };

Finally you can use the aggregation operation in the aggrgation pipeline,

            Aggregation aggregation = Aggregation.newAggregation(matchOperation,customLookupOperation,matchOperation2);
Brinkley answered 6/7, 2021 at 12:53 Comment(0)
E
0

For anyone who finds a simple solution and don't want to bother with raw json queries here is wrapper:

@RequiredArgsConstructor
public class PipelineLookUpWrapper implements AggregationOperation {

private final LookupOperation lookup;
private final Aggregation pipelineAggregation;

@Override
public Document toDocument(AggregationOperationContext context) {
    return lookup.toDocument(context);
}

@Override
public String getOperator() {
    return lookup.getOperator();
}

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    List<Document> lookUpPipelineStages = lookup.toPipelineStages(context);

    Document lookUp = (Document) lookUpPipelineStages.iterator().next().get(getOperator());
    lookUp.append("pipeline", pipelineAggregation.getPipeline().getOperations()
            .stream()
            .flatMap(operation -> operation.toPipelineStages(context).stream())
            .toList());
    return lookUpPipelineStages;
}
}

Usage:

var originalLookUp = Aggregation.lookup("from", "localField", "foreignField", "as");
Aggregation pipelineAggregation = Aggregation.newAggregation(Aggregation.match(new Criteria()), Aggregation.skip(1));
AggregationOperation lookUpWithPipeline = new PipelineLookUpWrapper(originalLookUp, pipelineAggregation);
Eyepiece answered 3/11, 2022 at 14:15 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.