
I have almost everything working, but the method that converts the result to a JSON object doesn't give me what's still missing. I want the same output, except that "add" and "firsts" will hold more content, so they need to be arrays of objects.

My code:

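import org.apache.spark.sql.functions.collect_list
import spark.implicits._ // already in scope in spark-shell

case class FirstIdentity(docType: String, docNumber: String, pId: String)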
case class SecondIdentity(firm: String, code: String, orgType: String,
                              orgNumber: String, typee: String, perms: Seq[String])
case class General(id: Int, pName: String, description: String, add: Seq[SecondIdentity],
                       delete: Seq[String], act: String, firsts: Seq[FirstIdentity])

val someDF = Seq(
      ("0010XR_TYPE_6","0010XR", "222222", "6", "TYPE", "77444478", "6", 123, 1, "PF 1", "name", "description",
      Seq("PERM1", "PERM2"))
    ).toDF("firm", "code", "org_number", "org_type", "type", "doc_number",
           "doc_type", "id", "p_id", "p_name", "name", "description", "perms")

someDF.createOrReplaceTempView("vw_test")

val filter = spark.sql("""
                        select
                            firm, code, org_number, org_type, type, doc_number,
                             doc_type, id, p_id, p_name, name, description, perms
                         from vw_test
                    """)

val group =
      filter.rdd.map(x => {
          (
            x.getInt(x.fieldIndex("id")),
            x.getString(x.fieldIndex("p_name")),
            x.getString(x.fieldIndex("description")),
            SecondIdentity(
              x.getString(x.fieldIndex("firm")),
              x.getString(x.fieldIndex("code")),
              x.getString(x.fieldIndex("org_type")),
              x.getString(x.fieldIndex("org_number")),
              x.getString(x.fieldIndex("type")),
              x.getSeq(x.fieldIndex("perms"))
            ),
            "act",
            FirstIdentity(
              x.getString(x.fieldIndex("doc_number")),
              x.getString(x.fieldIndex("doc_type")),
              x.getInt(x.fieldIndex("p_id")).toString
            )
          )
        })
        .toDF("id", "name", "desc", "add", "actKey", "firsts")
        .groupBy("id", "name", "desc", "add", "actKey", "firsts")
        .agg(collect_list("add").as("null"))
        .drop("null")

group.toJSON.show(false)

Result:

{
  "id": 123,
  "name": "PF 1",
  "desc": "description",
  "add": {
    "firm": "0010XR_TYPE_6",
    "code": "0010XR",
    "orgType": "6",
    "orgNumber": "222222",
    "typee": "TYPE",
    "perms": [
      "PERM1",
      "PERM2"
    ]
  },
  "actKey": "act",
  "firsts": {
    "docType": "77444478",
    "docNumber": "6",
    "pId": "1"
  }
}

I want both "add" and "firsts" to be arrays.

EDIT: this is the output I expect:

{
  "id": 123,
  "name": "PF 1",
  "desc": "description",
  "add": [   <----
    {
      "firm": "0010XR_TYPE_6",
      "code": "0010XR",
      "orgType": "6",
      "orgNumber": "222222",
      "typee": "TYPE",
      "perms": [
        "PERM1",
        "PERM2"
      ]
    },
    {
      "firm": "0010XR_TYPE_6",
      "code": "0010XR",
      "orgType": "5",
      "orgNumber": "11111",
      "typee": "TYPE2",
      "perms": [
        "PERM1",
        "PERM2"
      ]
    }
  ],
  "actKey": "act",
  "firsts": [  <----
    {
      "docType": "77444478",
      "docNumber": "6",
      "pId": "1"
    },
    {
      "docType": "411133",
      "docNumber": "6",
      "pId": "2"
    }
  ]
}


Based on your comment, you want to aggregate "add" according to some grouping, so check which columns you group by. A column you want to aggregate cannot also be part of the grouping key: if it is, every distinct value forms its own group and you will always get separate records.
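A rough plain-Scala analogy of that rule, using only standard collections (no Spark); `SimpleRow` and its values are hypothetical. When `add` is part of the key, each list ends up with a single element; when it is left out of the key, both values are collected together.

```scala
// Plain Scala collections only; SimpleRow and its values are hypothetical.
final case class SimpleRow(id: Int, name: String, add: String)

object GroupingDemo {
  val rows: Seq[SimpleRow] = Seq(
    SimpleRow(123, "PF 1", "add-A"),
    SimpleRow(123, "PF 1", "add-B")
  )

  // Key includes `add`: each distinct add value becomes its own group,
  // so every collected list holds a single element.
  val groupedWithAdd: Map[(Int, String, String), Seq[String]] =
    rows.groupBy(r => (r.id, r.name, r.add)).map { case (k, rs) => k -> rs.map(_.add) }

  // Key excludes `add`: both add values are collected into one list.
  val groupedWithoutAdd: Map[(Int, String), Seq[String]] =
    rows.groupBy(r => (r.id, r.name)).map { case (k, rs) => k -> rs.map(_.add) }

  def main(args: Array[String]): Unit = {
    println(groupedWithAdd.size)    // 2
    println(groupedWithoutAdd.size) // 1
  }
}
```

Spark's `groupBy(...).agg(collect_list(...))` behaves the same way: the grouping columns decide how many output rows you get.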

This should do what you expect (I think):

  val group =
    filter.rdd.map(x => {
      (
        x.getInt(x.fieldIndex("id")),
        x.getString(x.fieldIndex("p_name")),
        x.getString(x.fieldIndex("description")),
        SecondIdentity(
          x.getString(x.fieldIndex("firm")),
          x.getString(x.fieldIndex("code")),
          x.getString(x.fieldIndex("org_type")),
          x.getString(x.fieldIndex("org_number")),
          x.getString(x.fieldIndex("type")),
          x.getSeq(x.fieldIndex("perms"))
        ),
        "act",
        FirstIdentity(
          x.getString(x.fieldIndex("doc_number")),
          x.getString(x.fieldIndex("doc_type")),
          x.getInt(x.fieldIndex("p_id")).toString
        )
      )
    })
      .toDF("id", "name", "desc", "add", "actKey", "firsts")
      .groupBy("id", "name", "desc", "actKey")
      .agg(collect_list("add"))

Result:

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                       |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":123,"name":"PF 1","desc":"description","actKey":"act","collect_list(add)":[{"firm":"0010XR_TYPE_6","code":"0010XR","orgType":"6","orgNumber":"222222","typee":"TYPE","perms":["PERM1","PERM2"]},{"firm":"0010XR_TYPE_5","code":"0010XR","orgType":"5","orgNumber":"222223","typee":"TYPE","perms":["PERM1","PERM2"]}]}|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
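The result above only collects "add" and keeps the generated key `collect_list(add)`. A sketch that collects "firsts" as well and aliases both columns, so the JSON keys stay `add` and `firsts`. The object name `CollectBoth` and the two input rows are hypothetical (values borrowed from the expected output in the question), and `FirstIdentity` is built with named arguments so doc_type and doc_number land in the matching fields. Note that `collect_list` does not guarantee element order after a shuffle, so the i-th "add" is not guaranteed to line up with the i-th "firsts".

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.collect_list

// Same case classes as in the question.
case class FirstIdentity(docType: String, docNumber: String, pId: String)
case class SecondIdentity(firm: String, code: String, orgType: String,
                          orgNumber: String, typee: String, perms: Seq[String])

object CollectBoth {
  // Builds the grouped DataFrame from two hypothetical flat rows that share
  // id/name/desc/actKey but differ in "add" and "firsts".
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val flat = Seq(
      (123, "PF 1", "description",
        SecondIdentity("0010XR_TYPE_6", "0010XR", "6", "222222", "TYPE", Seq("PERM1", "PERM2")),
        "act",
        FirstIdentity(docType = "6", docNumber = "77444478", pId = "1")),
      (123, "PF 1", "description",
        SecondIdentity("0010XR_TYPE_6", "0010XR", "5", "11111", "TYPE2", Seq("PERM1", "PERM2")),
        "act",
        FirstIdentity(docType = "6", docNumber = "411133", pId = "2"))
    ).toDF("id", "name", "desc", "add", "actKey", "firsts")

    flat
      // Group only by the columns that are identical across the rows to merge.
      .groupBy("id", "name", "desc", "actKey")
      // Collect both struct columns into arrays; the aliases keep the JSON keys
      // "add" and "firsts" instead of "collect_list(add)".
      .agg(collect_list("add").as("add"), collect_list("firsts").as("firsts"))
      // Restore the column order of the desired JSON.
      .select("id", "name", "desc", "add", "actKey", "firsts")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("collect-both").getOrCreate()
    build(spark).toJSON.show(false)
    spark.stop()
  }
}
```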


Inside your map function you are not wrapping the FirstIdentity and SecondIdentity values in a Seq, so "add" and "firsts" are not converted to arrays.

Change your map function to this:


    filter.rdd.map(x => {
      (
        x.getInt(x.fieldIndex("id")),
        x.getString(x.fieldIndex("p_name")),
        x.getString(x.fieldIndex("description")),
        Seq(SecondIdentity(
          x.getString(x.fieldIndex("firm")),
          x.getString(x.fieldIndex("code")),
          x.getString(x.fieldIndex("org_type")),
          x.getString(x.fieldIndex("org_number")),
          x.getString(x.fieldIndex("type")),
          x.getSeq(x.fieldIndex("perms"))
        )),
        "act",
        Seq(FirstIdentity(
          x.getString(x.fieldIndex("doc_number")),
          x.getString(x.fieldIndex("doc_type")),
          x.getInt(x.fieldIndex("p_id")).toString
        ))
      )
    })

This results in:

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"id":123,"name":"PF 1","desc":"description","add":[{"firm":"0010XR_TYPE_6","code":"0010XR","orgType":"6","orgNumber":"222222","typee":"TYPE","perms":["PERM1","PERM2"]}],"actKey":"act","firsts":[{"docType":"77444478","docNumber":"6","pId":"1"}]}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
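If you would rather produce the question's `General` case class directly (the code above never uses it), one possible alternative is the typed `groupByKey` / `mapGroups` API instead of `collect_list`. This is a sketch, not the answer's method; `FlatRow` and `TypedGrouping` are hypothetical names, and the JSON keys then follow `General`'s field names (`pName`, `description`, `act`, plus an empty `delete` array) rather than `name`/`desc`/`actKey`.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class FirstIdentity(docType: String, docNumber: String, pId: String)
case class SecondIdentity(firm: String, code: String, orgType: String,
                          orgNumber: String, typee: String, perms: Seq[String])
case class General(id: Int, pName: String, description: String, add: Seq[SecondIdentity],
                   delete: Seq[String], act: String, firsts: Seq[FirstIdentity])

// Hypothetical flat record: one row of vw_test after the question's map step.
case class FlatRow(id: Int, pName: String, description: String,
                   add: SecondIdentity, act: String, first: FirstIdentity)

object TypedGrouping {
  def toGeneral(spark: SparkSession, flat: Dataset[FlatRow]): Dataset[General] = {
    import spark.implicits._
    flat
      // Key on the fields that must be identical within one output record.
      .groupByKey(r => (r.id, r.pName, r.description, r.act))
      .mapGroups { (key: (Int, String, String, String), rows: Iterator[FlatRow]) =>
        val (id, pName, description, act) = key
        // Materialize once: the group iterator can only be traversed a single time.
        val rs = rows.toVector
        General(id, pName, description, rs.map(_.add), Seq.empty[String], act, rs.map(_.first))
      }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-grouping").getOrCreate()
    import spark.implicits._
    val perms = Seq("PERM1", "PERM2")
    val flat = Seq(
      FlatRow(123, "PF 1", "description",
        SecondIdentity("0010XR_TYPE_6", "0010XR", "6", "222222", "TYPE", perms), "act",
        FirstIdentity(docType = "6", docNumber = "77444478", pId = "1")),
      FlatRow(123, "PF 1", "description",
        SecondIdentity("0010XR_TYPE_6", "0010XR", "5", "11111", "TYPE2", perms), "act",
        FirstIdentity(docType = "6", docNumber = "411133", pId = "2"))
    ).toDS()
    toGeneral(spark, flat).toJSON.show(false)
    spark.stop()
  }
}
```

The typed route trades the untyped column names for compile-time checked fields; which one fits better depends on whether the exact JSON key names matter more than reusing `General`.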


Comments:

That works. However, when the DF has two records (2 rows) that differ, for example, in the fields inside "add", I want both to end up inside a single record. I don't know if I'm explaining myself clearly.
I edited my post, at the end I put the example. Thank you for your help.
I guess you want to group by id and then collect all "add" fields with the same id into a list. I edited the answer for that scenario; have a look and see whether that's what you are looking for. Otherwise I'm not able to understand your question :)
