I want to partially update all documents that match an aggregation result.

Here is my object:

{
    "name": "name",
    "identificationHash": "aslkdakldjka",
    "isDupe": false,
    ...
}

My goal is to set isDupe to true for all documents whose "identificationHash" occurs at least 2 times.

Currently what I'm doing is:

  1. I query all the documents where "isDupe" = false, with a terms aggregation on "identificationHash" and a min_doc_count of 2.
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "isDupe": {
              "value": false,
              "boost": 1
            }
          }
        }
      ]
    }
  },
  "aggregations": {
    "identificationHashCount": {
      "terms": {
        "field": "identificationHash",
        "size": 10000,
        "min_doc_count": 2
      }
    }
  }
}
  2. With the aggregation result, I do a bulk update with a script that sets "ctx._source.isDupe=true" for every identificationHash returned by the aggregation.

I repeat steps 1 and 2 until the aggregation query returns no more results.
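
To make the loop concrete, here is a rough in-memory sketch in plain Python rather than real Elasticsearch calls. The function names `find_duplicate_hashes` and `mark_duplicates` are hypothetical, a list of dicts stands in for the index, and `batch_size` plays the role of the aggregation's `size`:

```python
from collections import Counter


def find_duplicate_hashes(docs, size):
    """Stand-in for step 1: hashes seen at least twice among isDupe=false docs."""
    counts = Counter(d["identificationHash"] for d in docs if not d["isDupe"])
    dupes = [h for h, n in counts.most_common() if n >= 2]
    return dupes[:size]  # the aggregation returns at most `size` buckets


def mark_duplicates(docs, batch_size):
    """Repeat aggregate-then-update until the aggregation comes back empty."""
    rounds = 0
    while True:
        hashes = set(find_duplicate_hashes(docs, batch_size))
        if not hashes:
            return rounds
        # Stand-in for step 2: the scripted bulk update.
        for d in docs:
            if d["identificationHash"] in hashes:
                d["isDupe"] = True
        rounds += 1
```

The number of rounds grows with the number of distinct duplicate hashes divided by the batch size, which is why I would like to avoid the loop.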

My question is: Is there a better solution to this problem? Can I do the same thing with a single scripted query, without looping over batches of 1000 identification hashes?

  • When you say "looping", how are you doing it? Are you using update_by_query? And are these two different queries, one to aggregate and one to update? Commented Jun 25, 2019 at 20:49
  • Yes, it's two queries: one to aggregate and one to update. I repeat those two queries until the aggregation returns 0 results. Commented Jun 25, 2019 at 20:53
  • I have a solution in mind that would still require two steps (identification and then update), but no need to iterate over multiple batches to achieve what you expect. Commented Nov 25, 2021 at 15:42
  • I got something working, but I'm curious how many documents need to be updated usually (approx.)? Commented Nov 25, 2021 at 16:13

1 Answer

There's no solution that I know of that allows you to do this in one shot. However, there's a way to do it in two steps, without having to iterate over several batches of hashes.

The idea is to first identify all the hashes to be updated using a feature called Transforms, which simply leverages aggregations and builds a new index out of the aggregation results.

Once that new index has been created by your transform, you can use it as a terms lookup mechanism to run your update by query and update the isDupe boolean for all documents having a matching hash.

So, first, we want to create a transform that will create a new index featuring documents containing all duplicate hashes that need to be updated. This is achieved using a scripted_metric aggregation whose job is to identify all hashes that occur at least twice among the documents with isDupe: false. We're also grouping by week, so for each week, there's going to be one document containing all duplicate hashes for that week.

PUT _transform/dup-transform
{
  "source": {
    "index": "test-index",
    "query": {
      "term": {
        "isDupe": "false"
      }
    }
  },
  "dest": {
    "index": "test-dups",
    "pipeline": "set-id"
  },
  "pivot": {
    "group_by": {
      "week": {
        "date_histogram": {
          "field": "lastModifiedDate",
          "calendar_interval": "week"
        }
      }
    },
    "aggregations": {
      "dups": {
        "scripted_metric": {
          "init_script": """
            state.week = -1;
            state.hashes = [:];
          """,
          "map_script": """
            // gather all hashes from each shard and count them
            def hash = doc['identificationHash.keyword'].value;

            // set week
            state.week = doc['lastModifiedDate'].value.get(IsoFields.WEEK_OF_WEEK_BASED_YEAR).toString();

            // initialize hashes
            if (!state.hashes.containsKey(hash)) {
              state.hashes[hash] = 0;
            }
            // increment hash
            state.hashes[hash] += 1;
          """,
          "combine_script": "return state",
          "reduce_script": """
            def hashes = [:];
            def week = -1;
            // group the hash counts from each shard and add them up
            for (state in states) {
              if (state == null) return null;
              week = state.week;
              for (hash in state.hashes.keySet()) {
                if (!hashes.containsKey(hash)) {
                  hashes[hash] = 0;
                }
                hashes[hash] += state.hashes[hash];
              }
            }
          
            // only return the hashes occurring at least twice
            return [
              'week': week,
              'hashes': hashes.keySet().stream().filter(hash -> hashes[hash] >= 2)
                     .collect(Collectors.toList())
            ]
          """
        }
      }
    }
  }
}
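
To see what the four scripts do, here is a minimal Python sketch of the same map/combine/reduce logic, run on in-memory lists instead of shards. The names `map_shard` and `reduce_states` are hypothetical and each input document is assumed to already carry its week number. It is only meant to illustrate the counting, not to replace the Painless scripts:

```python
from collections import Counter


def map_shard(shard_docs):
    """init_script + map_script: count each hash seen on one shard."""
    state = {"week": -1, "hashes": Counter()}
    for doc in shard_docs:
        state["week"] = doc["week"]
        state["hashes"][doc["identificationHash"]] += 1
    return state  # combine_script returns the state unchanged


def reduce_states(states):
    """reduce_script: add up per-shard counts, keep hashes seen at least twice."""
    totals = Counter()
    week = -1
    for state in states:
        if state is None:
            return None
        week = state["week"]
        totals.update(state["hashes"])  # adds counts, does not overwrite them
    return {"week": week, "hashes": [h for h, n in totals.items() if n >= 2]}
```

Note that a hash occurring once on each of two shards is still reported, because the filtering on the count only happens after the per-shard counts have been added up.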

Before running the transform, we need to create the set-id pipeline (referenced in the dest section of the transform) that will define the ID of the target document that is going to contain the hashes so that we can reference it in the terms query for updating documents:

PUT _ingest/pipeline/set-id
{
  "processors": [
    {
      "set": {
        "field": "_id",
        "value": "{{dups.week}}"
      }
    }
  ]
}

We're now ready to start the transform to generate the list of hashes to update and it's as simple as running this:

POST _transform/dup-transform/_start

When it has run, the destination index test-dups will contain one document per week, each looking like this:

  {
    "_index" : "test-dups",
    "_type" : "_doc",
    "_id" : "44",
    "_score" : 1.0,
    "_source" : {
      "week" : "2021-11-01T00:00:00.000Z",
      "dups" : {
        "week" : "44",
        "hashes" : [
          "12345"
        ]
      }
    }
  },

Finally, we can run the update by query as follows (add as many terms queries as weekly documents in the target index):

POST test-index/_update_by_query
{
  "query": {
    "bool": {
      "minimum_should_match": 1,
      "should": [
        {
          "terms": {
            "identificationHash": {
              "index": "test-dups",
              "id": "44",
              "path": "dups.hashes"
            }
          }
        },
        {
          "terms": {
            "identificationHash": {
              "index": "test-dups",
              "id": "45",
              "path": "dups.hashes"
            }
          }
        }
      ]
    }
  },
  "script": {
    "source": "ctx._source.isDupe = true;"
  }
}
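
If there are many weekly documents, writing the terms clauses by hand gets tedious. As a small sketch, the request body above could be generated like this in Python. `build_update_by_query` is a hypothetical helper, and the list of document IDs is assumed to have been fetched from test-dups beforehand:

```python
def build_update_by_query(doc_ids, lookup_index="test-dups", path="dups.hashes"):
    """Build the update-by-query body with one terms lookup per weekly document."""
    should = [
        {
            "terms": {
                "identificationHash": {
                    "index": lookup_index,
                    "id": doc_id,
                    "path": path,
                }
            }
        }
        for doc_id in doc_ids
    ]
    return {
        "query": {"bool": {"minimum_should_match": 1, "should": should}},
        "script": {"source": "ctx._source.isDupe = true;"},
    }
```

Calling `build_update_by_query(["44", "45"])` yields the same body as the request shown above, which can then be sent with whatever client you use.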

That's it in two simple steps!! Try it out and let me know.

13 Comments

It seems to work with a subset of 75k dupes out of a total of 180k documents. I will have to test it with a bigger set of data: 50 million dupes out of a total of 300 million documents.
Yes, it works well on data sets below 1M docs. With 50 million dups, we might have to amend the process a little bit... Is there any date field or another low-cardinality enum in your documents that would allow us to partition the data better?
Yes, the lastModifiedDate date field could be used to do that. Also, the identificationHash is partly built from the lastModifiedDate.
So what we could do is use lastModifiedDate as the pivot, and then we'd have smaller sets of documents to work on, which in turn would mean more than one document in the destination index. I'll improve the answer along those lines.
Thinking about this, using the date is not ideal since I guess that two duplicate documents with the same hash might end up in two completely different date ranges, and hence would not be identified as dups. Do you have another field that could be used as the pivot with the following characteristics: 1) 2+ duplicate documents with the same hash MUST have the same value for that field (so they end up in the same group) and 2) there are fewer than 1M documents when grouping by that field (hard limit per shard for scripted_metric)?