
I want to know how I can create the following "complex" JSON structure in PySpark (Spark 2.3.2):

Test data set:

df = sc.parallelize([
    [1, 'a', 'x1'],
    [1, 'a', 'x2'],
    [2, 'b', 'y1'],
    [2, 'b', 'y2'],
    [3, 'c', 'z'],
]).toDF('id: integer, field1: string, field2: string').cache()

Code:

import pyspark.sql.functions as F

out = df.groupBy('id').agg(F.to_json(F.create_map(
    F.lit('key1'),
    F.col('field1'),
    F.lit('info'),
    F.collect_list(F.create_map(
        F.lit('number'),
        F.col('id'),
        F.lit('key2'),
        F.col('field2')
    ))
)).alias('info'))

My target JSON structure is a data set that looks like this:

[
(id) 1, (info) {"key1": "a", "info": [{"number": 1, "key2": "x1"}, {"number": 1, "key2": "x2"}]},
(id) 2, (info) {"key1": "b", "info": [{"number": 2, "key2": "y1"}, {"number": 2, "key2": "y2"}]},
(id) 3, (info) {"key1": "c", "info": [{"number": 3, "key2": "z"}]}
]
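(For reference, the structure itself is easy to build outside Spark; a minimal plain-Python sketch of the same grouping, using only the standard library, shows what the end result should look like:)

```python
import json
from itertools import groupby

rows = [
    (1, 'a', 'x1'),
    (1, 'a', 'x2'),
    (2, 'b', 'y1'),
    (2, 'b', 'y2'),
    (3, 'c', 'z'),
]

# Group by id (rows are already sorted by id) and build the nested payload.
result = []
for id_, group in groupby(rows, key=lambda r: r[0]):
    group = list(group)
    info = {
        'key1': group[0][1],
        'info': [{'number': r[0], 'key2': r[2]} for r in group],
    }
    result.append((id_, json.dumps(info)))

for id_, info in result:
    print(id_, info)
```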

How can I achieve this? (Can I achieve it at all?) I keep getting the following error:

org.apache.spark.sql.AnalysisException:
cannot resolve 'map('key1', `field1`, 'info', collect_list(map('number',
  CAST(`id` AS STRING), 'key2', CAST(`field2` AS STRING))))'
due to data type mismatch: The given values of function map should all be the same type,
  but they are [string, array<map<string,string>>]

What I understand from this error is that field1 is a string while the value of 'info' is not, but that is exactly the structure I want. Is there another way to achieve it?

Thanks!

  • I believe you can use the trick I used to solve another problem; it is shown here: stackoverflow.com/questions/64602777/… All in all, I think it should be possible, but it will require some effort on your side. Commented Oct 30, 2020 at 20:01
  • Sorry @user238607, that did not really help me with my issue as I also needed to collect them. Commented Nov 1, 2020 at 7:25

1 Answer


I found one (hackish) way to do this... I don't like it much, but seeing that no one in this community has posted an answer, I'm starting to think it's not that easy.

So, first of all, I split the "big" aggregation in two:

out = df.groupBy('id', 'field1').agg(
    # first part: the outer object, with a placeholder where the array will go
    F.to_json(F.create_map(
        F.lit('key1'),
        F.col('field1'),
        F.lit('info'),
        F.lit('%%replace%%')
    )).alias('first'),
    # second part: the collected array, serialized separately
    F.to_json(F.collect_list(F.create_map(
        F.lit('number'),
        F.col('id'),
        F.lit('key2'),
        F.col('field2')
    ))).alias('second'))

This will generate the following table:

+---+------+---------------------------------+-------------------------------------------------------+
|id |field1|first                            |second                                                 |
+---+------+---------------------------------+-------------------------------------------------------+
|3  |c     |{"key1":"c","info":"%%replace%%"}|[{"number":"3","key2":"z"}]                            |
|2  |b     |{"key1":"b","info":"%%replace%%"}|[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]|
|1  |a     |{"key1":"a","info":"%%replace%%"}|[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]|
+---+------+---------------------------------+-------------------------------------------------------+

And now you combine them:

df2 = out.withColumn('final', F.expr("REPLACE(first, '\"%%replace%%\"', second)")) \
         .drop('first', 'second')
df2.show(10, False)

+---+------+---------------------------------------------------------------------------+
|id |field1|final                                                                      |
+---+------+---------------------------------------------------------------------------+
|3  |c     |{"key1":"c","info":[{"number":"3","key2":"z"}]}                            |
|2  |b     |{"key1":"b","info":[{"number":"2","key2":"y1"},{"number":"2","key2":"y2"}]}|
|1  |a     |{"key1":"a","info":[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]}|
+---+------+---------------------------------------------------------------------------+
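The REPLACE trick only works because the `second` column is itself valid JSON, so splicing it over the quoted placeholder yields valid JSON again. The splice can be sanity-checked in plain Python:

```python
import json

first = '{"key1":"a","info":"%%replace%%"}'
second = '[{"number":"1","key2":"x1"},{"number":"1","key2":"x2"}]'

# Replace the quoted placeholder (including its quotes) with the raw JSON array.
final = first.replace('"%%replace%%"', second)
parsed = json.loads(final)  # parses cleanly, so the splice produced valid JSON
```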

A bit unorthodox, but no complaints from Spark :)
