
There are variations of this on the net, but not quite what I am looking for. I have a dataframe like so:

+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL  |New_LL   |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160   |0           |1              |0               |26.1184 |23.2954  |
|790026|13509  |0           |0              |1               |Infinity|-Infinity|
|790026|9162   |0           |0              |0               |25.03535|23.48585 |
|790026|13510  |0           |0              |1               |Infinity|-Infinity|
|790048|9162   |0           |0              |0               |33.5    |30.5     |
|790048|13509  |0           |0              |1               |Infinity|-Infinity|
|790048|13510  |0           |0              |0               |NaN     |NaN      |
|790048|9160   |0           |1              |0               |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+

For each SEQ_ID, I want to replace the New_UL and New_LL values in the rows where use_golden_limit is 1 with the values from the row where is_golden_limit is 1. So, in this case, the expected result would be:

 +------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|New_UL  |New_LL   |
+------+-------+------------+---------------+----------------+--------+---------+
|790026|9160   |0           |1              |0               |26.1184 |23.2954  |
|790026|13509  |0           |0              |1               |26.1184 |23.2954  |
|790026|9162   |0           |0              |0               |25.03535|23.48585 |
|790026|13510  |0           |0              |1               |26.1184 |23.2954  |
|790048|9162   |0           |0              |0               |33.5    |30.5     |
|790048|13509  |0           |0              |1               |33.94075|30.75925 |
|790048|13510  |0           |0              |0               |NaN     |NaN      |
|790048|9160   |0           |1              |0               |33.94075|30.75925 |
+------+-------+------------+---------------+----------------+--------+---------+

Is this possible?

  • Is it expected to have more than one row with is_golden_limit set to 1? Commented Feb 5, 2020 at 1:07
  • @Mitodina, ideally is_golden_limit = 1 should not have more than one row. I have code to identify such cases and take care of them separately (a minimal sketch of one way to flag them follows these comments). However, that's a good question: in case there is more than one row where it is 1, does it take the first value? Commented Feb 5, 2020 at 15:56
  • @thentangler do check my solution please Commented Feb 5, 2020 at 23:42
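A minimal sketch of one way such duplicate golden rows could be flagged (this is not the asker's actual code; it assumes the dataframe is named df, as in the answer below):

import pyspark.sql.functions as F

# Hypothetical check: list SEQ_IDs that have more than one is_golden_limit == 1 row
duplicate_golden = (df.filter(F.col("is_golden_limit") == 1)
                      .groupBy("SEQ_ID")
                      .agg(F.count("*").alias("golden_rows"))
                      .filter(F.col("golden_rows") > 1))
duplicate_golden.show()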

1 Answer


As requested, this takes only the first is_golden_limit = 1 row for each SEQ_ID.

Creating your dataframe

from pyspark.sql.window import Window
import pyspark.sql.functions as F
import numpy as np

# Sample data matching the question's dataframe
data = [[790026, 9160, 0, 1, 0, 26.1184, 23.2954],
        [790026, 13509, 0, 0, 1, np.inf, -np.inf],
        [790026, 9162, 0, 0, 0, 25.03535, 23.48585],
        [790026, 13510, 0, 0, 1, np.inf, -np.inf],
        [790048, 9162, 0, 0, 0, 33.5, 30.5],
        [790048, 13509, 0, 0, 1, np.inf, -np.inf],
        [790048, 13510, 0, 0, 0, np.nan, np.nan],
        [790048, 9160, 0, 1, 0, 33.94075, 30.75925]]

df = spark.createDataFrame(data, ['SEQ_ID', 'TOOL_ID', 'isfleetlevel', 'is_golden_limit',
                                  'use_golden_limit', 'New_UL', 'New_LL'])

+------+-------+------------+---------------+----------------+--------+---------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|  New_UL|   New_LL|
+------+-------+------------+---------------+----------------+--------+---------+
|790026|   9160|           0|              1|               0| 26.1184|  23.2954|
|790026|  13509|           0|              0|               1|Infinity|-Infinity|
|790026|   9162|           0|              0|               0|25.03535| 23.48585|
|790026|  13510|           0|              0|               1|Infinity|-Infinity|
|790048|   9162|           0|              0|               0|    33.5|     30.5|
|790048|  13509|           0|              0|               1|Infinity|-Infinity|
|790048|  13510|           0|              0|               0|     NaN|      NaN|
|790048|   9160|           0|              1|               0|33.94075| 30.75925|
+------+-------+------------+---------------+----------------+--------+---------+

Selecting the new dataframe to be used for the self-join

Only the first is_golden_limit = 1 row is kept for each SEQ_ID, and the column is aliased to use_golden_limit so it can act as a join key.

w = Window.partitionBy("SEQ_ID").orderBy("SEQ_ID")
df1 = (df.filter(F.col("is_golden_limit") == 1)
         .select(F.col("is_golden_limit").alias("use_golden_limit"), F.col("New_UL").alias("New_UL1"),
                 F.col("New_LL").alias("New_LL1"), "SEQ_ID")
         .withColumn("row_num", F.row_number().over(w))  # keep only the first golden row per SEQ_ID
         .filter(F.col("row_num") == 1).drop("row_num"))

+----------------+--------+--------+------+
|use_golden_limit| New_UL1| New_LL1|SEQ_ID|
+----------------+--------+--------+------+
|               1| 26.1184| 23.2954|790026|
|               1|33.94075|30.75925|790048|
+----------------+--------+--------+------+

Joining and creating the new columns with the condition

df1 will naturally be a much smaller dataframe, so it is good practice to use a broadcast join: the small dataframe is shipped to every executor, which gives better co-location and avoids shuffling the large table for the join.

df2 = df.join(df1.hint("broadcast"), on=['use_golden_limit', 'SEQ_ID'], how='left')
df3 = df2.withColumn("New_UL_Final", F.when(F.col("use_golden_limit") == 1, F.col("New_UL1")).otherwise(F.col("New_UL")))\
         .withColumn("New_LL_Final", F.when(F.col("use_golden_limit") == 1, F.col("New_LL1")).otherwise(F.col("New_LL")))\
         .orderBy("SEQ_ID").drop("New_UL", "New_LL", "New_LL1", "New_UL1")

Selecting the final dataframe and calling .show()

df4=df3.select("SEQ_ID","TOOL_ID","isfleetlevel","is_golden_limit","use_golden_limit",F.col("New_UL_Final").alias("New_UL"),
          F.col("New_LL_Final").alias("New_LL"))
df4.show()

Final dataframe:

+------+-------+------------+---------------+----------------+--------+--------+
|SEQ_ID|TOOL_ID|isfleetlevel|is_golden_limit|use_golden_limit|  New_UL|  New_LL|
+------+-------+------------+---------------+----------------+--------+--------+
|790026|  13510|           0|              0|               1| 26.1184| 23.2954|
|790026|   9162|           0|              0|               0|25.03535|23.48585|
|790026|  13509|           0|              0|               1| 26.1184| 23.2954|
|790026|   9160|           0|              1|               0| 26.1184| 23.2954|
|790048|  13509|           0|              0|               1|33.94075|30.75925|
|790048|   9160|           0|              1|               0|33.94075|30.75925|
|790048|   9162|           0|              0|               0|    33.5|    30.5|
|790048|  13510|           0|              0|               0|     NaN|     NaN|
+------+-------+------------+---------------+----------------+--------+--------+

4 Comments

Thank you! Using is_golden_limit as use_golden_limit was a nice, succinct approach to the solution. I have a concern: joins are expensive in the cloud, and the broadcast documentation mentions that the maximum table size for the join is 10 MB. Would this approach work for dataframes of 1B rows or more?
You are right, the default is 10 MB. However, recent Spark versions have increased the maximum size of a broadcast table from 2 GB to 8 GB (if you are using the latest version). You can set spark.sql.autoBroadcastJoinThreshold to up to 8 GB.
Source code for the 8 GB broadcast limit: github.com/apache/spark/blob/… Link for optimizing joins: medium.com/datakaresolutions/…
In your use case, even if you have over a billion rows, df1 (the smaller dataframe containing only the is_golden_limit == 1 rows, at most one per SEQ_ID) should by no means exceed 8 GB, also because it only has four columns. The broadcast join with the big table will be highly optimized.
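A minimal sketch of the two knobs discussed above, assuming a live SparkSession named spark and the df/df1 from the answer (the 100 MB value is just an illustrative assumption, not a recommendation):

# Raise the auto-broadcast threshold at runtime (value in bytes); 100 MB is an arbitrary example
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)

# Or force the broadcast for this particular join, independent of the threshold
from pyspark.sql.functions import broadcast
df2 = df.join(broadcast(df1), on=['use_golden_limit', 'SEQ_ID'], how='left')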
