
I have a dataframe like below -

+------------+---+---+---+---+
|output_label| id| c1| c2| c3|
+------------+---+---+---+---+
|           1|  1|  0|  0|  1|
|           1|  1|  5|  0|  0|
|           0|  1|  0|  6|  0|
|           0|  1|  0|  4|  3|
+------------+---+---+---+---+

I am trying to create another dataframe from this which has 2 columns - the column name and the sum of the values in each column - like this:

+------+---+
|column|sum|
+------+---+
|    c1|  5|
|    c2| 10|
|    c3|  4|
+------+---+

So far, I've tried this (in Spark 2.2.0), but it throws a stack trace -

val get_count: (String => Long) = (c: String) => {
    df.groupBy("id")
      .agg(sum(c) as "s")
      .select("s")
      .collect()(0)
      .getLong(0)
}
val sqlfunc = udf(get_count)

summary = summary.withColumn("sum_of_column", sqlfunc(col("c")))

Are there any other alternatives of accomplishing this task?

  • Instead of a screenshot, copy your test dataframe.

2 Answers


I think that the most efficient way is to do an aggregation and then build a new dataframe. That way you avoid a costly explode.

First, let's create the dataframe. BTW, it's always nice to provide the code to do it when you ask a question. This way we can reproduce your problem in seconds.

import spark.implicits._ // needed for toDF (already in scope in spark-shell)

val df = Seq((1, 1, 0, 0, 1), (1, 1, 5, 0, 0),
             (0, 1, 0, 6, 0), (0, 1, 0, 4, 3))
    .toDF("output_label", "ID", "C1", "C2", "C3")

Then we build the list of columns that we are interested in, the aggregations, and compute the result.

import org.apache.spark.sql.functions.{col, sum}

val cols = (1 to 3).map(i => s"C$i")
val aggs = cols.map(name => sum(col(name)).as(name))
val agg_df = df.agg(aggs.head, aggs.tail: _*) // see the note below
agg_df.show
+---+---+---+
| C1| C2| C3|
+---+---+---+
|  5| 10|  4|
+---+---+---+

We almost have what we need, we just need to collect the data and build a new dataframe:

val agg_row = agg_df.first // a single Row containing all the sums
cols.map(name => name -> agg_row.getAs[Long](name)) // Seq[(String, Long)]
    .toDF("column", "sum")
    .show
+------+---+
|column|sum|
+------+---+
|    C1|  5|
|    C2| 10|
|    C3|  4|
+------+---+
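As a side note, the collection step can be written more compactly with Row.getValuesMap, which gathers several fields into a Map in one call. A small sketch, reusing cols and agg_df from above:

// Equivalent collection step: getValuesMap returns a Map keyed by field name.
val sums = agg_df.first.getValuesMap[Long](cols) // Map(C1 -> 5, C2 -> 10, C3 -> 4)
sums.toSeq.toDF("column", "sum").show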

EDIT:

NB: df.agg(aggs.head, aggs.tail: _*) may seem strange. The idea is simply to compute all the aggregations defined in aggs. One would expect something simpler, like df.agg(aggs: _*). Yet the signature of the agg method is as follows:

def agg(expr: org.apache.spark.sql.Column, exprs: org.apache.spark.sql.Column*): org.apache.spark.sql.DataFrame

This is probably to ensure that at least one aggregation is provided, and it is why you need to split aggs into aggs.head and aggs.tail.
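If the head/tail split bothers you, note that select accepts a plain Column* vararg and performs a global aggregation when given only aggregate expressions, so the same result should be obtainable with (a sketch, reusing aggs from above):

// select takes Column*, so no head/tail split is needed; with only
// aggregate expressions it performs a global aggregation.
val agg_df2 = df.select(aggs: _*)
agg_df2.show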


1 Comment

Thank you for your response. I will provide code rather than screenshots the next time I ask a question. Can you help me understand why you do val agg_df = df.agg(aggs.head, aggs.tail: _*)? Also, you need to edit your answer to change val cols = (1 until 3).map(i => s"C$i") to val cols = (1 until 4).map(i => s"C$i") so that it loops through all the columns C1 to C3.

What I do is define a method that creates a struct from the desired values:

import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

def kv(columnsToTranspose: Array[String]) = explode(array(columnsToTranspose.map {
  c => struct(lit(c).alias("k"), col(c).alias("v"))
}: _*))

This function receives the list of columns to transpose (the last 3 columns in your case) and transforms each of them into a struct with the column name as the key and the column value as the value.

Then use that method to create the structs and process them as you want:

df.withColumn("kv", kv(df.columns.tail.tail))
.select( $"kv.k".as("column"), $"kv.v".alias("values"))
.groupBy("column")
.agg(sum("values").as("sum"))

First apply the previously defined function so that the desired columns become structs, then deconstruct each struct to get a key column and a value column in each row. Finally, group by the column name and sum the values.
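If you want to see what the intermediate step produces before the groupBy, you can show the exploded rows on their own - each input row becomes one row per transposed column (a sketch, reusing kv from above):

// Inspect the exploded key/value rows before aggregating.
df.withColumn("kv", kv(df.columns.tail.tail))
  .select($"kv.k".as("column"), $"kv.v".alias("values"))
  .show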

INPUT

+------------+---+---+---+---+
|output_label| id| c1| c2| c3|
+------------+---+---+---+---+
|           1|  1|  0|  0|  1|
|           1|  1|  5|  0|  0|
|           0|  1|  0|  6|  0|
|           0|  1|  0|  4|  3|
+------------+---+---+---+---+

OUTPUT

+------+---+
|column|sum|
+------+---+
|    c1|  5|
|    c3|  4|
|    c2| 10|
+------+---+

2 Comments

Your solution works perfectly fine for a few columns. It's just that I have more than 30k C-type columns (apologies for not specifying that in the question upfront), and the explode fails for that many columns. Looking to see if the answer suggested by @Oli works.
I'm running this in spark-shell, so it's a bit difficult to post the entire error as it's a pretty long stack trace, but the error appears similar to issues.apache.org/jira/browse/SPARK-19984
