I have data consisting of device IDs with a startTime and some feature vectors, which need to be merged based on either hour or weekday_hour. The sample data is as follows:
+-----+-------------------+--------------------+
|hh_id| startTime| hash|
+-----+-------------------+--------------------+
|dev01|2016-10-10 00:01:04|(1048576,[121964,...|
|dev02|2016-10-10 00:17:45|(1048576,[121964,...|
|dev01|2016-10-10 00:18:01|(1048576,[121964,...|
|dev10|2016-10-10 00:19:48|(1048576,[121964,...|
|dev05|2016-10-10 00:20:00|(1048576,[121964,...|
|dev08|2016-10-10 00:45:13|(1048576,[121964,...|
|dev05|2016-10-10 00:56:25|(1048576,[121964,...|
+-----+-------------------+--------------------+
The features in the hash column are basically SparseVectors, which are merged by a custom function; a simplified sketch of that merge follows.
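Here, mergeVectors is a hypothetical stand-in for my custom function (an element-wise sum, purely for illustration), and I'm assuming the ml-package SparseVector:

import org.apache.spark.ml.linalg.SparseVector

// Stand-in for my custom merge: element-wise sum of two sparse vectors
// of the same size (my real function is more involved).
def mergeVectors(a: SparseVector, b: SparseVector): SparseVector = {
  val sums = scala.collection.mutable.Map[Int, Double]().withDefaultValue(0.0)
  for ((i, v) <- a.indices.zip(a.values)) sums(i) += v
  for ((i, v) <- b.indices.zip(b.values)) sums(i) += v
  val (indices, values) = sums.toSeq.sortBy(_._1).unzip
  new SparseVector(a.size, indices.toArray, values.toArray)
}

The idea is to apply this per (hh_id, dt_key) group, where dt_key is a key column derived from startTime.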
When I try to create that dt_key column in the following way:

val columnMap = Map(
  "hour"         -> hour($"startTime"),
  "weekday_hour" -> getWeekdayHourUDF($"startTime")
)
val grouping = "hour"
val newDF = oldDF.withColumn("dt_key", columnMap(grouping))
I get a java.io.NotSerializableException. The relevant part of the stack trace is below:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: hour(startTime))
- field (class: scala.collection.immutable.Map$Map3, name: value1, type: class java.lang.Object)
- object (class scala.collection.immutable.Map$Map3, Map(hour -> hour(startTime), weekday_hour -> UDF(startTime), none -> 0))
- field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: groupingColumnMap, type: interface scala.collection.immutable.Map)
- object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@4f1f9a63)
- field (class: linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, name: $iw, type: class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw)
- object (class linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw, linef03f4aaf3a1c4f109fce271f7b5b1e30104.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@207d6d1e)
But when I execute the same logic without storing the columns in a Map, just using a plain if-else, I don't face any such error:
val newDF = if (grouping == "hour") {
  oldDF.withColumn("dt_key", hour($"startTime"))
} else {
  oldDF.withColumn("dt_key", getWeekdayHourUDF($"startTime"))
}
Please help me figure out why this issue is being caused. It would be really convenient to do it the Map way, as there are likely to be more types of key extraction methods.
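For instance, the map in my actual code already carries a constant fallback entry, visible in the serialization stack above as none -> 0; roughly:

val columnMap = Map(
  "hour"         -> hour($"startTime"),
  "weekday_hour" -> getWeekdayHourUDF($"startTime"),
  "none"         -> lit(0)  // constant key, i.e. no time-based grouping
)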
Edit, in response to a comment asking for the type of columnMap ("it should be Map[String, Column]"): columnMap is indeed a scala.collection.immutable.Map[String, org.apache.spark.sql.Column]. I don't think there is a problem with the UDF, but here is the code for it:

def getWeekdayHour(ts: java.sql.Timestamp): Int = {
  val localDateTime = ts.toLocalDateTime
  (localDateTime.getDayOfWeek.getValue - 1) * 24 + localDateTime.getHour
}
val getWeekdayHourUDF = udf(getWeekdayHour _)
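Update: one workaround I'm considering (just a sketch, on the assumption that the problem is the non-serializable Column values stored inside the Map) is to keep functions that build the column, and only construct the actual Column at the point of use. I would still like to understand the root cause, though.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.hour

// Store serializable functions that build the key column on demand,
// instead of pre-built (non-serializable) Column objects.
val columnFnMap: Map[String, Column => Column] = Map(
  "hour"         -> ((c: Column) => hour(c)),
  "weekday_hour" -> ((c: Column) => getWeekdayHourUDF(c))
)
val newDF = oldDF.withColumn("dt_key", columnFnMap(grouping)($"startTime"))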