
I am working on a Spark project and have a performance issue that I am struggling with; any help will be appreciated.

I have a column Collection that is an array of struct:

root
 |-- Collection: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Key: string (nullable = true)
 |    |    |-- Value: string (nullable = true)

The goal is to transform this column to struct type based on a list of available Keys:

|-- Collection: struct (nullable = true)
|    |-- Key1: string (nullable = true)
|    |-- Key2: array[string] (nullable = true)
|    |-- Key3: string (nullable = true)

I have the list of available keys as below:

{
  "type" : "struct",
  "fields" : [ {
    "name" : "Key1",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "Key2",
    "type" : {
      "type" : "array",
      "elementType" : "string",
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "Key3",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }]
}
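For completeness, this is roughly how I turn that JSON into a `StructType` that I can iterate over (a minimal sketch; `schemaJson` is a placeholder name for a string holding the JSON above):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// schemaJson holds the JSON schema shown above
val columnSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
```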

The reason the Value type can be string or array[string] is that in the original column, the array can contain multiple structs with the same key, in which case we infer the value as array[string]. For example, we want to transform an array of structs like this:

[{key1, value1}, {key2, value2}, {key2, value3}, {key3, value4}]

to this:

{value1, [value2, value3], value4}

I am currently able to do the job by using this function:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

  def function(column: Column, columnSchema: StructType): Column = {
    var newColumn = struct()
    for (x <- columnSchema.fields) {
      x.dataType match {
        // Repeated key: keep every matching Value as an array
        case ArrayType(StringType, true) =>
          newColumn = newColumn.withField(
            x.name,
            transform(filter(column, e => e("Key") === x.name), e => e("Value")))
        // Singular key: take the Value of the first (only) match
        case StringType =>
          newColumn = newColumn.withField(
            x.name,
            element_at(filter(column, e => e("Key") === x.name), 1).getField("Value"))
      }
    }
    newColumn
  }
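For context, I apply it like this, where `df` is my DataFrame with the original `Collection` column and `columnSchema` is the `StructType` parsed from the JSON above (names here are illustrative):

```scala
// Replace the array-of-struct column with the assembled struct column
val result = df.withColumn("Collection", function(col("Collection"), columnSchema))
```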

In reality I have 30 available keys, which means the new struct column needs 30 fields. I find this to be very slow, because filter() has to scan the array 30 times per row to retrieve all the key-value pairs.

Is there a better way to improve this? Many thanks!

1 Answer

Not a complete answer, but hopefully helpful. Instead of building the struct column with one filter per key, I'd suggest exploding the array, breaking the struct up into key and value columns, grouping by key to collect values that share a key, and then creating your new struct. I constructed an example like this:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

val data = Seq(
    Row(List(Row("k1", "v1"), Row("k2", "v2"), Row("k2", "v3"), Row("k4", "v4"))),
    Row(List(Row("k1", "v1"), Row("k1", "v2")))
)

val schema = new StructType()
  .add("arr", ArrayType(new StructType()
    .add("key", StringType)
    .add("value", StringType)))

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.show(false)

// output

+----------------------------------------+
|arr                                     |
+----------------------------------------+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|
|[{k1, v1}, {k1, v2}]                    |
+----------------------------------------+

Explode the array to produce one key-value pair per row and break key and value into their own columns:

val exploded = df.withColumn("kv", explode(col("arr"))).select("arr", "kv.key", "kv.value")

exploded.show(false)

// output

+----------------------------------------+---+-----+
|arr                                     |key|value|
+----------------------------------------+---+-----+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k1 |v1   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |v2   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |v3   |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k4 |v4   |
|[{k1, v1}, {k1, v2}]                    |k1 |v1   |
|[{k1, v1}, {k1, v2}]                    |k1 |v2   |
+----------------------------------------+---+-----+

Group by the key and collect the values:

val collected = exploded.groupBy("arr", "key").agg(collect_list("value").as("values"))
collected.show(false)

// output

+----------------------------------------+---+--------+
|arr                                     |key|values  |
+----------------------------------------+---+--------+
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k1 |[v1]    |
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k2 |[v2, v3]|
|[{k1, v1}, {k2, v2}, {k2, v3}, {k4, v4}]|k4 |[v4]    |
|[{k1, v1}, {k1, v2}]                    |k1 |[v1, v2]|
+----------------------------------------+---+--------+

You might also want to include first("value") in the agg so that you have access to singular values as strings rather than arrays. At this point, you can group by the original array and construct the final struct.
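One way to sketch that last step is to pivot the collected keys into columns and then assemble the struct. This is only an illustration against the example keys above (k1, k2, k4, with k1 and k4 treated as singular), not a general solution for your 30 keys:

```scala
// Pivot each key into its own column of collected values
val pivoted = collected
  .groupBy("arr")
  .pivot("key", Seq("k1", "k2", "k4"))
  .agg(first("values"))

// Assemble the target struct: unwrap singular keys, keep repeated keys as arrays
val result = pivoted.select(
  col("arr"),
  struct(
    element_at(col("k1"), 1).as("k1"),
    col("k2").as("k2"),
    element_at(col("k4"), 1).as("k4")
  ).as("Collection")
)
```

Listing the expected keys explicitly in pivot() avoids an extra pass over the data to discover the distinct key values.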
