
I have a source PySpark dataframe with the structure below:

A    B    C  D  E      F   G
145  589  1  1         12  25
145  589  1  2  1ad34
145  589  1  3  257    18  55
145  589  2  1         12  25
145  589  2  2         22  45
145  589  2  3
145  589  3  1         32  55
145  589  3  2

Table Overview:

  1. Each combination of columns A and B has an indexed column C, and each C index has a column D. The concatenation A|B|C|D identifies a unique record.
  2. For the complete dataframe above, check whether column E is set on any record while traversing in C, D order. If yes, return the first numeric value (e.g. 257 should be returned and 1ad34 should be ignored). This is the priority 1 operation.
  3. If column E is never set, return the concatenation of F and G for the last row where they are populated. Had 257 never been set in column E, the result would be 3255, based on 145|589|3|1.

Test Case 1: Priority column E contains a few values. The first numeric one is 257, so the output should be 257 for 145|589.

Test Case 2: Priority column E is completely empty, so pick the last concatenated value of columns F and G, which is 3255 for 145|589.
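For reproducibility, the sample above can be created as follows (a minimal sketch, assuming the blanks in the listing are nulls and that every column except C and D holds strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ("145", "589", 1, 1, None,    "12", "25"),
    ("145", "589", 1, 2, "1ad34", None, None),
    ("145", "589", 1, 3, "257",   "18", "55"),
    ("145", "589", 2, 1, None,    "12", "25"),
    ("145", "589", 2, 2, None,    "22", "45"),
    ("145", "589", 2, 3, None,    None, None),
    ("145", "589", 3, 1, None,    "32", "55"),
    ("145", "589", 3, 2, None,    None, None),
]
df = spark.createDataFrame(data, ["A", "B", "C", "D", "E", "F", "G"])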

I have implemented the following PySpark code for this:

from pyspark.sql.functions import col

def get_resulting_id(grouped_A_B_df):
    try:
        out = ''
        # Priority 1: first row whose E value casts cleanly to an integer
        first_E_row = grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
        if first_E_row:
            return first_E_row["E"]
        # Priority 2: walk each C group locally and keep the last F+G concatenation
        unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
        for uniq in unique_C:
            for row in grouped_A_B_df.filter(col("C") == uniq).rdd.toLocalIterator():
                out = str(row['F']) + str(row['G'])
    except Exception:
        raise Exception("Func failed")
    return out

Since the source dataframe has 20 million records, I do not want to use toLocalIterator for the priority 2 condition. Is there any way to speed up the operation? Partitioning the source dataframe by the combination of columns A and B gives me the subset dataframes. I want my custom function to be applied to each subset dataframe and to return one result per subset, along the lines of the sketch below.
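Something like the following is what I am aiming for (a rough sketch only, assuming Spark 3.x where groupBy().applyInPandas is available, that A and B are stored as strings, and that resolve_group is a hypothetical helper reproducing the two priorities):

import pandas as pd

def resolve_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Receives one pandas DataFrame per A|B group; traverse it in C, D order.
    pdf = pdf.sort_values(["C", "D"])
    # Priority 1: first E value that is purely numeric
    numeric_e = [v for v in pdf["E"] if v is not None and str(v).isdigit()]
    if numeric_e:
        out = numeric_e[0]
    else:
        # Priority 2: F+G of the last row where both are populated
        valid = pdf.dropna(subset=["F", "G"])
        out = (str(valid["F"].iloc[-1]) + str(valid["G"].iloc[-1])) if len(valid) else ""
    return pd.DataFrame({"A": [pdf["A"].iloc[0]], "B": [pdf["B"].iloc[0]], "out": [out]})

result = (
    df.groupBy("A", "B")
      .applyInPandas(resolve_group, schema="A string, B string, out string")
)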

  • What is the expected output? A new column in the current dataframe, or a local Python object, i.e. a pandas df or list? Commented Jul 19, 2021 at 8:37
  • Pandas df will be fine. Commented Jul 19, 2021 at 8:46
  • If you could give a better sample input, with both the E case and the F+G case, that'd be great! (We really do not need that many lines, just the relevant ones.) - How to create a Minimal, Reproducible Example Commented Jul 19, 2021 at 9:03
  • Thanks Steven! It helps. I was wondering how to get the first numeric value from the E column with the coalesce and partitioning. Let's say 145|589|1|2 has the value 123abc; we need to ignore that and get the first numeric value. first(col("E").cast(DecimalType()).isNull(), ignorenulls=True) might not be right here because it makes the first expression a boolean and the second expression a string, am I correct? Any way out for that? Commented Jul 19, 2021 at 13:17
  • If you only want a numeric value, you just have to cast to int and cast it back to string: replace F.first("E", ignorenulls=True) with F.first(F.col("E").cast("int").cast("string"), ignorenulls=True). But again, it is not in your sample data, so please update your example with a Minimal, Reproducible Example. Commented Jul 19, 2021 at 13:28

1 Answer


I am not sure I understand exactly what your expected output is, based on the sample input data you gave. I tried your function and the output is '257', so here is my full PySpark code that should produce the same output:

from pyspark.sql import functions as F, Window as W

df.select(
    "A",
    "B",
    F.coalesce(
        F.first("E", ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
        F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
    ).alias("out"),
).distinct().show()

+---+---+---+                                                                   
|  A|  B|out|
+---+---+---+
|145|589|257|
+---+---+---+

You can replace .show() with .toPandas() if you need a pandas df as output.
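As discussed in the comments under the question, if E can also contain non-numeric values such as 1ad34 that must be skipped, a variant of the same query (a sketch only, following the cast suggested in the comments) casts E to int and back to string inside first(), so non-numeric values become null and are ignored:

from pyspark.sql import functions as F, Window as W

w = (
    W.partitionBy("A", "B")
    .orderBy("C", "D")
    .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
)

df.select(
    "A",
    "B",
    F.coalesce(
        # non-numeric E values (e.g. 1ad34) become null after the int cast and are skipped
        F.first(F.col("E").cast("int").cast("string"), ignorenulls=True).over(w),
        F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(w),
    ).alias("out"),
).distinct().show()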
