
I am new to PySpark and my objective is to use a PySpark script in AWS Glue to:

  1. read a DataFrame from the input file in Glue => done
  2. change columns of some rows which satisfy a condition => facing issue
  3. write the updated DataFrame with the same schema to S3 => done

The task seems very simple, but I could not find a way to complete it and keep running into different issues as I change the code.

So far, my code looks like this:

Transform2.printSchema() # input schema after reading 
Transform2 = Transform2.toDF()
def updateRow(row):
    # my logic to update row based on a global condition 
    #if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
    return row

LocalTransform3 = [] # creating new dataframe from Transform2 
for row in Transform2.rdd.collect():
    row = row.asDict()
    row = updateRow(row)
    LocalTransform3.append(row)
print(len(LocalTransform3))

columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)

Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count()) 

I tried multiple approaches, such as:

  • using map with a lambda to update existing rows
  • using collect()
  • using createDataFrame() to create a new DataFrame

But I ran into errors at the steps below:

  • not being able to create a new updated RDD
  • not being able to create a new DataFrame from the RDD using the existing columns

Some of the errors I got in Glue, at different stages:

  • ValueError: Some of types cannot be determined after inferring
  • ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
  • An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. Traceback (most recent call last):

Any working code snippet or help is appreciated.
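
For context on the first two errors: spark.createDataFrame infers column types by sampling rows, and a column that is None in every sampled row cannot be inferred. A minimal sketch of sidestepping this by passing the already-known schema explicitly (assuming Transform2 is a DataFrame at this point and updateRow keeps the same columns and types):

# Collect rows to the driver (only viable for small data), apply the update,
# and rebuild the DataFrame with the original schema so nothing has to be inferred.
updated_rows = [updateRow(row.asDict()) for row in Transform2.collect()]
Transform3 = spark.createDataFrame(updated_rows, schema=Transform2.schema)

Collecting to the driver does not scale, though; the answer below avoids it entirely.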

  • What is the transformation you want to do on your rows? Commented Jan 27, 2022 at 16:51
  • if row["primaryKey"]=="knownKey": row["otherAttribute"]= None Commented Jan 27, 2022 at 17:01
  • And which column should hold "knownKey" / None? Commented Jan 27, 2022 at 17:02
  • my logic is: for some input primary keys, set the other column to None on the rows with those primary keys Commented Jan 27, 2022 at 17:04
  • So everything should be None if a row has a knownKey in the Primary Key column? Commented Jan 27, 2022 at 17:20

1 Answer

from pyspark.sql.functions import col, lit, when

Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn(
    'otherAttribute',
    when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute'))
)

This should work for your use-case.
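
For completeness, a sketch of how this fits into the surrounding Glue flow: convert the DynamicFrame to a DataFrame, apply the conditional update, convert back to a DynamicFrame, and write to S3. The output path and format below are placeholders, not values from the question:

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, lit, when

# Transform2 starts as a Glue DynamicFrame; work on it as a Spark DataFrame
df = Transform2.toDF()

# Null out otherAttribute only on rows whose primaryKey matches the known key
df = df.withColumn(
    'otherAttribute',
    when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute'))
)

# Convert back to a DynamicFrame for the Glue sink
Transform4 = DynamicFrame.fromDF(df, glueContext, "Transform4")

# Placeholder sink: adjust the path and format to the job's actual target
glueContext.write_dynamic_frame.from_options(
    frame=Transform4,
    connection_type="s3",
    connection_options={"path": "s3://my-bucket/output/"},
    format="parquet"
)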


3 Comments

Hi Robert, one of my use cases is that 'otherAttribute' is a field inside an array attribute 'arrayAttribute'. Now I want to set 'otherAttribute' to None for all items of 'arrayAttribute'. Is there a way to do that along the lines you have suggested?
This is a different question. If this helped you, please upvote & accept, and open another question.
Sure, your answer gave me a way to tackle it.
