
I have data in an input text file. Each line has the format "PriceId, DateTime, PriceForUSA, PriceForUK, PriceForAUS".

It looks like this:

0000002,11-05-08-2016,0.92,1.68,0.81

0000003,07-05-08-2016,0.80,1.05,1.49

0000008,07-05-08-2016,1.43,1.29,1.22

The list of countries is fixed (USA, UK, AUS), and the order of prices in lines is fixed too (PriceForUSA, PriceForUK, PriceForAUS).

I read this data from the file using the SparkContext and transform it into an RDD[List[String]]. Every List in my RDD represents one line from the input text file.

For example,

the first List contains the Strings

"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"

the second List contains the Strings

"0000003", "07-05-08-2016", "0.80", "1.05", "1.49"

etc.

I also have a custom case class PriceInfo:

case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {

  override def toString: String = s"$priceId,$priceDate,$country,$price"
}

It is not difficult to transform every List[String] into one object of this class (I can do that already), but in this case my task is to get several PriceInfo objects from every single List[String].

For example, List which contains

"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"

should be transformed to:

  • PriceInfo("0000002", "11-05-08-2016", "USA", "0.92")
  • PriceInfo("0000002", "11-05-08-2016", "UK", "1.68")
  • PriceInfo("0000002", "11-05-08-2016", "AUS", "0.81").

And every List[String] in my RDD[List[String]] must be split into several PriceInfo objects in the same way.

The result should be an RDD[PriceInfo].

The only solution that comes to my mind is to iterate over the RDD[List[String]] with foreach(), create 3 PriceInfo objects in every iteration, add them all to a List[PriceInfo], and then pass that result list to SparkContext.parallelize(...).

Something like this:

rawPricesList.foreach(list => {
  //...create PriceInfo1 from list
  //...create PriceInfo2 from list
  //...create PriceInfo3 from list

  //...add them all to a result List[PriceInfo]
})

//...sc.parallelize(resultList)

But such a solution has many shortcomings.

The main one is that it will not work if we have no reference to the SparkContext, for example in a method getPrices() whose only parameter is an RDD[List[String]]:

def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
  rawPricesList.foreach(list => {
    //...create PriceInfo1 from list
    //...create PriceInfo2 from list
    //...create PriceInfo3 from list

    //...add them all to a result List[PriceInfo]
  })

  //...but we can't call sc.parallelize(resultList) here, because there is
  //   no SparkContext sc among the method parameters
}

Besides, it seems to me that Scala offers a more elegant solution.

I tried to find similar samples in the books "Scala for the Impatient" and "Learning Spark: Lightning-Fast Big Data Analysis", but unfortunately did not find anything like this case. I would be very grateful for help and tips.

1 Answer


Here's one approach:

  1. Load the text file and split each line into an Array[String] of (id, date, price1, price2, price3)
  2. Transform each row into (id, date, List[(country, numericPrice)]) using zip (illustrated on a single row below)
  3. Flatten the (country, numericPrice) tuples in each row into rows of PriceInfo objects using flatMap
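To see what the zip in step 2 produces, here is a single parsed row in plain Scala (no Spark needed; the values are taken from the first sample line):

val countryList = List("USA", "UK", "AUS")

// zip pairs each country with the price at the same position
val countryPrices = countryList.zip(List("0.92", "1.68", "0.81").map(_.toDouble))
// countryPrices: List[(String, Double)] = List((USA,0.92), (UK,1.68), (AUS,0.81))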

Full example code below:

case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
  override def toString: String = s"$priceId,$priceDate,$country,$price"
}

val countryList = List("USA", "UK", "AUS")

val rdd = sc.textFile("/path/to/textfile").
  map( _.split(",") ).    // step 1: split each line into an Array[String]
  map{ case Array(id, date, p1, p2, p3) =>
    // step 2: pair each country with its price, preserving order
    (id, date, countryList.zip(List(p1.toDouble, p2.toDouble, p3.toDouble)))
  }.
  flatMap{ case (id, date, countryPrices) =>
    // step 3: expand each row into one PriceInfo per (country, price) pair
    countryPrices.map( cp => PriceInfo(id, date, cp._1, cp._2) )
  }
// rdd: org.apache.spark.rdd.RDD[PriceInfo] = ...

rdd.collect
// res1: Array[PriceInfo] = Array(
//    0000002,11-05-08-2016,USA,0.92,
//    0000002,11-05-08-2016,UK,1.68,
//    0000002,11-05-08-2016,AUS,0.81,
//    0000003,07-05-08-2016,USA,0.8,
//    0000003,07-05-08-2016,UK,1.05,
//    0000003,07-05-08-2016,AUS,1.49,
//    0000008,07-05-08-2016,USA,1.43,
//    0000008,07-05-08-2016,UK,1.29,
//    0000008,07-05-08-2016,AUS,1.22
// )
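Since flatMap is a transformation on the RDD itself, it needs no SparkContext reference, so the same idea also solves the getPrices constraint from the question. A minimal sketch, assuming the input has already been parsed into an RDD[List[String]] as described (the row-shape pattern match and the malformed-row handling are assumptions):

import org.apache.spark.rdd.RDD

def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
  val countryList = List("USA", "UK", "AUS")
  rawPricesList.flatMap {
    // assumes each row has the shape: id :: date :: three price strings
    case id :: date :: prices =>
      countryList.zip(prices.map(_.toDouble)).map {
        case (country, price) => PriceInfo(id, date, country, price)
      }
    case _ => Nil // skip malformed rows instead of failing the job
  }
}

Because flatMap emits zero or more output elements per input element, no intermediate driver-side List and no call to sc.parallelize are needed.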