
I have a DataFrame with this schema:

root
 |-- QUERY: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- DEVICE: string (nullable = true)
 |-- PURCHASE_UNITS_SUM: double (nullable = true)
 |-- CLICK_SUM: decimal(38,18) (nullable = true)
 |-- IMPRESSION_COUNT: long (nullable = false)
 |-- CLICK_THROUGH_RATE: decimal(38,2) (nullable = true)
 |-- PURCHASE_RATE: double (nullable = true)

I am trying to convert some columns into maps (DEVICE -> column value):

import org.apache.spark.sql.functions.{col, collect_list, map}
import spark.implicits._   // assuming a SparkSession named `spark`; needed for .as[...] and .toDF

val result = df.withColumn("CLICK_THROUGH_RATE_MAP",
        map(col("DEVICE"), col("CLICK_THROUGH_RATE")))
      .withColumn("PURCHASE_RATE_MAP",
        map(col("DEVICE"), col("PURCHASE_RATE")))
      .withColumn("PURCHASE_SUM_MAP",
        map(col("DEVICE"), col("PURCHASE_UNITS_SUM")))
      .withColumn("CLICK_SUM_MAP",
        map(col("DEVICE"), col("CLICK_SUM")))
      .withColumn("IMPRESSION_SUM_MAP",
        map(col("DEVICE"), col("IMPRESSION_COUNT")))
      .groupBy("QUERY", "TYPE")
      .agg(collect_list("CLICK_THROUGH_RATE_MAP"),
        collect_list("PURCHASE_RATE_MAP"),
        collect_list("PURCHASE_SUM_MAP"),
        collect_list("CLICK_SUM_MAP"),
        collect_list("IMPRESSION_SUM_MAP"))
      .as[(String, String,
        Seq[Map[String, Double]],
        Seq[Map[String, Double]],
        Seq[Map[String, Double]],
        Seq[Map[String, Double]],
        Seq[Map[String, Double]])]
      .map {
        // `type` is a reserved word in Scala, so the second field is bound as `queryType`
        case (query, queryType, list1, list2, list3, list4, list5) =>
          (query, queryType,
            list1.reduce(_ ++ _),
            list2.reduce(_ ++ _),
            list3.reduce(_ ++ _),
            list4.reduce(_ ++ _),
            list5.reduce(_ ++ _))
      }
      .toDF("QUERY",
        "TYPE",
        "CLICK_THROUGH_RATE",
        "PURCHASE_RATE",
        "PURCHASE_UNITS",
        "CLICKS",
        "IMPRESSIONS")

This gives me the following schema:

root
 |-- QUERY: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- CLICK_THROUGH_RATE: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- PURCHASE_RATE: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- PURCHASE_UNITS: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- CLICKS: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- IMPRESSIONS: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

But when I run result.count, I get this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 4 times, most recent failure: Lost task 0.3 in stage 63.0 (TID 62365, ip-10-0-1-52.ec2.internal, executor 2): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2347)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
    at sun.reflect.GeneratedMethodAccessor232.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:401)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:730)
  ... 53 elided
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
  at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
  at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2347)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:490)
  at sun.reflect.GeneratedMethodAccessor232.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
  at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
  at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
  at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  ... 3 more

Am I doing something wrong?

  • This seems to be the same error you have in this question: stackoverflow.com/questions/60750980/… As I commented over there, I would suggest checking your dependencies. Commented Mar 19, 2020 at 5:53
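
For illustration, a minimal sketch of what such a dependency check could look like in an sbt build is below; the artifact names are the standard Spark ones, but the version is a placeholder you would match to the Spark version actually running on your cluster:

    // build.sbt (sketch): keep the Spark version identical to the cluster's, and mark
    // the Spark artifacts as "provided" so the cluster's own jars are used at runtime.
    val sparkVersion = "2.4.4"  // placeholder: substitute your cluster's Spark version

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
      "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided"
    )

This particular ClassCastException during task deserialization is commonly reported when the Spark version on the application classpath does not match the one on the cluster, so aligning them is a reasonable first check.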

2 Answers


The same problem occurs with HashMap.

I found the solution here: https://gist.github.com/ramn/5566596

You have to replace ObjectInputStream in your code with a new class, ObjectInputStreamWithCustomClassLoader:

    import java.io.{FileInputStream, ObjectInputStream}

    // An ObjectInputStream that first tries to resolve classes against the
    // current class loader and only falls back to the default resolution
    // when the class is not found.
    class ObjectInputStreamWithCustomClassLoader(
      fileInputStream: FileInputStream
    ) extends ObjectInputStream(fileInputStream) {
      override def resolveClass(desc: java.io.ObjectStreamClass): Class[_] = {
        try { Class.forName(desc.getName, false, getClass.getClassLoader) }
        catch { case ex: ClassNotFoundException => super.resolveClass(desc) }
      }
    }
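
As a hypothetical usage sketch (the file path is a placeholder), you would deserialize with this class instead of a plain ObjectInputStream:

    // Read back a previously serialized object, resolving classes with the
    // current class loader instead of the default one.
    val fis = new FileInputStream("/path/to/serialized.obj")  // placeholder path
    val ois = new ObjectInputStreamWithCustomClassLoader(fis)
    try {
      val restored = ois.readObject()
      // ... use `restored` here
    } finally {
      ois.close()
    }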



I changed your code a little and I am getting the expected result.

First, I created a DataFrame with a single record, using the same column names as yours (all columns are strings here):

val df = Seq(("select * from test", "type1", "device1", "10.0", "20.0", "1234", "23.4567", "10.98")).toDF.
  selectExpr("_1 as QUERY", "_2 as TYPE", "_3 as DEVICE", "_4 as PURCHASE_UNITS_SUM",
    "_5 as CLICK_SUM", "_6 as IMPRESSION_COUNT", "_7 as CLICK_THROUGH_RATE", "_8 as PURCHASE_RATE")

Below are the schema and a sample row:

root
 |-- QUERY: string (nullable = true)
 |-- TYPE: string (nullable = true)
 |-- DEVICE: string (nullable = true)
 |-- PURCHASE_UNITS_SUM: string (nullable = true)
 |-- CLICK_SUM: string (nullable = true)
 |-- IMPRESSION_COUNT: string (nullable = true)
 |-- CLICK_THROUGH_RATE: string (nullable = true)
 |-- PURCHASE_RATE: string (nullable = true)

+------------------+-----+-------+------------------+---------+----------------+------------------+-------------+
|             QUERY| TYPE| DEVICE|PURCHASE_UNITS_SUM|CLICK_SUM|IMPRESSION_COUNT|CLICK_THROUGH_RATE|PURCHASE_RATE|
+------------------+-----+-------+------------------+---------+----------------+------------------+-------------+
|select * from test|type1|device1|              10.0|     20.0|            1234|           23.4567|        10.98|
+------------------+-----+-------+------------------+---------+----------------+------------------+-------------+
val result = df.withColumn("CLICK_THROUGH_RATE_MAP", map(col("DEVICE"), col("CLICK_THROUGH_RATE"))).
      withColumn("PURCHASE_RATE_MAP", map(col("DEVICE"), col("PURCHASE_RATE"))).
      withColumn("PURCHASE_SUM_MAP", map(col("DEVICE"), col("PURCHASE_UNITS_SUM"))).
      withColumn("CLICK_SUM_MAP", map(col("DEVICE"), col("CLICK_SUM"))).
      withColumn("IMPRESSION_SUM_MAP", map(col("DEVICE"), col("IMPRESSION_COUNT"))).
      groupBy("QUERY", "TYPE").
      agg(collect_list("CLICK_THROUGH_RATE_MAP"), collect_list("PURCHASE_RATE_MAP"),
        collect_list("PURCHASE_SUM_MAP"), collect_list("CLICK_SUM_MAP"), collect_list("IMPRESSION_SUM_MAP")).
      as[(String, String,
        Seq[scala.collection.immutable.Map[String, Double]],
        Seq[scala.collection.immutable.Map[String, Double]],
        Seq[scala.collection.immutable.Map[String, Double]],
        Seq[scala.collection.immutable.Map[String, Double]],
        Seq[scala.collection.immutable.Map[String, Double]])]

result.show

+------------------+-----+------------------------------------+-------------------------------+------------------------------+---------------------------+--------------------------------+
|             QUERY| TYPE|collect_list(CLICK_THROUGH_RATE_MAP)|collect_list(PURCHASE_RATE_MAP)|collect_list(PURCHASE_SUM_MAP)|collect_list(CLICK_SUM_MAP)|collect_list(IMPRESSION_SUM_MAP)|
+------------------+-----+------------------------------------+-------------------------------+------------------------------+---------------------------+--------------------------------+
|select * from test|type1|                [Map(device1 -> 2...|           [Map(device1 -> 1...|          [Map(device1 -> 1...|       [Map(device1 -> 2...|            [Map(device1 -> 1...|
+------------------+-----+------------------------------------+-------------------------------+------------------------------+---------------------------+--------------------------------+

I changed the map function as follows

val finalresultdf = result.map { f => (f._1, f._2, f._3.reduce(_ ++ _), f._4.reduce(_ ++ _), f._5.reduce(_ ++ _), f._6.reduce(_ ++ _), f._7.reduce(_ ++ _)) }.
      toDF("QUERY", "TYPE", "CLICK_THROUGH_RATE", "PURCHASE_RATE", "PURCHASE_UNITS", "CLICKS", "IMPRESSIONS")

finalresultdf.show

+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|             QUERY| TYPE|  CLICK_THROUGH_RATE|       PURCHASE_RATE|      PURCHASE_UNITS|              CLICKS|         IMPRESSIONS|
+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|select * from test|type1|Map(device1 -> 23...|Map(device1 -> 10...|Map(device1 -> 10.0)|Map(device1 -> 20.0)|Map(device1 -> 12...|
+------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+

finalresultdf.count

scala> finalresultdf.count
res34: Long = 1

Hope this helps!!!

1 Comment

I still get "cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD". I think it's a dependency issue.
