I have data in an input text file, in the format: "PriceId, DateTime, PriceForUSA, PriceForUK, PriceForAUS".
It looks like this:
0000002,11-05-08-2016,0.92,1.68,0.81
0000003,07-05-08-2016,0.80,1.05,1.49
0000008,07-05-08-2016,1.43,1.29,1.22
The list of countries is fixed (USA, UK, AUS), and the order of prices in lines is fixed too (PriceForUSA, PriceForUK, PriceForAUS).
I read this data from the file using the SparkContext and transform it into an RDD[List[String]]. Every List in my RDD represents one line of the input text file.
For example, the first List contains the Strings
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
the second List contains the Strings
"0000003", "07-05-08-2016", "0.80", "1.05", "1.49"
and so on.
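For reference, the reading step looks roughly like this (the file path and the variable name are just placeholders for my real ones):

import org.apache.spark.rdd.RDD

// sc is the SparkContext; "prices.txt" stands in for my real input path
val rawPricesList: RDD[List[String]] =
  sc.textFile("prices.txt")
    .map(line => line.split(",").map(_.trim).toList)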
I also have a custom case class, PriceInfo:
case class PriceInfo(priceId: String, priceDate: String, country: String, price: Double) {
override def toString: String = s"$priceId,$priceDate,$country,$price"
}
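For example, its toString produces output like this:

PriceInfo("0000002", "11-05-08-2016", "USA", 0.92).toString
// -> "0000002,11-05-08-2016,USA,0.92"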
Transforming every List[String] into one object of this class is not difficult (I can already do it), but in this case my task is to get several custom objects from every single List[String].
For example, the List that contains
"0000002", "11-05-08-2016", "0.92", "1.68", "0.81"
should be transformed to:
- PriceInfo("0000002", "11-05-08-2016", "USA", "0.92")
- PriceInfo("0000002", "11-05-08-2016", "UK", "1.68")
- PriceInfo("0000002", "11-05-08-2016", "AUS", "0.81").
And every List[String] in my RDD[List[String]] must be "split" into several PriceInfo objects in the same way.
The result should be an RDD[PriceInfo].
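To be clear, the per-line conversion itself is not the hard part; a sketch of it (the names toPriceInfos and countries are only for illustration) could look like this:

val countries = List("USA", "UK", "AUS")

// turns one parsed line into one PriceInfo per country,
// relying on the fixed order of the price columns
def toPriceInfos(fields: List[String]): Seq[PriceInfo] = fields match {
  case priceId :: priceDate :: prices =>
    countries.zip(prices).map { case (country, price) =>
      PriceInfo(priceId, priceDate, country, price.toDouble)
    }
  case _ => Seq.empty // ignore malformed lines
}

What I cannot figure out is how to apply such a function across the whole RDD so that the result is a single RDD[PriceInfo].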
The only solution that came to my mind is to iterate over the RDD[List[String]] with foreach(), create three PriceInfo objects in every iteration, add all created objects to a List[PriceInfo], and then pass that result list to SparkContext.parallelize(...).
Something like this:
rawPricesList.foreach { list =>
  // ...create PriceInfo1 from list
  // ...create PriceInfo2 from list
  // ...create PriceInfo3 from list
  // ...add them all to a result List[PriceInfo]
}
// ...sc.parallelize(resultList)
But such a solution has serious shortcomings. For one thing, foreach() is an action that runs on the executors, so appending to a local list on the driver would not actually collect anything. And the main problem is that it cannot work at all when we have no reference to the SparkContext, for example in a method getPrices() that takes only one parameter, an RDD[List[String]]:
def getPrices(rawPricesList: RDD[List[String]]): RDD[PriceInfo] = {
  rawPricesList.foreach { list =>
    // ...create PriceInfo1 from list
    // ...create PriceInfo2 from list
    // ...create PriceInfo3 from list
    // ...add them all to a result List[PriceInfo]
  }
  // ...but we can't call sc.parallelize(List...) here, because there is
  // no SparkContext sc among the method parameters
}
In addition, it seems to me that Scala offers a more elegant solution.
I tried to find similar examples in the books "Scala for the Impatient" and "Learning Spark: Lightning-Fast Big Data Analysis", but unfortunately did not find anything like this case. I would be very grateful for help and tips.