
I want to perform a regexp_replace operation on a PySpark dataframe column using a dictionary.

Dictionary: {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE', ...} The dictionary will have around 270 key-value pairs.

Input Dataframe:

ID | Address
1  | 22, COLLINS RD
2  | 11, HEMINGWAY DR
3  | AVIATOR BUILDING
4  | 33, PARK AVE MULLOHAND DR

Desired Output Dataframe:

ID | Address                    | Address_Clean
1  | 22, COLLINS RD             | 22, COLLINS ROAD
2  | 11, HEMINGWAY DR           | 11, HEMINGWAY DRIVE
3  | AVIATOR BUILDING           | AVIATOR BUILDING
4  | 33, PARK AVE MULLOHAND DR  | 33, PARK AVENUE MULLOHAND DRIVE

I cannot find any documentation on this on the internet. Passing the dictionary directly, as in the code below:

data = data.withColumn('Address_Clean', regexp_replace('Address', dict))

throws the error "regexp_replace takes 3 arguments, 2 given".

The dataset will be around 20 million rows, so a UDF solution will be slow (it is a row-wise operation), and we don't have access to Spark 2.3.0, which supports pandas_udf. Is there any efficient method of doing this, other than perhaps using a loop?

  • Try the solution here. Commented May 8, 2018 at 15:23
  • Thanks. But if I understand it correctly, this solution is basically using a loop to replace each key-value pair. That causes a scaling problem, as mentioned in the bottom comment of the link. Is there no other approach that doesn't require looping, as it will slow down the process for a dataset of 20 million rows? Commented May 8, 2018 at 16:00
  • If you read the explanation in the linked solution you will see that Spark is not actually looping (see the sketch after this list). Also, in the other user's case the scaling issue came because he was trying to do 10,000 replacements: the trouble is the number of replacements, not the number of rows. Your 270 replacements may work, but if not you can try to cache the DataFrame every N (say 10) replacements. The only other approach may be to partition/filter your DataFrame and only apply a subset of the replacements (i.e. filter for contains "RD", etc.). Commented May 8, 2018 at 16:24
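For context, here is a minimal sketch of the chained-replacement approach the last comment describes. The \b word boundaries are an assumption I've added so that, e.g., DR does not match inside MULLOHAND; column names are from the question.

from functools import reduce
import pyspark.sql.functions as F

# Only three of the ~270 pairs shown here.
suffixes = {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE'}

# Each withColumn call only adds a node to the query plan; Spark collapses
# the whole chain into a single projection, so this is not a row-wise loop.
data = reduce(
    lambda df, kv: df.withColumn(
        'Address_Clean',
        F.regexp_replace('Address_Clean', r'\b{}\b'.format(kv[0]), kv[1])),
    suffixes.items(),
    data.withColumn('Address_Clean', F.col('Address')))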

1 Answer


It is throwing this error because regexp_replace() needs three arguments:

regexp_replace('column_to_change','pattern_to_be_changed','new_pattern')
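For reference, a correct single replacement looks like this (column names are from the question; the \b word boundaries are an assumption so that RD only matches a whole token):

import pyspark.sql.functions as sf

data = data.withColumn('Address_Clean', sf.regexp_replace('Address', r'\bRD\b', 'ROAD'))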

But you are right, you don't need a UDF or a loop here. You just need some more regexp work and a directory table that looks exactly like your original dictionary :)

Here is my solution for this:

# First, strip the endings you want to replace from the address.
# You can combine them into a single pattern with the OR (|) operator.
# You could probably automate building that pattern string from the
# dictionary keys instead, but I will leave that for you to decide.

import pyspark.sql.functions as sf

input_df = input_df.withColumn('start_address', sf.regexp_replace('original_address', 'RD|DR|etc...', ''))


# You will still need the old ending in a separate column.
# This gives you something to join on with your directory table.

input_df = input_df.withColumn('end_of_address', sf.regexp_extract('original_address', '(.*) (.*)', 2))
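For example, on '33, PARK AVE MULLOHAND DR' the first (.*) is greedy, so group 2 captures only the last token, 'DR', while group 1 holds everything before it.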


# Now join the directory table, which has two columns: the ending you want
# to replace (end_of_address) and the ending you want instead (correct_end).

input_df = directory_df.join(input_df, 'end_of_address')
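If your directory only exists as a Python dict so far, here is a minimal sketch of turning it into that table (the spark session and the suffix_dict name are assumptions; the column names match the join above):

# Hypothetical: build the two-column directory table from the question's dict.
suffix_dict = {'RD': 'ROAD', 'DR': 'DRIVE', 'AVE': 'AVENUE'}  # ~270 pairs in total

directory_df = spark.createDataFrame(list(suffix_dict.items()), ['end_of_address', 'correct_end'])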


# And now you just need to concatenate the stripped address with the correct ending.

input_df = input_df.withColumn('address_clean', sf.concat('start_address', 'correct_end'))
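One caveat: as written this is an inner join, so a row whose ending is not in the directory (e.g. 'AVIATOR BUILDING') is dropped. A sketch of a fix, under the same column names, is a left join plus coalesce to fall back to the untouched address:

# Keep unmatched rows and fall back to the original address for them.
input_df = (input_df
            .join(directory_df, 'end_of_address', 'left')
            .withColumn('address_clean',
                        sf.coalesce(sf.concat('start_address', 'correct_end'),
                                    sf.col('original_address'))))

Also note that the join/concat only fixes the final token; a mid-string abbreviation like the AVE in row 4 is removed by the first step rather than expanded, so full mid-string replacement may still need the chained approach from the comments.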