2

I have a dataset of device IDs with a startTime and some feature vectors, which need to be merged based on hour or weekday_hour. The sample data is as follows:

+-----+-------------------+--------------------+
|hh_id|          startTime|                hash|
+-----+-------------------+--------------------+
|dev01|2016-10-10 00:01:04|(1048576,[121964,...|
|dev02|2016-10-10 00:17:45|(1048576,[121964,...|
|dev01|2016-10-10 00:18:01|(1048576,[121964,...|
|dev10|2016-10-10 00:19:48|(1048576,[121964,...|
|dev05|2016-10-10 00:20:00|(1048576,[121964,...|
|dev08|2016-10-10 00:45:13|(1048576,[121964,...|
|dev05|2016-10-10 00:56:25|(1048576,[121964,...|

The features are basically SparseVectors, which are merged by a custom function. When I try to create a key column in the following way:

val columnMap = Map("hour" -> hour($"startTime"), "weekday_hour" -> getWeekdayHourUDF($"startTime"))
val grouping = "hour"
val newDF = oldDF.withColumn("dt_key", columnMap(grouping))

I get a java.io.NotSerializableException. The complete stack trace is below:

Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: hour(startTime))
    - field (class: scala.collection.immutable.Map$Map3, name: value1, type: class java.lang.Object)
    - object (class scala.collection.immutable.Map$Map3, Map(hour -> hour(startTime), weekday_hour -> UDF(startTime), none -> 0))
    - field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: groupingColumnMap, type: interface scala.collection.immutable.Map)
    - object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4f1f9a63)
    - field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
    - object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@207d6d1e)

But when I try to execute the same logic without creating columns explicitly, using if-else, I don't face any such errors.

val newDF = if (groupingKey == "hour") {
  oldDF.withColumn("dt_key", hour($"startTime"))
} else {
  oldDF.withColumn("dt_key", getWeekdayHourUDF($"startTime"))
}

It would be really convenient to do this the Map way, as there might be more types of key extraction methods. Please help me figure out what is causing this issue.

4
  • you should write a udf function to create the maps Commented Mar 15, 2018 at 6:37
  • @RameshMaharjan You mean, I need to create a UDF which contains a map of the functions to be applied rather than creating a map of UDFs? Commented Mar 15, 2018 at 6:41
  • I'm not able to reproduce this in Spark 1.6 or Spark 2.2. Are you sure there's no problem with the UDF? What is the type of columnMap? It should be Map[String, Column]. Commented Mar 15, 2018 at 6:52
  • @philantrovert Yes, the type of the columnMap is scala.collection.immutable.Map[String,org.apache.spark.sql.Column]. I don't think there is a problem with the UDF, but here is the code for it: val localDateTime = ts.toLocalDateTime, then (localDateTime.getDayOfWeek.getValue - 1)*24 + localDateTime.getHour, and finally val getWeekdayHourUDF = udf(getWeekdayHour _). Commented Mar 15, 2018 at 7:01

2 Answers

4

Maybe a little late, but I am on Spark 2.4.6 and couldn't reproduce the issue. I'm guessing the code calls columnMap for multiple keys. It would help if you provided an easily reproducible example, including data (a 1-row dataset is enough). However, as the stack trace says, the Column class is indeed not Serializable, and I'll try to elaborate according to my current understanding.

TL;DR: One easy way to circumvent this is to turn vals into defs.


I believe it's already clear why expressing the same thing with when cases or UDFs works.

First attempt: Something like that may not work because (a) the Column class is not serializable (which I believe to be a conscious design choice given its intended role within the Spark API), and (b) there is nothing in the expression

oldDF.withColumn("dt_key", columnMap(grouping))

that tells Spark which concrete Column will be passed as the second parameter of withColumn. This means the concrete Map[String, Column] object would need to be sent over the network to executors, at which point an exception like that is raised.
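Point (a) can be seen in isolation, without Spark. The following is only a minimal sketch: FakeColumn is a hypothetical stand-in for org.apache.spark.sql.Column, and SerializationDemo and canSerialize are illustrative names, not part of any library. It shows plain Java serialization rejecting an otherwise serializable Map as soon as one of its values is not Serializable, which is the same shape as the first lines of the serialization stack in the question.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column:
// a plain class that does not extend Serializable.
final class FakeColumn(val expr: String)

object SerializationDemo {
  // Returns true if plain Java serialization accepts the value,
  // false if it throws NotSerializableException.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }

  // Same shape as the columnMap in the question, with stand-in values.
  val columnMap: Map[String, FakeColumn] = Map(
    "hour" -> new FakeColumn("hour(startTime)"),
    "weekday_hour" -> new FakeColumn("UDF(startTime)")
  )

  // A Map of plain strings, for comparison.
  val stringMap: Map[String, String] = Map("hour" -> "hour(startTime)")
}
```

With these definitions, canSerialize(columnMap) is false while canSerialize(stringMap) is true: the Map itself is serializable, and the failure comes from the value it holds.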

Second attempt: The second attempt works because the decision based on the groupingKey parameter, which is needed to define the DataFrame, can happen entirely on the driver.


It helps to think about Spark code using the DataFrame API as a query builder, or something holding an execution plan, and not the data itself. Once you call an action on it (write, show, count etc), Spark generates code that sends tasks to executors. At that moment, all information needed to materialize the DataFrame/Dataset must be either already properly encoded in the query plan or needs to be serializable so that it can be sent over the network.

def usually solves this kind of issue because

def columnMap: Map[String, Column] = Map("hour" -> hour($"startTime"), "weekday_hour" -> getWeekdayHourUDF($"startTime"))

is not the concrete Map object itself but something that creates a new Map[String, Column] every time it is called, say at each executor that happens to take a task involving this Map.
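One way to picture the difference between val and def, again without Spark, is to compare what ends up stored inside an enclosing object. This is a sketch under assumptions: StubColumn is a hypothetical non-serializable stand-in for Column, and WithVal, WithDef and ValVsDefDemo are made-up names. In the stack trace from the question, the Map shows up as a field (groupingColumnMap) of a REPL wrapper object, so something like this may be what happens there, although that is a guess rather than a confirmed diagnosis.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.spark.sql.Column (not Serializable).
final class StubColumn(val expr: String)

// val: the Map is stored as a field, so serializing the enclosing
// object also tries to serialize the Map and its values.
class WithVal extends Serializable {
  val columnMap: Map[String, StubColumn] =
    Map("hour" -> new StubColumn("hour(startTime)"))
}

// def: nothing is stored; a fresh Map is built on every call,
// so the enclosing object has no field to serialize.
class WithDef extends Serializable {
  def columnMap: Map[String, StubColumn] =
    Map("hour" -> new StubColumn("hour(startTime)"))
}

object ValVsDefDemo {
  // Returns true if plain Java serialization accepts the value.
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }
}
```

Here ValVsDefDemo.canSerialize(new WithVal) is false and ValVsDefDemo.canSerialize(new WithDef) is true, even though both classes expose the same columnMap to callers.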

This and this seem like good resources on the topic. I confess I get why using a Function like

val columnMap = () => Map("hour" -> hour($"startTime"), "weekday_hour" -> getWeekdayHourUDF($"startTime"))

and then columnMap()("hour") would work, since the decompiled bytecode shows that Scala functions are defined as concrete instances of Serializable, but I do not get why def works, since that doesn't look to be the case for them. Anyway, I hope this helps.

1

Built-in when function

You can achieve this by using the built-in when function:

import org.apache.spark.sql.functions._

val groupingKey = "hour" // or "weekday_hour"
df.withColumn("dt_key",
     when(lit(groupingKey) === "hour", hour($"startTime"))
     .when(lit(groupingKey) === "weekday_hour", getWeekdayHourUDF($"startTime"))
     .otherwise(lit(0))).show(false)

udf function

Alternatively, you can create a udf function that picks the key value:

import org.apache.spark.sql.functions._
// Picks the precomputed value that matches groupingKey; falls back to 0.
def mapUdf = udf((hour: Int, weekdayhour: Int, groupingKey: String) =>
      if (groupingKey.equalsIgnoreCase("hour")) hour
      else if (groupingKey.equalsIgnoreCase("weekday_hour")) weekdayhour
      else 0)

And use it as

val newDF = oldDF.withColumn("dt_key",
                  mapUdf(hour($"startTime"), 
                         getWeekdayHourUDF($"startTime"),
                         lit(groupingKey)))

I hope the answer is helpful.

6 Comments

I think there is a misunderstanding regarding the question. The dataframe has the startTime column, and the dt_key column is derived conditionally on the value of groupingKey, whose value can be hour or weekday_hour. Finally, I don't need a map of columns, I just need a single column as the result.
What is groupingKey then? Is it another column?
No, it's just a String variable with values "hour" or "weekday_hour". It defines the type of grouping to be done on the whole data. The other function hour that I'm referring to is the spark.sql.functions.hour, which extracts the hour from the timestamp.
Ah! I see. You've transferred the if-else inside the mapUDF. But the problem is, I can't use the native functions provided by Spark in this method. :( I'm hoping there is some other easier way to do this. I tried using transient keyword for the columnMap, but even that doesn't work.
I have updated the answer again :) Hope this time I answered it as you wanted