I have a source PySpark dataframe with the below structure:
| A | B | C | D | E | F | G |
|---|---|---|---|---|---|---|
| 145 | 589 | 1 | 1 |  | 12 | 25 |
| 145 | 589 | 1 | 2 | 1ad34 |  |  |
| 145 | 589 | 1 | 3 | 257 | 18 | 55 |
| 145 | 589 | 2 | 1 |  | 12 | 25 |
| 145 | 589 | 2 | 2 |  | 22 | 45 |
| 145 | 589 | 2 | 3 |  |  |  |
| 145 | 589 | 3 | 1 |  | 32 | 55 |
| 145 | 589 | 3 | 2 |  |  |  |
Table Overview:
- Each combination of columns A and B has an indexed column C, and each C index in turn has a column D. The concatenation A|B|C|D identifies a unique record.
- For the complete dataframe above, check whether column E is set at any point while traversing the records. If yes, return the first numeric value (e.g. 257 should be returned and 1ad34 should be ignored). This is the priority 1 operation.
- If column E is never set, return the concatenation of F and G for the very last row combination where they are populated. If 257 had never been set in column E, the result would be 3255, based on 145|589|3|1.
Test Case 1: the priority column E contains a few values, and the very first numeric one is 257, so the output for 145|589 should be 257.
Test Case 2: the priority column E is completely empty, so pick the last concatenated value of columns F and G, which results in 3255 for 145|589.
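For reference, here is a minimal snippet that rebuilds the sample data above (I am assuming string types for all columns and using None for the blank cells; `source_df` is just the name I use for it below):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data matching the table above; blank cells are represented as None.
source_df = spark.createDataFrame(
    [
        ("145", "589", "1", "1", None,    "12", "25"),
        ("145", "589", "1", "2", "1ad34", None, None),
        ("145", "589", "1", "3", "257",   "18", "55"),
        ("145", "589", "2", "1", None,    "12", "25"),
        ("145", "589", "2", "2", None,    "22", "45"),
        ("145", "589", "2", "3", None,    None, None),
        ("145", "589", "3", "1", None,    "32", "55"),
        ("145", "589", "3", "2", None,    None, None),
    ],
    ["A", "B", "C", "D", "E", "F", "G"],
)
```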
I have implemented this in PySpark as below:
    from pyspark.sql.functions import col

    def get_resulting_id(grouped_A_B_df):
        try:
            out = ''
            # Priority 1: first row whose E value casts cleanly to an integer.
            first_E_val = grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
            if first_E_val:
                return first_E_val["E"]
            # Priority 2: walk every C group and keep the last F + G concatenation.
            unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
            for uniq in unique_C:
                subset_df = grouped_A_B_df.filter(col("C") == uniq)
                for row in subset_df.rdd.toLocalIterator():
                    out = str(row['F']) + str(row['G'])
        except Exception as e:
            raise Exception("Func failed") from e
        return out
Since the source dataframe has around 20 million records, I do not want to use toLocalIterator in the priority 2 branch. Is there any possible way to speed up this operation? Partitioning the source dataframe by the combination of columns A and B gives the subset dataframes; I want my custom function to be applied to each such subset and to return one result per subset.
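To illustrate what I mean by applying the function per subset, here is a rough, untested sketch using groupBy(...).applyInPandas (Spark 3.x). `resolve_group` is a hypothetical helper that mirrors the logic of get_resulting_id, and it assumes the string-typed `source_df` from the snippet above:

```python
import pandas as pd

def resolve_group(pdf: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-A|B-group version of get_resulting_id."""
    pdf = pdf.sort_values(["C", "D"])  # traversal order; assumes C/D sort correctly as strings
    # Priority 1: first E value that is purely numeric (e.g. 257, not 1ad34).
    is_numeric = pdf["E"].str.isnumeric().fillna(False).astype(bool)
    if is_numeric.any():
        result = pdf.loc[is_numeric, "E"].iloc[0]
    else:
        # Priority 2: F + G concatenation of the last row where both are populated.
        populated = pdf.dropna(subset=["F", "G"])
        result = populated["F"].iloc[-1] + populated["G"].iloc[-1] if not populated.empty else ""
    return pd.DataFrame({"A": [pdf["A"].iloc[0]], "B": [pdf["B"].iloc[0]], "result": [result]})

result_df = (
    source_df
    .groupBy("A", "B")
    .applyInPandas(resolve_group, schema="A string, B string, result string")
)
```

I am not sure this is the fastest route at 20 million rows, so a pure dataframe/aggregation alternative would also work for me.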
F.first("E", ignorenulls=True)withF.first(F.col("E").cast("int").cast("str"), ignorenulls=True). But again, it is not in your sample data, so please update your example with Minimal, Reproductible example.