
I have this function:

def countNullValueColumn(df: DataFrame): Array[(String, Long)] =
  df.columns
    .map(x => (x, df.filter(df(x).isNull || df(x) === "" || df(x).isNaN).count))

I'm trying to use a val counter = sc.longAccumulator instead of the DataFrame count function, without success.

The attempts I've made have been:

df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1)} (x, counter.value)})
df.columns.foreach(x => {df.filter(df(x).isNull || df(x) === "" || df(x).isNaN) {counter.add(1); (x, counter.value)} })

Unfortunately, none of these work because they don't return the correct type (Array[(String, Long)]).

Does anyone have any ideas or suggestions? Thanks in advance.

P.S. I don't know whether using an accumulator is more efficient than count, but I would just like to try.

Edit: Should I use foreach instead of map to avoid getting a wrong value in the accumulator, since map is a transformation while foreach is an action?

Edit2: As suggested by @DNA, I changed the map to foreach inside my code.

Edit3: OK, now the problem is creating an Array[(String, Long)]. I tried this, but the :+ operator doesn't work.

val counter = session.sparkContext.longAccumulator
val res: Array[(String, Long)] = Array()
df.columns
    .foreach(x => res :+ (x, df.filter{ df(x).isNull || df(x) === "" || df(x).isNaN {counter.add(1); counter.value}}))

Does anyone have any ideas or suggestions?

1 Answer

The documentation discusses this topic:

Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like map(). The below code fragment demonstrates this property:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

There is an additional problem with getting reliable results from accumulators:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

So for both of these reasons, one should favour actions such as foreach over transformations such as map if using an accumulator like this.
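As a minimal sketch (assuming data is a numeric RDD, as in the docs fragment above), moving the update into foreach makes the behaviour predictable:

val accum = sc.longAccumulator
data.foreach(x => accum.add(x))
// foreach is an action, so the computation has run by this point,
// and each completed task's update has been applied exactly once
println(accum.value)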

Also, note that you are running foreach over the array of columns, not on the DataFrame itself - then you are running the filter transformation repeatedly on your DataFrame. So in this case, foreach isn't a Spark action at all, it's just a method on Array.

So you probably need a map over the df.columns array (so you get an array to return from your function), then a foreach action over the actual DataFrame (to perform the counting).

Here's one way of doing it:

df.columns.map(col => {
  val acc = sc.longAccumulator          // one accumulator per column
  df.foreach(row => {
    val v = row.getAs[Any](col)
    if (v == null || v == "") acc.add(1)  // NaN check left as an exercise
  })
  (col, acc.value)
})
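
Wrapped into your function signature (a sketch, assuming session is your SparkSession, as in your Edit3), this gives the Array[(String, Long)] you're after. Note that acc.value on a LongAccumulator is a java.lang.Long, so it needs unboxing to a Scala Long:

def countNullValueColumn(df: DataFrame): Array[(String, Long)] =
  df.columns.map { col =>
    val acc = session.sparkContext.longAccumulator
    df.foreach { row =>
      val v = row.getAs[Any](col)
      if (v == null || v == "") acc.add(1)  // NaN check left as an exercise
    }
    (col, acc.value.toLong)  // unbox java.lang.Long to Scala Long
  }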

But note that this is always going to be inefficient because we have to make a pass over the DataFrame for each column. It would probably be more efficient to count all the columns in a single pass (generating a tuple or Map of counts for each row), then merge the counts using reduce or fold or similar, rather than using counters.
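Here's a sketch of that single-pass idea (assuming the same null/empty test as above, with the NaN check again left out), using aggregate to build a Map of per-column counts per partition and then merge them:

val cols = df.columns  // capture the column names outside the closure
val counts: Map[String, Long] = df.rdd.aggregate(Map.empty[String, Long])(
  // seqOp: fold each row into the partition's running counts
  (m, row) => cols.foldLeft(m) { (acc, c) =>
    val v = row.getAs[Any](c)
    if (v == null || v == "") acc.updated(c, acc.getOrElse(c, 0L) + 1) else acc
  },
  // combOp: merge the per-partition maps
  (m1, m2) => (m1.keySet ++ m2.keySet)
    .map(k => k -> (m1.getOrElse(k, 0L) + m2.getOrElse(k, 0L))).toMap
)
val result: Array[(String, Long)] = cols.map(c => (c, counts.getOrElse(c, 0L)))

This touches each row once regardless of the number of columns, at the cost of building a small Map per update.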


6 Comments

Yes, in fact I thought of using foreach instead of map, but I can't return the correct type. Do you have any idea how to do it?
The function required for foreach doesn't need to return anything; the signature is def foreach(func: ForeachFunction[T]): Unit so it's simpler than map()
Sorry, I mean def foreach(f: (T) ⇒ Unit): Unit
Right! Only that I need countNullValueColumn to return Array[(String, Long)], where Long is the type of counter.value
Please check whether the 4th edit is what you meant in your previous comment, @DNA
