
I am using the new Apache Spark 1.4.0 DataFrames API to extract information from Twitter's Status JSON, mostly focused on the Entities object. The part relevant to this question is shown below:

{
  ...
  ...
  "entities": {
    "hashtags": [],
    "trends": [],
    "urls": [],
    "user_mentions": [
      {
        "screen_name": "linobocchini",
        "name": "Lino Bocchini",
        "id": 187356243,
        "id_str": "187356243",
        "indices": [ 3, 16 ]
      },
      {
        "screen_name": "jeanwyllys_real",
        "name": "Jean Wyllys",
        "id": 111123176,
        "id_str": "111123176",
        "indices": [ 79, 95 ]
      }
    ],
    "symbols": []
  },
  ...
  ...
}

There are several examples of how to extract information from primitive types such as string and integer, but I couldn't find anything on how to process this kind of complex structure.

I tried the code below, but it doesn't work; it throws an exception:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

val tweets = sqlContext.read.json("tweets.json")

// this function is just to filter out empty entities.user_mentions[] nodes
// since some tweets don't contain any mentions
import org.apache.spark.sql.functions.udf
val isEmpty = udf((value: List[Any]) => value.isEmpty)

import org.apache.spark.sql._
import sqlContext.implicits._
case class UserMention(id: Long, idStr: String, indices: Array[Long], name: String, screenName: String)

val mentions = tweets.select("entities.user_mentions").
  filter(!isEmpty($"user_mentions")).
  explode($"user_mentions") {
  case Row(arr: Array[Row]) => arr.map { elem =>
    UserMention(
      elem.getAs[Long]("id"),
      elem.getAs[String]("id_str"),
      elem.getAs[Array[Long]]("indices"),
      elem.getAs[String]("name"),
      elem.getAs[String]("screen_name"))
  }
}

mentions.first

Exception when I try to call mentions.first:

scala>     mentions.first
15/06/23 22:15:06 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
scala.MatchError: [List([187356243,187356243,List(3, 16),Lino Bocchini,linobocchini], [111123176,111123176,List(79, 95),Jean Wyllys,jeanwyllys_real])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:55)
    at org.apache.spark.sql.catalyst.expressions.UserDefinedGenerator.eval(generators.scala:81)

What is wrong here? I understand it is related to the types, but I couldn't figure it out yet.

As additional context, the structure mapped automatically is:

scala> mentions.printSchema
root
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)

NOTE 1: I know it is possible to solve this using HiveQL, but I would like to use DataFrames since there is so much momentum around them.

SELECT explode(entities.user_mentions) as mentions
FROM tweets

NOTE 2: the UDF val isEmpty = udf((value: List[Any]) => value.isEmpty) is an ugly hack and I'm probably missing something here, but it was the only way I came up with to avoid an NPE.
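One possible alternative to the UDF, at least for the null case (an untested sketch: Column.isNotNull is part of the public DataFrame API, but the built-in size function for checking array length only arrived in Spark 1.5, so filtering empty arrays on 1.4 would still need the UDF):

```scala
// Sketch: drop tweets whose user_mentions array is null using a plain
// column predicate instead of a UDF. Assumes the `tweets` DataFrame and
// `sqlContext.implicits._` from the snippet above are in scope.
val withMentions = tweets.filter($"entities.user_mentions".isNotNull)
```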

  • I think your case Row(arr: Array[Row]) does not match your input. Commented Jun 24, 2015 at 15:06
  • Hi @elmalto, I tried both List and Array but either way I get the same error. Commented Jun 25, 2015 at 13:02

2 Answers


Here is a solution that works, with just one small hack.

The main idea is to work around the type problem by declaring a List[String] rather than List[Row]:

val mentions = tweets.explode("entities.user_mentions", "mention"){m: List[String] => m}

This creates a second column called "mention" of type "Struct":

|            entities|             mention| 
+--------------------+--------------------+ 
|[List(),List(),Li...|[187356243,187356...| 
|[List(),List(),Li...|[111123176,111123...| 

Now do a map() to extract the fields inside mention. The getStruct(1) call gets the value in column 1 of each row:

case class Mention(id: Long, id_str: String, indices: Seq[Int], name: String, screen_name: String)
val mentionsRdd = mentions.map(
  row => 
    {  
      val mention = row.getStruct(1)
      Mention(mention.getLong(0), mention.getString(1), mention.getSeq[Int](2), mention.getString(3), mention.getString(4))
    }
)

And convert the RDD back into a DataFrame:

val mentionsDf = mentionsRdd.toDF()

There you go!

|       id|   id_str|     indices|         name|    screen_name|
+---------+---------+------------+-------------+---------------+
|187356243|187356243| List(3, 16)|Lino Bocchini|   linobocchini|
|111123176|111123176|List(79, 95)|  Jean Wyllys|jeanwyllys_real|

1 Comment

Thanks Xinh Huynh. My concern with this hack is that I'm going through the whole dataset doing Row.toString() prior to extracting the elements. I don't have specific benchmarks, but it seems we would waste a lot of machine time on this step. This is the only reason I don't consider your answer the right one!

Try doing this:

case Row(arr: Seq[Row]) => arr.map { elem =>
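To see why Seq works where Array does not: Spark hands array columns back wrapped in a Seq implementation (a WrappedArray, or ArraySeq depending on the Scala version), which is never a raw Array at runtime, so the Array[Row] pattern cannot match. A minimal plain-Scala sketch of the distinction (Array(...).toSeq produces the same kind of wrapper and stands in for a Row's array field here):

```scala
// Array columns come back from Spark as a Seq wrapper, never a raw Array.
val cell: Any = Array(3, 16).toSeq

val matched = cell match {
  case _: Array[_] => "Array" // never matches the wrapper
  case _: Seq[_]   => "Seq"   // this is the case that fires
}
println(matched) // prints "Seq"
```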

1 Comment

Please add some comments to your solution explaining why and how it solves the problem.
