
I am new to PySpark and Spark in general. I would like to apply a transformation to a given column in the DataFrame, essentially calling a function for each value in that specific column.

I have my DataFrame df that looks like this:

df.show()

+------------+--------------------+
|version     |         body       |
+------------+--------------------+
|           1|9gIAAAASAQAEAAAAA...|
|           2|2gIAAAASAQAEAAAAA...|
|           3|3gIAAAASAQAEAAAAA...|
|           1|7gIAKAASAQAEAAAAA...|
+------------+--------------------+

I need to read the value of the body column for each row where the version is 1, then decrypt it (I have my own logic/function that takes a string and returns a decrypted string), and finally write the decrypted values in CSV format to an S3 bucket.

def decrypt(encrypted_string: str) -> str:
    # my own logic that returns the decrypted string
    ...

So, when I do the following, I get the corresponding filtered values to which I need to apply my decrypt function.

df.where(col('version') == '1')\
     .select(col('body')).show()

+--------------------+
|                body|
+--------------------+
|9gIAAAASAQAEAAAAA...|
|7gIAKAASAQAEAAAAA...|
+--------------------+

However, I am not clear on how to do that. I tried using collect(), but that defeats the purpose of using Spark.

I also tried using .rdd.map as follows, but that did not work.

df.where(col('version') == '1')\
     .select(col('body'))\
     .rdd.map(lambda x: decrypt).toDF().show()

or

     .rdd.map(decrypt).toDF().show()

Could someone please help with this?

2 Answers


Please try:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# wrap the plain Python function as a Spark UDF returning a string
decrypt_udf = udf(decrypt, StringType())
df.where(col('version') == '1').withColumn('body', decrypt_udf('body'))
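For completeness, the question also wants the decrypted values written as CSV to an S3 bucket. Building on the snippet above, a minimal sketch (the s3a:// path is a placeholder, and it assumes the cluster already has S3 credentials and the Hadoop s3a connector configured):

decrypted_df = df.where(col('version') == '1') \
    .withColumn('body', decrypt_udf('body')) \
    .select('body')

# writes one or more CSV part files under the given S3 prefix
decrypted_df.write.mode('overwrite').csv('s3a://my-bucket/decrypted/', header=True)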

Comments

Thanks for your answer, this is exactly what I tried based on the post I mentioned.

Got a clue from this post: Pyspark DataFrame UDF on Text Column. Looks like I can simply get it with the following. I was doing it without a udf earlier, which is why it wasn't working.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

dummy_function_udf = udf(decrypt, StringType())

df.where(col('version') == '1')\
   .select(col('body')) \
   .withColumn('decryptedBody', dummy_function_udf('body')) \
   .show()
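As a side note, on Spark 3.x with pyarrow installed, a vectorized pandas_udf can reduce serialization overhead for this kind of per-value transformation. A minimal sketch, assuming decrypt still takes and returns a single string:

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def decrypt_pandas_udf(body: pd.Series) -> pd.Series:
    # apply the row-wise decrypt to every value in the Arrow batch
    return body.apply(decrypt)

df.where(col('version') == '1') \
  .withColumn('decryptedBody', decrypt_pandas_udf(col('body'))) \
  .show()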

