
I've got a hadoopFiles object which is generated from sc.newAPIHadoopFile.

scala> hadoopFiles
res1: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = UnionRDD[64] at union at <console>:24

I intend to iterate through all the lines in hadoopFiles, with a map and a filter applied to each. Inside the map, an if check is applied, which produces a compile error:

scala> val rowRDD = hadoopFiles.map(line =>
     |           line._2.toString.split("\\^") map {
     |             field => {
     |               var pair = field.split("=", 2)
     |               if(pair.length == 2)
     |                 (pair(0) -> pair(1))
     |             }
     |           } toMap
     |         ).map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))
<console>:33: error: Cannot prove that Any <:< (T, U).
                 } toMap
                   ^

However, if I remove the if(pair.length == 2) part, it works fine:

scala>     val rowRDD = hadoopFiles.map(line =>
     |           line._2.toString.split("\\^") map {
     |             field => {
     |               var pair = field.split("=", 2)
     |               (pair(0) -> pair(1))
     |             }
     |           } toMap
     |         ).map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))
warning: there was one feature warning; re-run with -feature for details
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.expressions.Row] = MappedRDD[66] at map at <console>:33

Could anyone tell me the reason for this behavior, and show me the correct way to apply the if check? Thanks a lot!

P.S. We could use this simplified example to test:

"1=a^2=b^3".split("\\^") map {
            field => {
              var pair = field.split("=", 2)
              if(pair.length == 2)
                pair(0) -> pair(1)
              else
                return
            }
          } toMap
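
Pasting this snippet into the REPL reproduces the same error: error: Cannot prove that Any <:< (T, U).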
  • What if pair.length != 2? You have to provide something for that case too. Commented Feb 4, 2015 at 1:14
  • Just filter it out. I only want key-value pairs that consist of exactly 2 parts. @SarveshKumarSingh Commented Feb 4, 2015 at 1:21

2 Answers


To map over a collection and keep only some of the mapped elements, you can use flatMap. flatMap takes a function that returns a collection, for instance an Option. Now the if expression needs an else part that returns an empty Option, i.e. None.

scala> val rowRDD = hadoopFiles.map(line =>
     |           line._2.toString.split("\\^") flatMap {
     |             field => {
     |               var pair = field.split("=", 2)
     |               if (pair.length == 2)
     |                 Some(pair(0) -> pair(1))
     |               else
     |                 None
     |             }
     |           } toMap
     |         ).map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))
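
Applied to the simplified example from the question, the same pattern looks like this (a plain-Scala sketch, result noted in the comment):

"1=a^2=b^3".split("\\^") flatMap { field =>
  val pair = field.split("=", 2)
  if (pair.length == 2) Some(pair(0) -> pair(1)) else None
} toMap
// Map(1 -> a, 2 -> b): the "3" field produced None and was flattened away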

4 Comments

That is to say, if my line looks like '1=a^2=b^3', the above flatMap function will just return (1->a) and (2->b)? I'm new to Scala as well as Spark. What purpose do the Some and None keywords serve here, interacting with flatMap? Why not the map function?
Well, it throws another error: error: value flatmap is not a member of Array[String]
Some and None aren't keywords. They are subclasses of Option, which is a container that can have zero elements (None) or one element (Some).
After revising it to flatMap, it works just as I expected. Thanks for your detailed explanation :]

You can use collect:

val res = "1=a^2=b^3".split("\\^") map (_.split("=", 2)) collect {
  case Array(a, b) => a -> b
} toMap

println(res) // Map(1 -> a, 2 -> b)
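
collect combines a filter with a map: it takes a PartialFunction and silently skips elements the function is not defined for. A minimal sketch (the pf value is mine, for illustration):

val pf: PartialFunction[Array[String], (String, String)] = {
  case Array(k, v) => k -> v
}
pf.isDefinedAt(Array("3"))      // false, so collect skips this element
pf.isDefinedAt(Array("1", "a")) // true

That is why the dangling "3" field simply drops out of the result.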

In your particular case the following happens:

case class Row(uuid: String, ip: String, plt: String)
val hadoopFiles = List(("", "uuid=a^ip=b^plt"))

val rowRDD = hadoopFiles.map(line =>
  line._2.toString.split("\\^") map {
    field =>
      {
        var pair = field.split("=", 2)
        val res = if (pair.length == 2)
          (pair(0) -> pair(1))
        res  // res: Any, the common superclass of (String, String)
             // (i.e. Tuple2) and Unit (the result when
             // pair.length != 2)
      }
  } /* <<< returns Array[Any] */ /*toMap*/ ) 
  //.map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))
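
The if without an else is the culprit: the compiler completes it with else (), so res gets the least upper bound of (String, String) and Unit, which is Any. A standalone sketch (the variable name is mine):

val res = if ("a=b".split("=", 2).length == 2) ("a" -> "b")
// res: Any, because the compiler desugars this to:
// if (...) ("a" -> "b") else ()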

The result of the inner function is Any, so the inner map yields Array[Any]. If you look at the toMap definition you will see:

  def toMap[T, U](implicit ev: A <:< (T, U)): immutable.Map[T, U] = {
    val b = immutable.Map.newBuilder[T, U]
    for (x <- self)
      b += x // <<< implicit conversion from each `x` of class `A` in `self`
             // to (T, U) because we have `implicit ev: A <:< (T, U)`
    b.result()
  }
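
To see the evidence parameter in isolation (a sketch, not taken from the original code):

val ok: Array[(String, String)] = Array("a" -> "1")
ok.toMap     // compiles: the compiler can prove (String, String) <:< (T, U)

val bad: Array[Any] = Array("a" -> "1", ())
// bad.toMap // would not compile: Cannot prove that Any <:< (T, U).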

For your Array[Any] there is no evidence that Any <:< (T, U) in the current context, so your code fails to compile. If you add an else alternative:

  val rowRDD = hadoopFiles.map(line =>
    (line._2.toString.split("\\^") map {
      field =>
        {
          var pair = field.split("=", 2)
          val res = if (pair.length == 2)
            (pair(0) -> pair(1))
          else ("" -> "") // dummy, just for demo
          res // res: (String, String)
        }
    } toMap) withDefaultValue ("") 
           /*withDefaultValue just to avoid Exception for this demo*/ )
    .map(kvs => Row(kvs("uuid"), kvs("ip"), kvs("plt").trim))

  println(rowRDD) // List(Row(a,b,))

Here your result will be Array[(String, String)] and there is an implicit conversion from (String, String) to (T, U). So the code compiles and works.

