
I have been working on Spark DataFrame code with Spark version 2.2.0. When I try to run an aggregate function on a column, I get an error.

+----------------------------------+
|amount                            |
+----------------------------------+
|[1197, 8797, 6146, 253, 4521, 955]|
|[1197, 8797, 6146, 253, 4521, 955]|
+----------------------------------+

I want to sum the array values present in each row and add the result to the existing dataframe:

import org.apache.spark.sql.SparkSession

object JsonSpark {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("Spark")
      .getOrCreate()

    val df = spark.read.option("multiline", true).json("C:\\Users\\Admin\\Desktop\\jsonfile.json")
    val df2 = df.select("transactions")
    val df2extract = df2.select("transactions.amount")

    df2extract.selectExpr(
      "aggregate(amount, 0, (x, y) -> x + y) as details_sum"
    ).show()
  }
}

The error I get is

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '>' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 29)

== SQL ==
aggregate(amount, 0, (x, y) -> x + y) as details_sum
-----------------------------^^^

Please help me solve this error. Thanks a lot!

  • What do you want to do in the aggregate? Sum of an array? Commented Jun 29, 2020 at 5:57
  • Yeah, I want to sum the array values present in each row. Commented Jun 29, 2020 at 6:00
  • If you are using 2.2 then you need to create a UDF and calculate the sum of the array. Commented Jun 29, 2020 at 6:16
  • How do I add the result of the UDF to the existing dataframe? Commented Jun 29, 2020 at 7:39

1 Answer

Assuming you want the sum of an array column: the aggregate higher-order function (and the -> lambda syntax your parser error points at) was only added in Spark 2.4, which is why it fails on 2.2.

Spark >= 2.4.0

    val df = spark.sql(
      """
        |select amount
        |from values
        |(array(1197, 8797, 6146, 253, 4521, 955)),
        |(array(1197, 8797, 6146, 253, 4521, 955))
        |T(amount)
      """.stripMargin)
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------------+
      * |amount                            |
      * +----------------------------------+
      * |[1197, 8797, 6146, 253, 4521, 955]|
      * |[1197, 8797, 6146, 253, 4521, 955]|
      * +----------------------------------+
      *
      * root
      * |-- amount: array (nullable = false)
      * |    |-- element: integer (containsNull = false)
      */

    df.selectExpr(
      "aggregate(amount, 0, (x, y) -> x + y) as details_sum"
    ).show()

    /**
      * +-----------+
      * |details_sum|
      * +-----------+
      * |      21869|
      * |      21869|
      * +-----------+
      */

Spark < 2.4: either use a UDF over mutable.WrappedArray -

    // spark < 2.4; also needs import org.apache.spark.sql.functions.udf
    // and import spark.implicits._ for the $"..." column syntax
    import scala.collection.mutable

    val intArraySum = udf((arrayInt: mutable.WrappedArray[Int]) => arrayInt.sum)
    df.select(
      intArraySum($"amount").as("details_sum")
    ).show()

    /**
      * +-----------+
      * |details_sum|
      * +-----------+
      * |      21869|
      * |      21869|
      * +-----------+
      */

or, more simply, accept a Seq[Int] (Spark's WrappedArray is a Seq, which avoids the scala.collection.mutable import) -

    val intSeqSum = udf((arrayInt: Seq[Int]) => arrayInt.sum)
    df.select(
      intSeqSum($"amount").as("details_sum")
    ).show()

    /**
      * +-----------+
      * |details_sum|
      * +-----------+
      * |      21869|
      * |      21869|
      * +-----------+
      */

From the comments -

Add the result as a new column on the existing dataframe (note it is functions.col, from org.apache.spark.sql.functions):

    df.withColumn("details_sum", intSeqSum(functions.col("amount")))
      .show()

Comments

  • I am using Spark 2.2.0. How can I sum the array values in Spark 2.2.0?
  • I tried that but I am getting an error like this: Error:(25, 38) not found: value mutable val intArraySum = udf((arrayInt: mutable.WrappedArray[Int]) => arrayInt.sum)
  • Try one of the two options I provided. Use a Scala Seq.
  • I am getting an error like this: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'amount' given input columns: [account_id, bucket_start_date, transaction_count, transactions, bucket_end_date];; 'Project [UDF('amount) AS details_sum#20]
  • Use import org.apache.spark.sql.functions.col. The example I provided is just on sample data; use the proper array column from your dataframe, which may be transactions.amount. See the sketch below.
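
Putting those comments together, a minimal end-to-end sketch for Spark 2.2 (the object name JsonSparkUdf is illustrative; it assumes the question's JSON schema, where transactions.amount resolves to an array column, and that Spark reads the JSON integers as bigint, hence Seq[Long]):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, udf}

    object JsonSparkUdf { // hypothetical name, not from the question
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("Spark")
          .getOrCreate()

        // Spark maps JSON integers to bigint, so the UDF takes Seq[Long]
        val seqSum = udf((xs: Seq[Long]) => xs.sum)

        val df = spark.read.option("multiline", true).json("C:\\Users\\Admin\\Desktop\\jsonfile.json")

        // Alias the nested array column, then append the per-row sum
        df.select(col("transactions.amount").as("amount"))
          .withColumn("details_sum", seqSum(col("amount")))
          .show(false)
      }
    }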
