5

I have a DataFrame like the one below. I need to create new columns based on the existing columns.

col1 col2
a      1
a      2
b      1
c      1
d      1
d      2

The output DataFrame should look like this:

col1  col2 col3 col4
a      1   1      2
a      2   1      2
b      1   0      1
c      1   0      1
d      1   1      2
d      2   1      2

The logic I have used: col3 is 1 if the count of rows for that col1 value is greater than 1, otherwise 0, and col4 is the maximum value of col2 within that col1 group.

I am familiar with how to do this in SQL, but I'm struggling to find a solution with the DataFrame DSL. Any help would be appreciated. Thanks.
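For reference, the SQL I have in mind is roughly the following. This is only a sketch, assuming Spark 2.x, a SparkSession named spark, and the DataFrame registered as a temp view named t:

df.createOrReplaceTempView("t")

val expected = spark.sql("""
  SELECT col1, col2,
         CASE WHEN COUNT(*) OVER (PARTITION BY col1) > 1 THEN 1 ELSE 0 END AS col3,
         MAX(col2) OVER (PARTITION BY col1) AS col4
  FROM t
""")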

4 Answers

6

Group by col1 and aggregate to get the count and the max. Then join the result back to the original DataFrame to get your desired output:

import org.apache.spark.sql.functions.{count, max}

val df2 = df1.groupBy("col1").agg(count("col2") as "col3", max("col2") as "col4")

val df3 = df1.join(df2, "col1")
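
If col3 needs to be the 0/1 flag from the question rather than the raw count, here is a sketch of the same idea; the intermediate column name cnt is just illustrative:

import org.apache.spark.sql.functions.{count, max, when, col}

// Aggregate per col1, derive the 0/1 flag from the group count, then join back.
val agg = df1.groupBy("col1")
  .agg(count("col2").as("cnt"), max("col2").as("col4"))
  .withColumn("col3", when(col("cnt") > 1, 1).otherwise(0))
  .drop("cnt")

val result = df1.join(agg, "col1")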

2 Comments

+1 for the join-and-group concept. Just for clarification, col3 is not the sum of col2; it is based on the count of col2: if the count is greater than 1 it should be 1, otherwise 0. Is there any way to do this without a join? When I use a join on huge data I run into memory errors. Thanks
Yes, I would also like to know the solution without a join.
2

A Spark DataFrame has a method called withColumn. You can add as many derived columns as you want, but the column is not added to the existing DataFrame; instead, withColumn returns a new DataFrame with the added column.

For example, adding a static date to the data using a UDF:

import org.apache.spark.sql.functions.udf

// Define the UDF first, then apply it with withColumn (which returns a new DataFrame).
val addBatchDate = udf { (batchDate: String) => "20160101" }
val myFormattedData = myData.withColumn("batchdate", addBatchDate(myData("batchdate")))
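
For a static value like this a UDF isn't strictly necessary; a minimal sketch using lit instead (same batchdate column as above):

import org.apache.spark.sql.functions.lit

// lit wraps a constant in a Column, so no UDF is needed for a fixed value.
val myFormattedData2 = myData.withColumn("batchdate", lit("20160101"))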


2

To add col3 you can use withColumn + when/otherwise:

val df2 = df.withColumn("col3", when($"col2" > 1, 1).otherwise(0))

To add col4, the groupBy/max + join already mentioned should do the job:

val df3 = df2.join(df.groupBy("col1").max("col2"), "col1")
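
Note that groupBy("col1").max("col2") names the aggregated column "max(col2)". If you want it called col4, here is a sketch using agg with an alias, reusing df2 from above:

import org.apache.spark.sql.functions.max

// agg(max(...).as(...)) lets you name the aggregate directly instead of getting "max(col2)".
val maxPerGroup = df.groupBy("col1").agg(max("col2").as("col4"))
val df3 = df2.join(maxPerGroup, "col1")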


2

To achieve this without a join, you need to use count and max as window functions. This requires creating a window using Window and telling count and max to operate over this window.

from pyspark.sql import Window, functions as fn

df = sc.parallelize([
    {'col1': 'a', 'col2': 1},
    {'col1': 'a', 'col2': 2},
    {'col1': 'b', 'col2': 1},
    {'col1': 'c', 'col2': 1},
    {'col1': 'd', 'col2': 1},
    {'col1': 'd', 'col2': 2}
]).toDF()

col1_window = Window.partitionBy('col1')
df = df.withColumn('col3', fn.when(fn.count('col1').over(col1_window) > 1, 1).otherwise(0))
df = df.withColumn('col4', fn.max('col2').over(col1_window))
df.orderBy(['col1', 'col2']).show()
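
A sketch of the same window approach in the Scala DSL, assuming the question's DataFrame is called df:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, max, when, col}

// Partition by col1 so count and max are computed per group, with no join.
val w = Window.partitionBy("col1")

val result = df
  .withColumn("col3", when(count("col1").over(w) > 1, 1).otherwise(0))
  .withColumn("col4", max(col("col2")).over(w))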
