
I have a PySpark dataframe that needs to be joined with another dataframe based on a string column. For example:

(bob1, "a.b.*.c") (bob2, "a.b.c")

when joined with

(tom1, "a.b.d.c") (tom2, "a.b.c")

on the second column (the pattern), should give: (bob1, tom1) (bob2, tom2). I understand this can be done using rlike, but for that I need to transform the pattern column into an actual regex. So

  • a.b.*.c becomes ^a.b.(\w+).c$
  • a.b.c becomes ^a.b.c$

I'm having trouble doing this conversion. I tried using regexp_replace(), but due to the \ in the output, it inserts \ twice instead of once.

  • In fact, this is an interesting scenario. I guess * may appear anywhere, not only in the 3rd position, right? Commented Sep 16, 2020 at 18:28
  • That's correct. Commented Sep 16, 2020 at 18:35
  • Then I don't think there is any way other than a cartesian product, unless the datasets are relatively small: then you can find the patterns from df1 and extract them (df1.collect), apply the patterns to df2 by replacing \w with *, and finally join them with an inner join (a sketch of this idea follows these comments). But that has the drawback of added complexity and an extra action, of course. Commented Sep 16, 2020 at 18:49
  • That's pretty smart! I just checked the production data, and in practice all the wildcards are appearing in the second place only. I might just use your approach! Commented Sep 17, 2020 at 9:46
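
For reference, a minimal sketch of the collect-and-match idea from the comments above, assuming the pattern side is small enough to collect to the driver; the helper matching_pattern and the DataFrame/column names here are illustrative, not from the original post:

import re
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([("bob1", "a.b.*.c"), ("bob2", "a.b.c")], ["col1", "col2"])
df2 = spark.createDataFrame([("tom1", "a.b.d.c"), ("tom2", "a.b.c")], ["col3", "col4"])

# Collect the (small) pattern side to the driver and compile each pattern once,
# interpreting * as \w+ exactly as described in the question.
patterns = [row["col2"] for row in df1.select("col2").distinct().collect()]
compiled = [(p, re.compile("^" + p.replace("*", r"\w+") + "$")) for p in patterns]

@F.udf("string")
def matching_pattern(value):
    # Return the first original pattern that matches, so it can serve as an equi-join key.
    for original, regex in compiled:
        if regex.match(value):
            return original
    return None

# Tag each df2 row with its matching pattern, then finish with an ordinary inner equi-join.
df2_tagged = df2.withColumn("col2", matching_pattern("col4"))
df1.join(df2_tagged, on="col2", how="inner").select("col1", "col3").show()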

1 Answer


regexp_replace can be used:

from pyspark.sql import functions as F

df1 = spark.createDataFrame([
    ("bob1", "a.b.*.d"), ("bob2", "a.b.c")], ["col1", "col2"])
df2 = spark.createDataFrame([
    ("tom1", "a.b.c.d"), ("tom2", "a.b.c")], ["col3", "col4"])
# "\\*" matches a literal *; the replacement "(\\\\w+)" reaches Spark as "(\\w+)",
# which regexp_replace emits as the single literal "(\w+)".
df1 = df1.withColumn("join_col", F.concat(
    F.lit("^"), F.regexp_replace(F.col("col2"), "\\*", "(\\\\w+)"), F.lit("$")))
df_joined = df1.join(df2, F.expr("col4 rlike join_col"))
df_joined.show()

prints

+----+-------+-------------+----+-------+
|col1|   col2|     join_col|col3|   col4|
+----+-------+-------------+----+-------+
|bob1|a.b.*.d|^a.b.(\w+).d$|tom1|a.b.c.d|
|bob2|  a.b.c|      ^a.b.c$|tom2|  a.b.c|
+----+-------+-------------+----+-------+

The parentheses around \w+ could be omitted.

Unfortunately, df_joined.explain() shows that the rlike join causes a CartesianProduct:

== Physical Plan ==
CartesianProduct col4#5 RLIKE join_col#26
:- *(1) Project [col1#0, col2#1, concat(^, regexp_replace(col2#1, \*, (\\w+)), $) AS join_col#26]
:  +- *(1) Filter isnotnull(concat(^, regexp_replace(col2#1, \*, (\\w+)), $))
:     +- *(1) Scan ExistingRDD[col1#0,col2#1]
+- *(2) Filter isnotnull(col4#5)
   +- *(2) Scan ExistingRDD[col3#4,col4#5]

4 Comments

Ahh, thanks! CartesianProduct is to be expected here, but one side of the join in my case is really small compared to the other, so I'm not that worried.
Also, broadcasting the small df could be helpful here.
I realised both the dataframes are relatively small. One is ~1 GB and the other is ~100 MB. I just collected the dataframe and replaced all the logic with pure Python code, and it runs pretty fast. Takes like 10 seconds to do the job. I think I don't even need to read the file using Spark; I'll just read it in the driver using Python code.
Also, using pandas it could be even faster @Dexter :)
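
A minimal sketch of the broadcast suggestion from the comment above, assuming df1 (with join_col already built as in the answer) is the small side; with the hint, Spark should plan a BroadcastNestedLoopJoin instead of a CartesianProduct:

from pyspark.sql import functions as F

# Hint that the small side fits in memory; each executor then receives a full copy
# and the non-equi rlike join avoids the plain CartesianProduct.
df_joined = df2.join(F.broadcast(df1), F.expr("col4 rlike join_col"))
df_joined.explain()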
