Mongo Aggregate Pipelines – Multiple Lookups in another Collection

I’m having some trouble understanding aggregate pipelines when doing a seemingly complex match up with another collection. The goal is to get a list of videos that a specific user has no video_impression entry in the analytics collection for.

My data looks something like this:

db={
  "videos": [
    {
      "_id": "1",
      "name": "1's Video",
      "status": "complete",
      "privacy": "public"
    },
    {
      "_id": "2",
      "name": "2's Video",
      "status": "complete",
      "privacy": "public"
    },
    {
      "_id": "3",
      "name": "3's Video",
      "status": "complete",
      "privacy": "public"
    },
    {
      "_id": "4",
      "name": "4's Video",
      "status": "complete",
      "privacy": "private"
    },
    {
      "_id": "5",
      "name": "5's Video",
      "status": "flagged",
      "privacy": "public"
    }
  ],
  "analytics": [
    {
      "_id": "1",
      "user": "1",
      "event": "video_impression",
      "data": {
        "video": "1"
      }
    },
    {
      "_id": "2",
      "user": "2",
      "event": "video_impression",
      "data": {
        "video": "2"
      }
    }
  ]
}

I have managed to get a matcher working but it works “globally” ie. it does not take in to consideration the user id so it’s giving back documents that don’t match anyone.

db.videos.aggregate([
  {
    $match: {
      "status": "complete",
      "privacy": "public"
    }
  },
  {
    $lookup: {
      from: "analytics",
      localField: "_id",
      foreignField: "data.video",
      as: "matched_docs"
    }
  },
  {
    $match: {
      "matched_docs": {
        $eq: []
      }
    }
  }
])

I tried adding another $lookup stage to the pipeline to look up the user field but that didn’t seem to work either as the data was always empty. Here’s a Mongo Playground of the issue I’m having that may help explain it further.

  • 1

    Can you give an example of what you want your documents to look like after the search is complete. I’m just having trouble understanding exactly what you need. You say “The goal is to get a list of videos that a specific user has no video_impression entry in the analytics collection” but all of your analytics documents have a "event": "video_impression".

    – 

  • 1

    Good job on the sample documents, showing what you tried and the Mongo Playground link 👏 Especially for someone new to SO. +1

    – 

  • 2

    @jQueeny Sorry about that! I should have provided a more complete example of the data, there are other events in there as well and the collection is quite large. My goal was an array of video ids that had no video_impression event record for a specific user ie. the specified user had not had any of the resulting videos impressed.

    – 

1. Firstly, this aggregation would be better to run from the analytics collection than the videos collection. Or even better, using the users collection if you have it.

2. As per jQueeny’s comment, the ‘analytics’ collection example is a bit incomplete. I’ll assume each event only exists once so if a user watches two videos, there would be two entries in analytics rather than just one with an array of the videos. PS. I recommend that you change this so that it’s an array of object id’s in data per event-type, with separate records per user or even combining them all into one, depending on how you plan to use it later.

anaylytics collection:

[
  { "_id": "1", "user": "1", "event": "video_impression", "data": { "video": "1" } },
  { "_id": "2", "user": "2", "event": "video_impression", "data": { "video": "2" } },
  { "_id": "3", "user": "2", "event": "video_impression", "data": { "video": "3" } },
  { "_id": "4", "user": "2", "event": "liked_video", "data": { "video": "2" } }
]

3. Method here is to use the pipeline syntax of $lookup to get all the video ids using an Uncorrelated subquery. It has the advantage of only being run once and then using the cache:

MongoDB only needs to run the $lookup subquery once before caching the query because there is no relationship between the source and foreign collections. The $lookup subquery is not based on any value in the source collection. This behavior improves performance for subsequent executions of this query.

However, if the collection of videos is too big and the per-stage document becomes >100 MB, this pipeline will fail and you’ll need to use a correlated subquery.

4. The method used here is:

a) Using the analytics collection, filter to only video_impressions events, group by user_id and then create a set (unique array) of the videos they’ve watched/have an impression.

b) Use a lookup to get all the video ids for “public+complete” vids into one array

c) do a difference between the all videos and videos with impressions.

5. Btw, if you want to do this for only one user at a time, like for a web page/FE, then add user: <user_id> in the first match stage with event.

db.analytics.aggregate([
  {
    // select only the video_impression events
    $match: { event: "video_impression" }
  },
  {
    // first uniquify your users but you should
    // probably run this from the users collection
    $group: {
      _id: "$user",
      impressioned_vids: { "$addToSet": "$data.video" },
      // remove this if you want the user as _id
      user: { "$first": "$user" }
    }
  },
  { $project: { _id: 0 } },
  {
    // uncorrelated subquery which should only run once
    // and then is cached
    $lookup: {
      from: "videos",
      pipeline: [
        {
          $match: {
            status: "complete",
            privacy: "public"
          }
        },
        {
          $group: {
            _id: null,
            video_ids: { $push: "$_id" }
          }
        }
      ],
      as: "all_vids"
    }
  },
  {
    // put it conveniently into a single list
    $set: { all_vids: { $first: "$all_vids.video_ids" } }
  },
  {
    // these are the public-complete videos which that user has not seen
    $set: {
      unimpressed: {
        $setDifference: [ "$all_vids", "$impressioned_vids" ]
      }
    }
  },
  {
    // get rid of the other fields, uncomment to debug
    $project: {
      user: 1,
      unimpressed: 1
    }
  }
])

With my modified analytics collection and your original videos collection, this is the result:

[
  {
    "unimpressed": ["1"],
    "user": "2"
  },
  {
    "unimpressed": ["2", "3"],
    "user": "1"
  }
]

Mongo Playground


Option 2

If the list of all_vids is too big for the uncorrelated $lookup, then it will need to be a correlated $lookup, which executes once per document. The main change is in the $lookup stage, where I check that the video is not in the list of impression/watched videos (or in this case, intersection is empty). This requires assigning the “seen videos” array to a variable in let and then using that in the lookup-pipeline.

  {
    // correlated subquery which executes per record
    $lookup: {
      from: "videos",
      let: {
        seen_vid_ids: "$impressioned_vids"
      },
      pipeline: [
        {
          $match: {
            status: "complete",
            privacy: "public",
            // I wanted to do:
            // _id: { $nin: "$$seen_vid_ids" }  // syntax error
            // or even
            // _id: { $nin: ["$$seen_vid_ids"] }  // doesn't work
            //
            // but instead had to do this
            $expr: {
              $eq: [
                {
                  "$setIntersection": [ ["$_id"], "$$seen_vid_ids" ]
                },
                []
              ]
            }
          }
        }
      ],
      as: "unseen_vids"
    }
  },

Mongo Playground with the full aggregation

Leave a Comment