
I am new to Apache Spark and I was trying to run a test application with it. The problem I'm facing is that when I create an RDD from the collection of data I want to process, the RDD gets created but Spark doesn't start processing it until I call the .collect method on the RDD. This way I have to wait for Spark to process the RDD. Is there a way to have Spark process the collection automatically as soon as I form the RDD, so that I can call .collect later to get the already-processed data without having to wait for Spark?

Moreover, is there any way I can have Spark put the processed data into a database instead of returning it to me?

The code I'm using is given below:

object appMain extends App {
  val spark = new SparkContext("local", "SparkTest")

  val list = List(1, 2, 3, 4, 5)

  // I want this RDD to be processed as soon as it is created
  val rdd = spark.parallelize(list, 1).map { i =>
    i % 2 // checking if the number is even or odd
  }

  // some more functionality here

  // the job above starts when the line below is executed
  val result = rdd.collect()

  result.foreach(println)
}
  • Spark only computes collections when you run an action such as collect or saveAsTextFile - see the docs. Commented May 29, 2014 at 19:01
  • Thanks for the response. OK, then is there any way I can use Spark to process a collection, like sending a job to a worker node, and have the result stored in a database automatically, without waiting for the result to be processed? Commented May 29, 2014 at 19:12
  • Help us understand your requirements: Why would you need Spark to execute the map before anything uses it? Commented May 29, 2014 at 19:13
  • Well, I have a scenario in which a lot of network packets are coming in, and I have to process them in parallel as soon as they arrive and store the results in a database. The problem is that I can't wait for a packet to be processed inside the main application, as that would cause me to lose other incoming packets. I want to send the packet to a worker for processing, and it should automatically store the result in the database without blocking the main thread. Commented May 29, 2014 at 19:37
  • 1
    Looks like you are looking for Spark Streaming instead of 'batch' Spark - spark.apache.org/docs/0.9.0/streaming-programming-guide.html Commented May 29, 2014 at 19:39

2 Answers


Spark uses a lazy evaluation model in which transformations are only applied once an 'action' is called on the RDD. This model fits batch operations over large datasets well. It's possible to 'cache' parts of a computation by using rdd.cache(), but that does not force computation; it only indicates that once the RDD has been computed, it should be kept around.
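This laziness is easy to observe even without a Spark cluster: Scala's own view collections behave analogously. A minimal pure-Scala sketch (no Spark needed; the mutable counter is only there to make the evaluation order visible):

```scala
object LazyDemo {
  def main(args: Array[String]): Unit = {
    var evaluations = 0

    // Like an RDD transformation: defining the map does no work yet.
    val mapped = List(1, 2, 3, 4, 5).view.map { i =>
      evaluations += 1
      i % 2
    }

    assert(evaluations == 0) // nothing has been evaluated so far

    // Like calling .collect: forces the whole computation.
    val result = mapped.toList

    assert(evaluations == 5)
    println("lazy until forced: " + result) // List(1, 0, 1, 0, 1)
  }
}
```

The same pattern applies to RDDs: transformations such as map and filter only build a lineage, and an action such as collect, count, or saveAsTextFile triggers the actual job.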

Further clarification from the comments indicates that the OP might be better served by a streaming model, in which incoming data is processed in 'micro-batches'.

This is an example of what an 'urgent event count' streaming job could look like (not tested, for illustrative purposes only), based on the Network WordCount example:

object UrgentEventCount {
  def main(args: Array[String]): Unit = {

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("UrgentEventCount").setMaster(SPARK_MASTER)
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream(ip, port, StorageLevel.MEMORY_AND_DISK_SER)
    // we assume events are tab separated
    val events = lines.flatMap(_.split("\t"))
    val urgentEvents = events.filter(_.contains("URGENT"))

    // The database write must happen per batch, inside foreachRDD;
    // db.connect and dbConnection.execute are placeholders for your driver's API.
    urgentEvents.foreachRDD { rdd =>
      val dbConnection = db.connect(dbHost, dbPort)
      dbConnection.execute("Insert into UrgentEvents ...")
      dbConnection.close()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

As you can see, if you need to connect to a database, all you need to do is provide the necessary driver and the code to execute the DB interaction. Be sure to include the driver's dependencies in the job jar file.




maasg has explained things in detail. You can apply count on the RDD after creating it:

rdd.count()

Another way is to print a few lines of the RDD, like:

rdd.take(10).foreach(println)

