
I have a dataset df_1 that looks like this:

my_id        scope         feat_1     feat_2    value_1    value_2     value_3          date
23784    some_code          Three          A         30         60          60    2022-01-01
23794    some_code          Seven          B         60         40          20    2022-01-01
23774    some_cod1          Three          A         90         40          60    2022-01-02
22784    some_cod1          Three          C         30         10          60    2022-01-01
23564    some_cod2           Four          A         20         40          20    2022-01-05
20784    some_cod3           Five          A         10         70          40    2022-02-08

I need to perform a simple calculation on it, but since it updates quite often, I want to make sure that all the data is there first. For that, I have the following guide dataframe df_2. Its version column is always increasing and tells me when the newest update happened; I only care about the maximum version for a given scope and date.

my_id        scope         feat_1     feat_2                   date     version
23784    some_code          Three          A             2022-01-01         600
23794    some_code          Seven          B             2022-01-01         600   
23774    some_cod1          Three          A             2022-01-02         600       
22784    some_cod1          Three          C             2022-01-01         650
23564    some_cod2           Four          A             2022-01-05         650
20784    some_cod3           Five          A             2022-02-08         700
20744    some_cod2           Five          A             2022-01-05         700
20745    some_cod2           Four          C             2022-01-05         700

I want to look at df_2, group by scope and date to get the maximum version, and then check whether all the my_ids for that version are present in df_1.

What I did:

df_2 = df_2.groupBy(["date", "scope"])['version'].max()
df = df_1.join(df_2, on = ["my_id"], how = "leftanti")

But I get

TypeError: 'GroupedData' object is not subscriptable

Why is that and is my logic incorrect?

  • That syntax is from pandas. The pyspark equivalent would be df_2.groupBy(["date", "scope"]).agg(func.max('version').alias('max_version')), where func is the alias for pyspark.sql.functions. Commented Aug 12, 2022 at 7:43
  • @samkart And then do the leftanti join like I did in my original post? Commented Aug 12, 2022 at 7:45
  • The aggregated df_2 won't have the my_id field, as it was neither in the grouping fields nor in the aggregation fields, so your join won't work unless you have the key in both dataframes. I think you should calculate the max of version using a window function; that way, df_2 will keep my_id as well as the max version for a certain scope and date. Commented Aug 12, 2022 at 7:46
  • @samkart Ah right, but then how do I "get back" to it, as in: now I know the maximum versions of interest, I know which my_id are supposed to be present in df_1, how do I do the actual check? I was thinking of just rebuilding df_2 with only the versions of interest and then do the antijoin... Commented Aug 12, 2022 at 7:52
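
A minimal sketch of the groupBy-then-join-back approach discussed in these comments, assuming df_1 and df_2 as shown above (the names max_versions, df_2_latest and the alias max_version are just illustrative):

from pyspark.sql import functions as F

# Maximum version per (scope, date).
max_versions = df_2.groupBy('scope', 'date').agg(F.max('version').alias('max_version'))

# Rebuild df_2 keeping only the rows that belong to the maximum version,
# which brings my_id back into the picture.
df_2_latest = (df_2
    .join(max_versions, on=['scope', 'date'], how='inner')
    .filter(F.col('version') == F.col('max_version')))

# Rows of df_1 whose my_id does not appear among the latest-version rows of df_2.
df = df_1.join(df_2_latest, on=['my_id'], how='leftanti')

The answer below achieves the same with a window function, which avoids the extra join.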

1 Answer


TypeError: 'GroupedData' object is not subscriptable

You get this error because you do .groupBy(["date", "scope"])['version'].

.groupBy(["date", "scope"]) returns an object of type GroupedData. With this object you try to do ['version']. GroupedData object does not have a way to "access" elements using this syntax, i.e. no way to "subscribe" to any of its elements. This is why you get such error notice.

You could do what you want using window functions: e.g. calculate the max 'version' over every window having the same 'scope' and 'date', then filter on this max value and do the same "leftanti" join that you tried to do.

Inputs:

from pyspark.sql import functions as F, Window as W
df_1 = spark.createDataFrame(
    [(23784, 'some_code', 'Three', 'A', 30, 60, 60, '2022-01-01'),
     (23794, 'some_code', 'Seven', 'B', 60, 40, 20, '2022-01-01'),
     (23774, 'some_cod1', 'Three', 'A', 90, 40, 60, '2022-01-02'),
     (22784, 'some_cod1', 'Three', 'C', 30, 10, 60, '2022-01-01'),
     (23564, 'some_cod2', 'Four', 'A', 20, 40, 20, '2022-01-05'),
     (20784, 'some_cod3', 'Five', 'A', 10, 70, 40, '2022-02-08')],
    ['my_id', 'scope', 'feat_1', 'feat_2', 'value_1', 'value_2', 'value_3', 'date'])
# note: the 'version' values below differ slightly from the question's table
df_2 = spark.createDataFrame(
    [(23784, 'some_code', 'Three', 'A', '2022-01-01', 60),
     (23794, 'some_code', 'Seven', 'B', '2022-01-01', 600),
     (23774, 'some_cod1', 'Three', 'A', '2022-01-02', 600),
     (22784, 'some_cod1', 'Three', 'C', '2022-01-01', 65),
     (23564, 'some_cod2', 'Four', 'A', '2022-01-05', 65),
     (20784, 'some_cod3', 'Five', 'A', '2022-02-08', 70),
     (20744, 'some_cod2', 'Five', 'A', '2022-01-05', 70),
     (20745, 'some_cod2', 'Four', 'C', '2022-01-05', 70)],
    ['my_id', 'scope', 'feat_1', 'feat_2', 'date', 'version'])

Script:

# Window per (scope, date), ordered by descending version, so the first value
# in each window is the maximum version.
w = W.partitionBy('scope', 'date').orderBy(F.desc('version'))
# Keep only the rows of df_2 that belong to the maximum version.
df_2 = (df_2
    .withColumn('max_version', F.first('version').over(w))
    .filter('version = max_version')
)
# Rows of df_1 whose my_id is missing from the latest-version rows of df_2.
df = df_1.join(df_2, on=["my_id"], how="leftanti")

df.show()
# +-----+---------+------+------+-------+-------+-------+----------+
# |my_id|    scope|feat_1|feat_2|value_1|value_2|value_3|      date|
# +-----+---------+------+------+-------+-------+-------+----------+
# |23784|some_code| Three|     A|     30|     60|     60|2022-01-01|
# |23564|some_cod2|  Four|     A|     20|     40|     20|2022-01-05|
# +-----+---------+------+------+-------+-------+-------+----------+
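
As a design note, only the maximum is needed here, so F.max over an unordered window gives the same result as F.first with a descending orderBy. A small variant, assuming the same inputs (df_2_latest is just an illustrative name):

w = W.partitionBy('scope', 'date')
df_2_latest = (df_2
    .withColumn('max_version', F.max('version').over(w))
    .filter('version = max_version')
)
df = df_1.join(df_2_latest, on=["my_id"], how="leftanti")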