
I am a newbie with Apache Spark as well as with the Scala programming language.

What I am trying to achieve is to extract the data from my local MongoDB database and then save it in Parquet format using Apache Spark with the mongo-hadoop connector.

This is my code so far:

package com.examples 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.rdd.RDD 
import org.apache.hadoop.conf.Configuration 
import org.bson.BSONObject 
import com.mongodb.hadoop.{MongoInputFormat, BSONFileInputFormat} 
import org.apache.spark.sql 
import org.apache.spark.sql.SQLContext 

object DataMigrator { 

    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Migration App").setMaster("local")
        val sc = new SparkContext(conf) 
        val sqlContext = new SQLContext(sc) 

        // Import statement to implicitly convert an RDD to a DataFrame 
        import sqlContext.implicits._ 

        val mongoConfig = new Configuration() 
        mongoConfig.set("mongo.input.uri", "mongodb://localhost:27017/mongosails4.case")

        // Read the collection as an RDD of (Object, BSONObject) pairs
        val mongoRDD = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])

        val count = mongoRDD.count()

        // the count value is approx. 100,000
        println("================ PRINTING =====================") 
        println(s"ROW COUNT IS $count") 
        println("================ PRINTING =====================") 
    } 
} 

The thing is that in order to save the data in Parquet format, it is first necessary to convert the mongoRDD variable to a Spark DataFrame. I have tried something like this:

// convert RDD to DataFrame
val myDF = mongoRDD.toDF()  // this line throws an error
myDF.write.save("my/path/myData.parquet")

The error I get is this:

Exception in thread "main" scala.MatchError: java.lang.Object (of class scala.reflect.internal.Types.$TypeRef$$anon$6)

Do you have any other idea how I could convert the RDD to a DataFrame so that I can save the data in Parquet format?

Here's the structure of one document in the MongoDB collection: https://gist.github.com/kingtrocko/83a94238304c2d654fe4

1 Answer

Create a case class representing the data stored in your DBObject:
case class Data(x: Int, s: String)

Then, map the values of your RDD to instances of your case class. Note that BSONObject.get returns Object, so each field needs an explicit cast:

val dataRDD = mongoRDD.values.map { obj => Data(obj.get("x").asInstanceOf[Int], obj.get("s").asInstanceOf[String]) }

Now, with your RDD[Data], you can create a DataFrame with the sqlContext:

val myDF = sqlContext.createDataFrame(dataRDD)

That should get you going. I can explain more later if needed.
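
Putting the pieces together, here is a minimal end-to-end sketch, assuming the mongoRDD and sqlContext values from the question's code; the fields x and s are placeholders, so swap in the actual field names and types from your document, and note that write.parquet requires Spark 1.4+:

case class Data(x: Int, s: String)

// BSONObject.get returns Object, so cast each field to its Scala type
val dataRDD = mongoRDD.values.map { obj =>
  Data(obj.get("x").asInstanceOf[Int], obj.get("s").asInstanceOf[String])
}

// Build the DataFrame and write it out in Parquet format
val myDF = sqlContext.createDataFrame(dataRDD)
myDF.write.parquet("my/path/myData.parquet")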
