
I am working with Spark 3.1.2 and Scala 2.12. I want to distribute some keys among the nodes so that each node reads its own data based on the key it receives. To do that, I work with an RDD first and then convert it to a Spark DataFrame. I read the data from a table in an Oracle database. Here is the code:

    import java.sql.ResultSet

    import oracle.jdbc.pool.OracleDataSource
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object managementData extends App {
      val num_node = 2

      // Reads all rows belonging to one group from the Oracle table and
      // flattens them into a single space-separated string.
      def read_data(group_id: Int): String = {
        val table_name = "table"
        val col_name = "col"
        // Rows are assigned to groups by taking the last (up to two) digits
        // of col modulo num_node.
        val query =
          s"""select f1, f2, f3, f4, f5, f6, f7, f8
             |from $table_name
             |where MOD(TO_NUMBER(substr($col_name, -LEAST(2, LENGTH($col_name)))), $num_node) = $group_id""".stripMargin
        val oracleUser = "ORCL"
        val oraclePassword = "XXXXXXXX"
        val oracleURL = "jdbc:oracle:thin:@//X.X.X.X:1521/ORCLDB"
        val ods = new OracleDataSource()
        ods.setUser(oracleUser)
        ods.setURL(oracleURL)
        ods.setPassword(oraclePassword)
        val con = ods.getConnection()
        val statement = con.createStatement()
        statement.setFetchSize(1000) // important
        val resultSet: ResultSet = statement.executeQuery(query)
        Iterator.continually(resultSet)
          .takeWhile(_.next)
          .flatMap(r => (1 to 8).map(i => r.getString(i))) // columns 1..8
          .mkString(" ")
      }

      // Intended to turn the RDD of strings into a DataFrame; this is where
      // the compile error occurs.
      def udf(rdd: RDD[String]): DataFrame = {
        val spark = SparkSession.builder.getOrCreate()
        val schema = new StructType()
          .add(StructField("f1", StringType, true))
          .add(StructField("f2", StringType, true))
          .add(StructField("f3", StringType, true))
          .add(StructField("f4", StringType, true))
          .add(StructField("f5", StringType, true))
          .add(StructField("f6", StringType, true))
          .add(StructField("f7", IntegerType, true))
          .add(StructField("f8", IntegerType, true))
        val df = spark.createDataFrame(rdd, schema) // <-- fails to compile
        df
      }

      def main(): Unit = {
        val group_list = Seq.range(1, 2, 1) // the list of group ids, here Seq(1)
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("testScala")
          .set("spark.executor.memory", "8g")
          .set("spark.executor.cores", "2")
          .set("spark.task.cpus", "1")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(group_list, num_node)
          .map(read_data)
        rdd.map(x => println(x)).collect()
        udf(rdd)
      }

      main()
    }

read_data works perfectly, but I cannot convert the RDD to a Spark DataFrame; I receive this error:

 overloaded method value createDataFrame with alternatives:
   (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
   (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
   (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
   (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
   (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
   (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[String], org.apache.spark.sql.types.StructType)
     val df = spark.createDataFrame(rdd, schema)

Could you please explain what is wrong with the createDataFrame call when converting an RDD to a Spark DataFrame?

Any help is really appreciated.

1 Comment

  • Not sure I am following it all. (Sep 29, 2021 at 18:43)

1 Answer


Need this type of approach:

...
val schema = new StructType()
  .add(StructField("f", StringType, true))
  .add(StructField("m", StringType, true))
  .add(StructField("l", StringType, true))
  .add(StructField("d", StringType, true))
  .add(StructField("g", StringType, true))
  .add(StructField("v", IntegerType, true))

val df = spark.createDataFrame(rdd, schema)
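
The key point is that this createDataFrame overload takes an RDD[Row] whose values line up with the schema, not an RDD[String]. For the code in the question, a minimal sketch of the missing conversion could look like this, using the 8-field schema from the question (this assumes each string produced by read_data is a space-separated concatenation of 8-field rows and that no field value contains a space; rowRdd and the splitting logic are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Rebuild structured rows from the flat strings built by read_data;
// every 8 consecutive tokens are assumed to form one row (f1..f8).
val rowRdd: RDD[Row] = rdd.flatMap { s =>
  s.split(" ").grouped(8).map { a =>
    Row(a(0), a(1), a(2), a(3), a(4), a(5), a(6).toInt, a(7).toInt)
  }
}

val df = spark.createDataFrame(rowRdd, schema) // RDD[Row] matches this overload

A cleaner alternative would be to have read_data return one element per database row (for example a Seq of case-class instances) so the RDD never has to be re-parsed, but the sketch above stays closest to the existing code.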

4 Comments

  • Dear @thebluephantom, thank you for your answer. Could you please tell me what type I can use for the spark variable? It does not let me define val spark = SparkSession.builder.getOrCreate(); I still receive this error: overloaded method value createDataFrame with alternatives.
  • Collect not needed.
  • The code in your question does not look correct.
  • I have updated my question to show the complete code. Would you please take a look at it? Many thanks.
