
I have the following Spark DataFrame:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('').getOrCreate()
df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])

which returns me:

+---+-------+------+
| nr|column2|quant |
+---+-------+------+
|  1|      a|     2|
|  2|      b|     2|
|  3|      c|     2|
|  4|      d|     2|
|  5|      b|     3|
|  6|      b|     3|
|  7|      c|     2|
+---+-------+------+

I would like to retrieve, for each group of 3 consecutive rows (each window of size 3), only the rows where the quant column value is unique within that window, as in the following picture:


In the picture, red marks the window boundaries, and within each window I keep only the green rows where quant is unique:

The output that I would like to get is the following:

+---+-------+------+
| nr|column2|quant |
+---+-------+------+
|  1|      a|     2|
|  4|      d|     2|
|  5|      b|     3|
|  7|      c|     2|
+---+-------+------+

I am new to Spark, so I would appreciate any help. Thanks!

  • How do you create your red group? What is your grouping condition? Commented Apr 3, 2019 at 9:34
  • Just the first 3 rows are one group, and the next 3 rows are the next group. Commented Apr 3, 2019 at 11:13
  • First, ordered by what? Commented Apr 3, 2019 at 13:08
  • From the top; the dataset is already sorted. The first 3 rows are one group, then the next 3 rows are the next group. It should roll. Commented Apr 3, 2019 at 14:04
  • Nothing is already sorted on a distributed file system or in a database. It is like a bag of marbles: you can sort them out of the bag, but not inside. Commented Apr 3, 2019 at 14:20

1 Answer


This approach should work for you, assuming the groups of 3 records are based on the 'nr' column.

A udf decides whether a record should be selected or not, and lag is used to get the previous rows' data.

from pyspark.sql.functions import udf, lag, col
from pyspark.sql.types import BooleanType
from pyspark.sql.window import Window

def tag_selected(index, current_quant, prev_quant1, prev_quant2):
    if index % 3 == 1:  # first record in each group is always selected
        return True
    if index % 3 == 2 and current_quant != prev_quant1:  # second record is selected if its quant differs from the first
        return True
    if index % 3 == 0 and current_quant != prev_quant1 and current_quant != prev_quant2:  # third record is selected if its quant differs from both previous records
        return True
    return False

tag_selected_udf = udf(tag_selected, BooleanType())

df = spark.createDataFrame([(1, "a", "2"), (2, "b", "2"),(3, "c", "2"), (4, "d", "2"),
                (5, "b", "3"), (6, "b", "3"),(7, "c", "2")], ["nr", "column2", "quant"])

window = Window.orderBy("nr")

df = df.withColumn("prev_quant1", lag(col("quant"),1, None).over(window))\
       .withColumn("prev_quant2", lag(col("quant"),2, None).over(window)) \
       .withColumn("selected", 
                   tag_selected_udf(col('nr'),col('quant'),col('prev_quant1'),col('prev_quant2')))\
       .filter(col('selected') == True).drop("prev_quant1","prev_quant2","selected")
df.show()

which results in:

+---+-------+-----+
| nr|column2|quant|
+---+-------+-----+
|  1|      a|    2|
|  4|      d|    2|
|  5|      b|    3|
|  7|      c|    2|
+---+-------+-----+

5 Comments

@Sascha, If it is working for you, can you please accept this as answer, Thanks.
First of all, thanks for helping. I have a question regarding this code: in df.withColumn("prev_quant1", lag(col("quant"),1, None).over(window)), do we create a new column every time? Does that mean it will take too much memory if the data is bigger? My second question is about def tag_selected: does it work like a rolling window rather than taking the group of 3 rows (1,2,3) and the next 3 rows (4,5,6) as different groups? When I run this I got an output with 1,4,5,6,7. And is it possible to define the window size as 3?
Transformations on RDDs/DataFrames/Datasets create new RDDs, as RDDs are immutable. So adding a new column creates new RDD instances, but Spark's memory management decides whether the old RDDs need to be retained or cleared. tag_selected takes the current and previous quants like a rolling window, but the logic inside ignores the other window elements. For example, for the 4th row the udf is called with the 2nd and 3rd rows' data, but it ignores them because, based on the index, it knows this is the first row of its group and always needs to be selected. The code shouldn't select 6, because the condition in the udf at if index % 3 == 0 ... fails and returns False.
Thanks for the explanations. I would like to clarify one more thing: what if the Spark DataFrame is already sorted by nr, but nr doesn't start from 1? For example, the order of nr is like the following: "15.3, 22.8, 37.1, 39, 56, etc." In this case is it better to add a new index column and follow that logic, or is there another option regarding nr?
Yes, if the nr values are not index numbers, then you need to add a new index column with consecutive index numbers.
