
I applied an algorithm from the question linked below (in the NOTE) to transpose and explode a nested Spark dataframe.

When I define cols = ['a', 'b'] I get an empty dataframe, but when I define cols = ['a'] I get the transformed dataframe for the a column. See the Current code section below for more details. Any help would be appreciated.

I'm looking for required output 2 (transpose and explode), but even an example of required output 1 (transpose only) would be very useful.

NOTE: This is a minimal example to highlight the problem; in reality the dataframe schema and array lengths vary, as in the example Pyspark: How to flatten nested arrays by merging values in spark.

Input df:

+---+------------------+--------+
| id|                 a|       b|
+---+------------------+--------+
|  1|[{1, 1}, {11, 11}]|    null|
|  2|              null|[{2, 2}]|
+---+------------------+--------+


root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)

Required output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
+---+------+-------------------+

Required output 2 (explode_df):

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
|  1|   a|   1|  1|
|  1|   a|  11| 11|
|  2|   b|   2|  2|
+---+----+----+---+
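For reference, the two required outputs can be illustrated in plain Python (no Spark) on rows mirroring the input dataframe; the column names id/a/b and fields date/val are taken from the question above:

```python
# Plain-Python illustration of the two required outputs (not Spark).
rows = [
    {"id": 1, "a": [{"date": 1, "val": 1}, {"date": 11, "val": 11}], "b": None},
    {"id": 2, "a": None, "b": [{"date": 2, "val": 2}]},
]

# Required output 1: one (id, cols, arrays) row per non-null array column.
transpose = [
    {"id": r["id"], "cols": c, "arrays": r[c]}
    for r in rows
    for c in ("a", "b")
    if r[c] is not None
]

# Required output 2: one flat row per struct inside each array.
explode = [
    {"id": t["id"], "cols": t["cols"], **el}
    for t in transpose
    for el in t["arrays"]
]

print(explode)
```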

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}"""]))

cols = ['a', 'b']

expressions = [f.expr('TRANSFORM({col}, el -> STRUCT("{col}" AS cols, el.date, el.val))'.format(col=col)) for col in cols ]

transpose_df = df.withColumn('arrays', f.flatten(f.array(*expressions)))
             
explode_df = transpose_df.selectExpr('id', 'inline(arrays)')

explode_df.show()

Current Outcome

+---+----+----+---+
| id|cols|date|val|
+---+----+----+---+
+---+----+----+---+

1 Answer


stack might be a better option than transform for the first step. The current code comes up empty because TRANSFORM(col, ...) returns NULL when col is NULL, array() then contains that NULL element, and flatten of an array containing a NULL is NULL — so inline over the resulting column emits no rows. stack sidesteps the issue entirely.


expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"
# expr == "stack(2,'a',a,'b',b)"

transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
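The expression-building step is plain string work, so it can be sanity-checked without a Spark session; the column list and the default stack output names (col0, col1) are as in the answer above:

```python
# Build the stack() expression exactly as in the answer; no Spark needed
# to verify the generated SQL string.
cols = ['a', 'b']

expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"

print(expr)  # stack(2,'a',a,'b',b)
```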

1 Comment

Thanks a lot, that works well for arrays with static fields (date, val). I asked one more question for the case where the arrays' columns have dynamic fields that change randomly. Any help would be appreciated. stackoverflow.com/questions/69423246/…
