2

I am working with some ElasticSearch data and i would like to generate the tables from the aggregations like in Kibana. A sample output of the aggregation is below, based on the following code :

    s.aggs.bucket("name1", "terms", field="field1").bucket(
        "name2", "terms", field="innerField1"
    ).bucket("name3", "terms", field="InnerAgg1")
     response = s.execute()
   resp_dict = response.aggregations.name.buckets




{
    "key": "Locationx",
    "doc_count": 12,
    "name2": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "Sub-Loc1",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }, {
            "key": "Sub-Loc2",
            "doc_count": 1,
            "name3": {
                "doc_count_error_upper_bound": 0,
                "sum_other_doc_count": 0,
                "buckets": [{
                    "key": "super-Loc1",
                    "doc_count": 1
                }]
            }
        }]
    }
}

In this case, the expected output would be:

Expected Output

Now, I have tried a variety of methods, with a short description of what went wrong :

Pandasticsearch = completely failed even with just 1 dictionary. The dictionary was not created, as it was struggling with keys, even with each dictionary being dealt with separately:

for d in resp_dict :
    x= d.to_dict()
    pandas_df = Select.from_dict(x).to_pandas()
    print(pandas_df)

In particular, the error that was recieved related to the the fact that the dictionary was not made and thus ['took'] was not a key.

Pandas (pd.Dataframe.from_records()) = only gave me the first aggregation, with a column containing the inner dictionary, and using pd.apply(pd.Series) on it gave another table of resulting dictionaries.

StackOverflow posts recursive function = the dictionary looks completely different than the example used,and tinkering led me nowhere unless i drastically change the input.

1 Answer 1

4

Struggling with the same problem, I've come to believe the reason for this being that the response_dict are not normal dicts, but an elasticsearch_dsl.utils.AttrList of elasticsearch_dsl.utils.AttrDict.

If you have an AttrList of AttrDicts, it's possible to do:

resp_dict = response.aggregations.name.buckets
new_response = [i._d_ for i in resp_dict]

To get a list of normal dicts instead. This will probably play nicer with other libraries.

Edit:

I wrote a recursive function which at least handles some cases, not extensively tested yet though and not wrapped in a nice module or anything. It's just a script. The one_lvl function keeps track of all the siblings and siblings of parents in the tree in a dictionary called tmp, and recurses when it finds a new named aggregation. It assumes a lot about the structure of the data, which I'm not sure is warranted in the general case.

The lvl stuff is necessary I think because you might have duplicate names, so key exists at several aggregation-levels for instance.

#!/usr/bin/env python3

from elasticsearch_dsl.query import QueryString
from elasticsearch_dsl import Search, A
from elasticsearch import Elasticsearch
import pandas as pd

PORT = 9250
TIMEOUT = 10000
USR = "someusr"
PW = "somepw"
HOST = "test.com"
INDEX = "my_index"
QUERY = "foobar"

client = Elasticsearch([HOST], port = PORT, http_auth=(USR, PW), timeout = TIMEOUT)

qs = QueryString(query = QUERY)
s = Search(using=client, index=INDEX).query(qs)

s = s.params(size = 0)

agg= {
    "dates" : A("date_histogram", field="date", interval="1M", time_zone="Europe/Berlin"),
    "region" : A("terms", field="region", size=10),
    "county" : A("terms", field="county", size = 10)
}

s.aggs.bucket("dates", agg["dates"]). \
       bucket("region", agg["region"]). \
       bucket("county", agg["county"])

resp = s.execute()

data = {"buckets" : [i._d_ for i in resp.aggregations.dates]}
rec_list = ["buckets"] + [*agg.keys()]

def get_fields(i, lvl):
    return {(k + f"{lvl}"):v for k, v in i.items() if k not in rec_list}

def one_lvl(data, tmp, lvl, rows, maxlvl):
    tmp = {**tmp, **get_fields(data, lvl)}

    if "buckets" not in data:
        rows.append(tmp)

    for d in data:
        if d in ["buckets"]:
            for v, b in enumerate(data[d]):
                tmp = {**tmp, **get_fields(data[d][v], lvl)}
                for k in b:
                    if k in agg.keys():
                        one_lvl(data[d][v][k], tmp, lvl+1, rows, maxlvl)
                    else:
                        if lvl == maxlvl:
                            tmp = {**tmp, (k + f"{lvl}") : data[d][v][k]}
                            rows.append(tmp)

    return rows


rows = one_lvl(data, {}, 1, [], len(agg))
df = pd.DataFrame(rows)

Sign up to request clarification or add additional context in comments.

2 Comments

Honestly the best thing that can be done at this time. I would have loved to find a better way, but turning it into a bunch of dicts and using those is the way I did it
I made an attempt at finding a recursive solution, edited the post.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.