
I am trying to get the combinations for each row of a dataframe. For example, my input data looks like this:

+-------+---+
|kc_pair|h_v|
+-------+---+
| [a, 1]|123|
| [b, 2]|123|
| [c, 3]|123|
| [b, 2]|234|
| [c, 3]|234|
+-------+---+

The output dataframe should contain every pairwise combination of kc_pair within each h_v group, like this:

+---------------+---+
|       kc_pairs|h_v|
+---------------+---+
| [a, 1], [b, 2]|123|
| [a, 1], [c, 3]|123|
| [b, 2], [c, 3]|123|
| [b, 2], [c, 3]|234|
+---------------+---+

I've tried using itertools.combinations as a UDF applied to the aggregated column: first I collect the kc_pair values that share the same h_v into a list, something like this:

+-----------------------+---+
|               kc_pairs|h_v|
+-----------------------+---+
| [a, 1], [b, 2], [c, 3]|123|
|         [b, 2], [c, 3]|234|
+-----------------------+---+
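
A minimal sketch of that aggregation step (assuming the input frame is named df, as in the answer below; df_agg is an illustrative name):

import pyspark.sql.functions as F

# Collect all kc_pair values sharing the same h_v into a single list
df_agg = df.groupBy('h_v').agg(F.collect_list('kc_pair').alias('kc_pairs'))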

And then applied the UDF to the aggregated column:

import itertools
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# Each combination is a pair of kc_pair arrays, hence the extra nesting
combinations_udf = F.udf(lambda x: list(itertools.combinations(x, 2)),
                         returnType=ArrayType(ArrayType(ArrayType(StringType()))))
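
Applying it and flattening the result back to one combination per row could then look like this (a sketch; df_agg is the aggregated frame from the step above):

# explode() turns the array of combinations into one row per combination
df_comb = df_agg.select(F.explode(combinations_udf('kc_pairs')).alias('kc_pairs'),
                        'h_v')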

Now, the critical issue is that this cannot avoid data skew: if a single kc_pairs cell contains over 10,000 elements, the UDF has to build roughly 50 million combinations inside one task, and the worker might fail. Any ideas for this problem?

1 Comment

About data skew, read this article. – Commented May 10, 2021 at 17:56

1 Answer


I solved it using a self join, based on mck's answer:

UPDATE: A straightforward solution

import pyspark.sql.functions as f
from pyspark.sql import DataFrame, Row, SparkSession

spark = SparkSession.builder.getOrCreate()

# Pair elements are kept as strings so the array has a single element type
df: DataFrame = spark.createDataFrame([
    Row(kc_pair=['a', '1'], h_v=123),
    Row(kc_pair=['b', '2'], h_v=123),
    Row(kc_pair=['c', '3'], h_v=123),
    Row(kc_pair=['b', '2'], h_v=234),
    Row(kc_pair=['c', '3'], h_v=234),
])

# Self join on h_v; the strict inequality on the first pair element keeps
# each unordered pair exactly once and filters out self-matches
df_final = df.alias('l').join(df.alias('r'), on=((f.col('l.h_v') == f.col('r.h_v')) &
                                                 (f.col('l.kc_pair')[0] < f.col('r.kc_pair')[0])))

# Wrap the two joined kc_pair arrays in a single array column
df_final = df_final.select(f.array(f.col('l.kc_pair'),
                                   f.col('r.kc_pair')).alias('kc_pairs'),
                           f.col('l.h_v'))

df_final.show(truncate=False)

Output:

+----------------+---+
|kc_pairs        |h_v|
+----------------+---+
|[[a, 1], [b, 2]]|123|
|[[a, 1], [c, 3]]|123|
|[[b, 2], [c, 3]]|123|
|[[b, 2], [c, 3]]|234|
+----------------+---+

2 Comments

Nice thought! I haven't tried it yet, but it seems to fit my problem. About the data skew, I have read the link you commented above. Based on your self-join solution, df.alias('l').join(broadcast(df.alias('r')), ... is supposed to work here, isn't it?
Honestly, I'm not sure. Only testing with your real dataset and monitoring will tell.
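
For reference, a minimal sketch of that broadcast variant (assuming df is small enough to fit in executor memory; df_bcast is an illustrative name):

from pyspark.sql.functions import broadcast

# Broadcasting the right side avoids shuffling rows by h_v, so a heavily
# skewed h_v key no longer funnels through a single shuffle partition
df_bcast = df.alias('l').join(
    broadcast(df.alias('r')),
    on=((f.col('l.h_v') == f.col('r.h_v')) &
        (f.col('l.kc_pair')[0] < f.col('r.kc_pair')[0])))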
