
Every day I receive a file with ~2k columns, ~900 of which are "relationship" columns. For example:

    data.id | name | AGE | data.rel.1 | data.rel.2 | data.rel.1.type | data.rel.2.type
    12      | JOE  | 25  | ASDF       | QWER       | order           | order
    23      | TIM  | 20  | AAAA       | SSSS       | product         | product
    34      | BRAD | 32  | XXXX       | null       | order           | null
    11      | MATT | 23  | ASDF       | QWER       | agreement       | agreement

I need to flatten the data and create an "id - rel - rel type" dataframe that contains only data.id, data.rel, and data.rel.type:

    data.id | data.rel | data.rel.type
    12      | ASDF     | order   
    12      | QWER     | order        
    23      | AAAA     | product    
    23      | SSSS     | product     
    34      | XXXX     | order   
    11      | ASDF     | agreement   
    11      | QWER     | agreement

This solution seems to work for one column, but I'm not sure how to incorporate the rel.type column into the same logic:

    import re
    import pyspark.sql.functions as F

    pattern = '/*rel/*'

    def explode(row, pattern):
        # yield (id, value) for every column whose name matches the pattern
        for c in row.asDict():
            if re.search(pattern, c):
                yield (row['data_id'], row[c])

    (df.rdd.flatMap(lambda r: explode(r, pattern))
        .toDF(['data_id', 'data_rel'])
        .filter(F.col('data_rel').isNotNull())
        .show())
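As an aside, the `/*rel/*` pattern only works incidentally: in a regex, `/*` means "zero or more slashes", so the search effectively just looks for the substring `rel`, and that also makes it match the `.type` columns. A quick plain-Python check (the column names here are illustrative):

```python
import re

pattern = '/*rel/*'
cols = ['data_id', 'name', 'AGE', 'data_rel_1', 'data_rel_1_type']
matched = [c for c in cols if re.search(pattern, c)]
print(matched)  # ['data_rel_1', 'data_rel_1_type'] -- the .type column matches too
```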

Any ideas?

  • I was just thinking: I don't know Python well, so I can't post an answer here, but first select data.id and data.rel.1 as df1, similarly data.id and data.rel.2 as df2, and data.id and data.rel.3 as df3. Now you have 3 dataframes; union them and you will get the above output. Commented May 13, 2019 at 15:52
  • @RamGhadiyaram that was my initial idea but it's extremely inefficient. Thank you. Commented May 13, 2019 at 16:14
  • union is not inefficient at all ... it avoids a shuffle and since your output is only 2 cols it will stack them all together. Commented May 13, 2019 at 17:09
  • Possible duplicate of How to melt Spark DataFrame? Commented May 13, 2019 at 18:17

2 Answers


Here is a solution using an array of the rel columns plus explode:

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
     (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
     (34, 'BRAD', 32, 'XXXX', None, None),
     (11, 'MATT', 23, 'ASDF', 'QWER', None)],
    ['data_id', 'name', 'AGE', 'data_rel_1', 'data_rel_2', 'data_rel_3']
)

# Create an array of the columns you want
cols = F.array(
    *[F.col(c).alias(c) for c in ['data_rel_1', 'data_rel_2', 'data_rel_3']]
)

df.withColumn(
    "data_rel", cols
).select(
    'data_id',F.explode('data_rel').alias('data_rel')
).filter(
    F.col('data_rel').isNotNull()
).show()

which results in:

+-------+--------+
|data_id|data_rel|
+-------+--------+
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    SSSS|
|     23|    DDDD|
|     34|    XXXX|
|     11|    ASDF|
|     11|    QWER|
+-------+--------+
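To also carry the .type columns from the question, the same idea extends by zipping each rel array with a matching array of the type columns before exploding, then dropping null pairs. The zip-and-filter step itself, sketched in plain Python (illustrative values, not the Spark API):

```python
# One row's rel values and their matching type values, zipped into pairs;
# pairs whose rel value is null are dropped -- the same filtering the
# isNotNull() step performs after the explode.
rels = ['ASDF', 'QWER', None]
types = ['order', 'order', None]

pairs = [(r, t) for r, t in zip(rels, types) if r is not None]
print(pairs)  # [('ASDF', 'order'), ('QWER', 'order')]
```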

EDIT: Another solution using the RDD API, where explode takes the pattern as a parameter (this may avoid the exceptions seen with more columns):

import re
import pyspark.sql.functions as F

# takes a pattern and yields (id, value) for the columns that match it
def explode(row, pattern):
    for c in row.asDict():
        if re.search(pattern, c):
            yield (row['data_id'], row[c])

df = spark.createDataFrame(
    [(12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
     (23, 'TIM', 20, 'AAAA', 'SSSS', 'DDDD'),
     (34, 'BRAD', 32, 'XXXX', None, None),
     (11, 'MATT', 23, 'ASDF', 'QWER', None)],
    ['data_id', 'name', 'AGE', 'data_rel_1', 'data_rel_2', 'data_rel_3']
)
pattern = '/*rel/*'
df.rdd.flatMap(
    lambda r:explode(r,pattern)
).toDF(
    ['data_id','data_rel']
).filter(
    F.col('data_rel').isNotNull()
).show()
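Neither snippet above emits the .type column that the question asks for. One way the generator could be extended is to look up each matching rel column's sibling type column by name; a plain-Python sketch on an ordinary dict (the underscore `_type` naming is an assumption about how the flattened column names look):

```python
import re

def explode_with_type(row, pattern=r'rel_\d+$'):
    # Yield (id, rel, rel_type) for every non-null rel column whose name
    # matches the pattern, pairing it with its '<name>_type' sibling.
    for c in row:
        if re.search(pattern, c) and row[c] is not None:
            yield (row['data_id'], row[c], row.get(c + '_type'))

row = {'data_id': 12, 'data_rel_1': 'ASDF', 'data_rel_2': None,
       'data_rel_1_type': 'order', 'data_rel_2_type': None}
print(list(explode_with_type(row)))  # [(12, 'ASDF', 'order')]
```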

Comments

  • Thank you guys, I will try this solution with the explode function and will let you know how it works with my data.
  • You can skip the withColumn step and directly select("`data.id`", F.explode(cols))
  • @pault I'm getting a StackOverflowError; I tried to increase memory, but it didn't help. I updated the initial question with details.
  • @AdasKavaliauskas explode is an expensive operation. You may be better off using the union method.
  • @AdasKavaliauskas try the rdd approach (I edited in another solution)

I don't know Python well, so I wrote this in Scala; you can try to translate it to Python. First select data.id and data.rel.1 as df1, similarly data.id and data.rel.2 as df2, and data.id and data.rel.3 as df3.

Now you have 3 dataframes; union them and you will get the above output.

import org.apache.spark.sql.SparkSession

/**
  * Created by Ram Ghadiyaram
  */
object DFUnionExample {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder
      .master("local")
      .appName("DFUnionExample")
      .getOrCreate()

    import sparkSession.implicits._

    val basedf = Seq((12, "JOE", 25, "ASDF", "QWER", "ZXCV"),
      (23, "TIM", 20, "AAAA", "SSSS", "DDDD"),
      (34, "BRAD", 32, "XXXX", null, null),
      (11, "MATT", 23, "ASDF", "QWER", null)
    ).toDF("data.id", "name", "AGE", "data.rel.one", "data.rel.two", "data.rel.three")
    basedf.show

    import org.apache.spark.sql.functions._
    // one (id, rel) dataframe per rel column, then union them
    val df1 = basedf.select(col("`data.id`"), col("`data.rel.one`"))
    val df2 = basedf.select(col("`data.id`"), col("`data.rel.two`"))
    val df3 = basedf.select(col("`data.id`"), col("`data.rel.three`"))
    df1.union(df2).union(df3)
      .select(col("`data.id`"), col("`data.rel.one`").as("data.rel"))
      .filter(col("`data.rel`").isNotNull)
      .sort(col("`data.id`")).show
  }
}

Result:

+-------+----+---+------------+------------+--------------+
|data.id|name|AGE|data.rel.one|data.rel.two|data.rel.three|
+-------+----+---+------------+------------+--------------+
|     12| JOE| 25|        ASDF|        QWER|          ZXCV|
|     23| TIM| 20|        AAAA|        SSSS|          DDDD|
|     34|BRAD| 32|        XXXX|        null|          null|
|     11|MATT| 23|        ASDF|        QWER|          null|
+-------+----+---+------------+------------+--------------+

+-------+--------+
|data.id|data.rel|
+-------+--------+
|     11|    QWER|
|     11|    ASDF|
|     12|    ASDF|
|     12|    QWER|
|     12|    ZXCV|
|     23|    AAAA|
|     23|    DDDD|
|     23|    SSSS|
|     34|    XXXX|
+-------+--------+
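The select-then-union shape of this answer translates directly to other settings; as a plain-Python illustration of the same idea (one (id, rel) list per rel column, then concatenated and null-filtered; the tuple layout mirrors the sample data):

```python
rows = [
    (12, 'JOE', 25, 'ASDF', 'QWER', 'ZXCV'),
    (34, 'BRAD', 32, 'XXXX', None, None),
]
rel_positions = [3, 4, 5]  # data.rel.one / .two / .three

# one (id, rel) "dataframe" per rel column, then unioned and null-filtered,
# mirroring df1.union(df2).union(df3) with the isNotNull filter
per_column = [[(r[0], r[i]) for r in rows] for i in rel_positions]
unioned = [pair for col in per_column for pair in col if pair[1] is not None]
print(unioned)  # [(12, 'ASDF'), (34, 'XXXX'), (12, 'QWER'), (12, 'ZXCV')]
```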

