
I want to remove the words that appear in a secondary data frame from a main data frame.

This is the main data frame:

+----------+--------------------+
|  event_dt|           cust_text|
+----------+--------------------+
|2020-09-02|hi fine i want to go|
|2020-09-02|i need  a line hold |
|2020-09-02|i have the  60 packs|
|2020-09-02|hello want you teach|
+----------+--------------------+

Below is the single-column secondary data frame. The words in the secondary data frame need to be removed from the cust_text column of the main data frame wherever they occur. For example, 'want' will be removed from every row in which it appears (in this example, from the 1st and 4th rows).

+-------+
|column1|
+-------+
|   want|
|because|
|   need|
|  hello|
|      a|
|   have|
|     go|
+-------+

The event_dt column and each row remain as they are; only the secondary data frame's words are removed from the main data frame, giving the result below:

+----------+--------------------+
|  event_dt|           cust_text|
+----------+--------------------+
|2020-09-02|hi fine i to        |
|2020-09-02|i line hold         |
|2020-09-02|i the 60 packs      |
|2020-09-02|you teach           |
+----------+--------------------+

Help is appreciated!!


1 Answer


This should be a working solution for you: use array_except() to eliminate the unwanted words. To get there, we first need a little preparation.

Create the DataFrame

from pyspark.sql import functions as F

# Main data frame
df = spark.createDataFrame(
    [
        ("2020-09-02", "hi fine i want to go"),
        ("2020-09-02", "i need  a line hold"),
        ("2020-09-02", "i have the  60 packs"),
        ("2020-09-02", "hello want you teach"),
    ],
    ["col1", "col2"],
)

Split the column into an array for later use

df = df.withColumn("col2", F.split("col2", " "))
df.show(truncate=False)
df_lookup = spark.createDataFrame([(1,"want"),(1,"because"), (1, "need"), (1, "hello"),(1, "a"),(1, "give"), (1, "go")],[ "col1","col2"])
df_lookup.show()

Output

+----------+---------------------------+
|col1      |col2                       |
+----------+---------------------------+
|2020-09-02|[hi, fine, i, want, to, go]|
|2020-09-02|[i, need, , a, line, hold] |
|2020-09-02|[i, have, the, , 60, packs]|
|2020-09-02|[hello, want, you, teach]  |
+----------+---------------------------+

+----+-------+
|col1|   col2|
+----+-------+
|   1|   want|
|   1|because|
|   1|   need|
|   1|  hello|
|   1|      a|
|   1|   give|
|   1|     go|
+----+-------+

Now, groupBy the lookup data frame and collect all the lookup values into a Python variable, as below

# Collect every lookup word into a single Python list
df_lookup_var = (
    df_lookup.groupBy("col1")
    .agg(F.collect_set("col2").alias("col2"))
    .collect()[0][1]
)
print(df_lookup_var)

# Join the list into one comma-separated string, attach it as a literal
# column, then split it back into an array column on every row
x = ",".join(df_lookup_var)
print(x)
df = df.withColumn("filter_col", F.lit(x))
df = df.withColumn("filter_col", F.split("filter_col", ","))
df.show(truncate=False)
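As an aside (not part of the original answer), the join-and-split round trip can be skipped by building the array column directly from the collected Python list; a minimal sketch, assuming the same df and df_lookup_var as above:

# Alternative: build the literal array column in one step
df = df.withColumn("filter_col", F.array(*[F.lit(w) for w in df_lookup_var]))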

This does the trick

df = df.withColumn("ArrayColumn", F.array_except("col2", "filter_col"))
df.show(truncate = False)
+----------+---------------------------+-----------------------------------------+---------------------------+
|col1      |col2                       |filter_col                               |ArrayColumn                |
+----------+---------------------------+-----------------------------------------+---------------------------+
|2020-09-02|[hi, fine, i, want, to, go]|[need, want, a, because, hello, give, go]|[hi, fine, i, to]          |
|2020-09-02|[i, need, , a, line, hold] |[need, want, a, because, hello, give, go]|[i, , line, hold]          |
|2020-09-02|[i, have, the, , 60, packs]|[need, want, a, because, hello, give, go]|[i, have, the, , 60, packs]|
|2020-09-02|[hello, want, you, teach]  |[need, want, a, because, hello, give, go]|[you, teach]               |
+----------+---------------------------+-----------------------------------------+---------------------------+
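Note that the question's expected result keeps cust_text as a plain string, while array_except() returns an array. If you want the string form back, a small extra step (not shown in the original answer) rejoins the words with concat_ws():

# Join the filtered words back into a single space-separated string
df = df.withColumn("col2", F.concat_ws(" ", "ArrayColumn"))
df.select("col1", "col2").show(truncate=False)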

2 Comments

Fantastic!! It worked. Because the list in my secondary data frame has more than 7,000 words, I added one thing: from pyspark.sql.functions import lit; df_lookup = df.withColumn('col1', lit('1')); df.show(5)
@dsk, array_except() removes the duplicates from within the source. I don't want that. Any idea how to fix that?
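On the duplicates point in the last comment: array_except() does de-duplicate the source array. A minimal workaround sketch, assuming Spark 2.4+, swaps in the filter() higher-order function, which keeps repeated words intact (my suggestion, not part of the accepted answer):

# filter() keeps every element of col2 (duplicates included) unless it
# also occurs in filter_col; array_contains does the membership test
df = df.withColumn(
    "ArrayColumn",
    F.expr("filter(col2, x -> NOT array_contains(filter_col, x))"),
)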
