I am trying to load data into MongoDB from Spark; below is my code:

import com.databricks.spark.avro._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.provider.mongodb._
import MongodbConfig._  

object test {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("test").setMaster("local[8]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val logger = org.apache.log4j.Logger.getLogger(getClass.getName)  // log4j logger for status messages

    // test file
    val test = sqlContext.read.avro("file:///Users\\abhis\\Desktop\\Avro\\test")
    logger.info("Reading test file")

    // The avro data is stored in a temporary table
    test.registerTempTable("test")

    val targetData = sqlContext.sql("SELECT * FROM test")  // my code is running fine till here

    // Configuration of MongoDB to save the data into a collection
    val targetOutputBuilder = MongodbConfigBuilder(Map(
      Host -> List("localhost:27017"),
      Database -> "subs",
      Collection -> "target",
      SamplingRatio -> 1.0,
      WriteConcern -> MongodbWriteConcern.Normal,
      SplitKey -> "_id",
      SplitSize -> 8))

    val writeConfig = targetOutputBuilder.build()

    // Writing data into the MongoDB collection
    targetData.saveToMongodb(writeConfig)
  }
}

I am running the code in IntelliJ IDEA and added these JARs as dependencies:

  • casbah-commons_2.10-2.8.0
  • casbah-core_2.10-2.8.0
  • mongo-java-driver-2.13.0
  • spark-mongodb-core-0.8.7
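
For reference, a rough build.sbt equivalent of those manually added JARs might look like the sketch below. This is only a sketch: the group/artifact IDs for the Stratio jar are an assumption, and declaring managed dependencies lets sbt resolve a matching mongo-java-driver/bson pair instead of mixing versions by hand.

scalaVersion := "2.10.6"  // assumed; match the Scala version of your Spark build

libraryDependencies ++= Seq(
  "org.mongodb" % "casbah-commons_2.10" % "2.8.0",
  "org.mongodb" % "casbah-core_2.10"    % "2.8.0",
  "org.mongodb" % "mongo-java-driver"   % "2.13.0",
  "com.stratio" % "spark-mongodb-core"  % "0.8.7"   // assumed coordinates for the Stratio provider
)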

I am getting the following error:

java.lang.NoClassDefFoundError: Could not initialize class com.mongodb.MongoClient
    at com.mongodb.casbah.MongoClient$.apply(MongoClient.scala:176)
    at com.stratio.provider.mongodb.MongodbClientFactory$.createClient(MongodbClientFactory.scala:43)
    at com.stratio.provider.mongodb.writer.MongodbWriter.<init>(MongodbWriter.scala:40)
    at com.stratio.provider.mongodb.writer.MongodbBatchWriter.<init>(MongodbBatchWriter.scala:32)
    at com.stratio.provider.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:61)
    at com.stratio.provider.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:59)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/05/15 22:13:57 ERROR Executor: Exception in task 2.0 in stage 4.0 (TID 18)
java.lang.IllegalAccessError: tried to access method org.bson.types.ObjectId.<init>(III)V from class com.mongodb.Bytes
    at com.mongodb.Bytes.<clinit>(Bytes.java:219)
    at com.mongodb.Mongo.<clinit>(Mongo.java:74)
    at com.mongodb.casbah.MongoClient$.apply(MongoClient.scala:176)
    at com.stratio.provider.mongodb.MongodbClientFactory$.createClient(MongodbClientFactory.scala:43)
    at com.stratio.provider.mongodb.writer.MongodbWriter.<init>(MongodbWriter.scala:40)
    at com.stratio.provider.mongodb.writer.MongodbBatchWriter.<init>(MongodbBatchWriter.scala:32)
    at com.stratio.provider.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:61)
    at com.stratio.provider.mongodb.MongodbDataFrame$$anonfun$saveToMongodb$1.apply(mongodbFunctions.scala:59)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

1 Answer

Well, I tried your code with slight modifications and it works. I did not test your exact setup (dependency versions), since it is quite different from mine, but if you can change your versions, this might help.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import com.stratio.datasource._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.schema._
import com.stratio.datasource.mongodb.writer._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import com.stratio.datasource.util.Config._
import com.stratio.datasource.mongodb.config.MongodbConfigBuilder

/**
 * Using https://github.com/Stratio/Spark-MongoDB
 */
object ToMongoDB {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ToMongoDB")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val avroInput = sqlContext.read.format("com.databricks.spark.avro").load("/spark_learning/avro/")


    val targetOutputBuilder = MongodbConfigBuilder(
      Map(Host -> List("localhost:27017"),
        Database -> "test",
        Collection -> "target",
        SamplingRatio -> 1.0,
        WriteConcern -> "normal", //changed from MongodbWriteConcern.Normal to "normal"
        SplitKey -> "_id",
        SplitSize -> 8))

    val writeConfig =  targetOutputBuilder.build()
    avroInput.saveToMongodb(writeConfig)
  }

}

I added these dependencies in the build.sbt file:

val spark_mongodb = "com.stratio" % "spark-mongodb" % "0.8.0"
val spark_mongodb_stratio = "com.stratio.datasource" % "spark-mongodb_2.10" % "0.11.1"
val spark_avro = "com.databricks" %% "spark-avro" % "1.0.0"

libraryDependencies += spark_mongodb
libraryDependencies += spark_mongodb_stratio
libraryDependencies += spark_avro
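
To verify the write, the same Stratio datasource can read the collection back into a DataFrame. A minimal sketch, assuming the 0.11.x sqlContext.fromMongoDB helper and the database/collection names used in the writer config above:

import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._

// Build a read config pointing at the same database/collection the job wrote to
// (names assumed from the writer config above).
val readConfig = MongodbConfigBuilder(Map(
  Host -> List("localhost:27017"),
  Database -> "test",
  Collection -> "target",
  SamplingRatio -> 1.0)).build()

// Load the collection into a DataFrame and inspect a few rows.
val readBack = sqlContext.fromMongoDB(readConfig)
readBack.printSchema()
readBack.show(5)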