
I am trying to transform an input text file into a Key/Value RDD, but the code below doesn't work. (The text file is tab-separated.) I am really new to Scala and Spark, so I would really appreciate your help.

import org.apache.spark.{SparkConf, SparkContext}
import scala.io.Source

object shortTwitter {

  def main(args: Array[String]): Unit = {
    for (line <- Source.fromFile(args(1).txt).getLines()) {
      val newLine = line.map(line =>
      val p = line.split("\t")
      (p(0).toString, p(1).toInt)
      )
    }

    val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val text = sc.textFile(args(0))
    val counts = text.flatMap(line => line.split("\t"))
  }
}
  • "code below doesn't work" - how so? What's the failure? Commented Sep 18, 2016 at 10:50

1 Answer


I'm assuming you want the resulting RDD to have the type RDD[(String, Int)], so -

  • You should use map (which transforms each record into a single new record) and not flatMap (which transforms each record into multiple records)
  • You should map the result of the split into a tuple

Altogether:

val counts = text
  .map(line => line.split("\t"))
  .map(arr => (arr(0), arr(1).toInt))

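To make the type difference concrete, here's a minimal sketch; the sample lines and the parallelize call are made up for illustration, and it assumes an existing sc and import org.apache.spark.rdd.RDD:

// Hypothetical two-line sample, just to show the types involved:
val sample = sc.parallelize(Seq("alice\t3", "bob\t7"))

// flatMap flattens the split arrays, so keys and values end up interleaved:
val flat: RDD[String] = sample.flatMap(line => line.split("\t"))
// flat contains: "alice", "3", "bob", "7"

// map keeps one record per line, so the split result can be turned into a tuple:
val pairs: RDD[(String, Int)] = sample
  .map(line => line.split("\t"))
  .map(arr => (arr(0), arr(1).toInt))
// pairs contains: ("alice", 3), ("bob", 7)
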
EDIT per clarification in the comments: if you're also interested in fixing the non-Spark part (which reads the file sequentially), there are some errors in your for-comprehension syntax; here's the entire thing:

import org.apache.spark.rdd.RDD // needed for the RDD type annotation below

def main(args: Array[String]): Unit = {
  // read the file without Spark (not necessary when using Spark):
  val countsWithoutSpark: Iterator[(String, Int)] = for {
    line <- Source.fromFile(args(1)).getLines()
  } yield {
    val p = line.split("\t")
    (p(0), p(1).toInt)
  }

  // equivalent code using Spark:
  val sparkConf = new SparkConf().setAppName("ShortTwitterAnalysis").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  val counts: RDD[(String, Int)] = sc.textFile(args(0))
    .map(line => line.split("\t"))
    .map(arr => (arr(0), arr(1).toInt))
}
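
If you want to sanity-check the results locally, something along these lines should work; note that collect() pulls the whole RDD back to the driver, so only do this on small test inputs:

// The non-Spark result is an Iterator, so it can only be traversed once;
// call .toList first if you need to reuse it:
countsWithoutSpark.foreach(println)

// The Spark result has to be collected to the driver before printing:
counts.collect().foreach(println)

// Stop the context when you're done:
sc.stop()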

4 Comments

Besides that, the error says 'illegal start of simple expression, val p = line.split("\t")'. Because of this error I cannot run the code at all.
Oh I see, I thought that code was just meant to clarify what you're trying to do with Spark. I see no sense in using both parts, since one reads the file sequentially and the other uses Spark to do the same thing... Anyway, see the updated answer.
Thank you! The second part was code for something else, sorry for the confusion. One more question: how can I check the result of the non-Spark code using print?
countsWithoutSpark.foreach(println)
