5

I have a CSV file and I am processing its data.

I am working with DataFrames, and I calculate the average (mean), min, max, and sum of each column based on some conditions. The data in each column may be empty or null.

I have noticed that in some cases I get a null value instead of a number as the max or the sum, or max() returns a number that is less than what min() returns.

I do not want to replace the null/empty values with anything else. The only thing I have done is use these two options when reading the CSV:

.option("nullValue", "null")
.option("treatEmptyValuesAsNulls", "true")

Is there any way to handle this issue? Has anyone faced this problem before? Is it a problem with data types?

I run something like this:

data.agg(mean("col_name"), stddev("col_name"), count("col_name"),
         min("col_name"), max("col_name"))

Otherwise, I will consider it to be a problem in my code.

1
  • These aggregation functions ignore null values, so the presence of null values shouldn't be a problem. You said you calculate them based on some conditions; maybe the problem is there. Also, make sure you are applying those functions to numeric columns. You can use .cast("double") on them if necessary. Commented May 13, 2016 at 20:26
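
A quick sketch of that suggestion, assuming the DataFrame is called data and the column of interest is "col_name" (both placeholders): check the schema first, then cast before aggregating.

import org.apache.spark.sql.functions.{col, max, min, sum}

data.printSchema()  // a StringType here would explain min/max comparing values as text

val fixed = data.withColumn("col_name", col("col_name").cast("double"))
fixed.agg(min("col_name"), max("col_name"), sum("col_name")).show()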

3 Answers

5

I have done some research on this question, and the result shows that the mean, max, and min functions ignore null values. Below are the experiment code and results. Environment: Scala, Spark 1.6.1, Hadoop 2.6.0.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{max, mean, min}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

import sqlContext.implicits._  // for the $"column" syntax

// Use null (not Scala's None) for missing values so they map to SQL NULL.
val row1 = Row("1", 2.4, "2016-12-21")
val row2 = Row("1", null, "2016-12-22")
val row3 = Row("2", null, "2016-12-23")
val row4 = Row("2", null, "2016-12-23")
val row5 = Row("3", 3.0, "2016-12-22")
val row6 = Row("3", 2.0, "2016-12-22")
val theRdd = sc.makeRDD(Array(row1, row2, row3, row4, row5, row6))

val schema = StructType(StructField("key", StringType, false) ::
                        StructField("value", DoubleType, true) ::
                        StructField("d", StringType, false) :: Nil)
val df = sqlContext.createDataFrame(theRdd, schema)

df.show()

df.agg(mean($"value"), max($"value"), min($"value")).show()

df.groupBy("key").agg(mean($"value"), max($"value"), min($"value")).show()


Output:
+---+-----+----------+
|key|value|         d|
+---+-----+----------+
|  1|  2.4|2016-12-21|
|  1| null|2016-12-22|
|  2| null|2016-12-23|
|  2| null|2016-12-23|
|  3|  3.0|2016-12-22|
|  3|  2.0|2016-12-22|
+---+-----+----------+
+-----------------+----------+----------+
|       avg(value)|max(value)|min(value)|
+-----------------+----------+----------+
|2.466666666666667|       3.0|       2.0|
+-----------------+----------+----------+
+---+----------+----------+----------+
|key|avg(value)|max(value)|min(value)|
+---+----------+----------+----------+
|  1|       2.4|       2.4|       2.4|
|  2|      null|      null|      null|
|  3|       2.5|       3.0|       2.0|
+---+----------+----------+----------+

From the output you can see that the mean, max, and min functions on column 'value' for group key='1' return '2.4' instead of null, which shows that the null values were ignored by these functions. However, if the column contains only null values, these functions return null.

2

Contrary to one of the comments, it is not true that nulls are ignored. Here is an approach:

max(coalesce(col("col_name"), lit(Int.MinValue)))
min(coalesce(col("col_name"), lit(Int.MaxValue)))

This will still have an issue if there are only null values: you will need to convert Min/MaxValue back to null, or to whatever you want to use to represent "no valid/non-null entries".
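
A sketch of that last step, assuming a DataFrame df with a numeric column "col_name" (both placeholders): wrap the aggregate in when/otherwise so the sentinel turns back into null.

import org.apache.spark.sql.functions.{coalesce, col, lit, max, when}

// If max never saw a real value, the result is the sentinel, so map it back to null.
val rawMax = max(coalesce(col("col_name"), lit(Int.MinValue)))
df.agg(when(rawMax === lit(Int.MinValue), lit(null)).otherwise(rawMax).alias("max_col_name")).show()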

1

To add to the other answers: remember that null and NaN are different things to Spark:

  1. NaN is "not a number", and numeric aggregations on a column containing NaN result in NaN.
  2. null is a missing value, and numeric aggregations on a column containing nulls ignore them as if those rows weren't there.

import numpy as np
from pyspark.sql import functions as F

df_ = spark.createDataFrame([(1, np.nan), (None, 2.0), (3, 4.0)], ("a", "b"))
df_.show()

+----+---+
|   a|  b|
+----+---+
|   1|NaN|
|null|2.0|
|   3|4.0|
+----+---+

df_.agg(F.mean("a"), F.mean("b")).collect()
[Row(avg(a)=2.0, avg(b)=nan)]
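
If NaN is the cause, one option (a sketch in Scala, mirroring the PySpark example above; df and column "b" are placeholders) is to turn NaN into null first, so the aggregations skip those rows like any other missing value:

import org.apache.spark.sql.functions.{col, lit, mean, when}

// Replace NaN with null; mean/max/min then ignore those rows.
val cleaned = df.withColumn("b", when(col("b").isNaN, lit(null)).otherwise(col("b")))
cleaned.agg(mean("b")).show()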
