
I have one Hive table, and I want to build Spark SQL queries dynamically. At the time of spark-submit I specify a rule name, and the query should be generated based on that rule name. For example:

spark-submit <RuleName> IncorrectAge

It should fire my Scala object code:

select tablename, filter, condition from all_rules where rulename="IncorrectAge"

My table: Rules (input table)

| rowkey | rule_name    | rule_run_status | tablename       | condition | filter        | level |
|--------|--------------|-----------------|-----------------|-----------|---------------|-------|
| 1      | IncorrectAge | In_Progress     | VDP_Vendor_List | age>18    | gender=Male   | NA    |
| 2      | Customer_age | In_Progress     | Customer_List   | age<25    | gender=Female | NA    |

First I fetch the rule's metadata by rule name:

 select tablename, filter, condition from all_rules where rulename="IncorrectAge";

After executing this query, I get a result like this:

| tablename       | filter      | condition |
|-----------------|-------------|-----------|
| VDP_Vendor_List | gender=Male | age>18    |

Now I want to build the Spark SQL queries dynamically:

    select count(*) from VDP_Vendor_List                        -- tablename only
    select count(*) from VDP_Vendor_List where gender=Male      -- tablename and filter
    select * from VDP_Vendor_List where gender=Male and age>18  -- tablename, filter, and condition

My code (Spark 2.2):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.log4j._

object allrules {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Hive")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // For testing purposes I converted the Hive table to JSON data.
    val sampleDF = spark.read.json("C:/software/sampletableCopy.json")
    sampleDF.createOrReplaceTempView("sampletable") // registerTempTable is deprecated in Spark 2.x
    val allrulesDF = spark.sql("SELECT * FROM sampletable")

    allrulesDF.show()
    val totalCount: Long = allrulesDF.count()
    println("==============> Total count ======> " + totalCount)

    val df1 = allrulesDF.select("tablename", "condition", "filter", "rule_name")
    df1.show()

    // show() returns Unit, so keep the filtered DataFrame and call show() separately.
    val df2 = df1.where(df1.col("rule_name").equalTo("IncorrectAge"))
    df2.show()

//  var table_name = ""
//  var condition = ""
//  var filter = ""
//  df1.foreach(row => {
//    table_name = row.get(1).toString()
//    condition = row.get(2).toString()
//    filter = row.get(3).toString()
//  })
  }
}
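A note on the commented-out foreach above: assigning to driver-side vars inside foreach does not work on a cluster, because the closure runs on the executors. A minimal sketch of the usual alternative, continuing from df1 above: collect the single matching row to the driver and interpolate its fields into the SQL strings (column names follow the Rules table; the rule name is hard-coded here purely for illustration).

    // Sketch: fetch the one matching rule row on the driver.
    val ruleRow = df1.where(df1.col("rule_name").equalTo("IncorrectAge"))
                     .select("tablename", "filter", "condition")
                     .head()
    val tablename = ruleRow.getString(0)
    val filter    = ruleRow.getString(1)
    val condition = ruleRow.getString(2)

    // Build the three queries from the question via string interpolation.
    val q1 = s"select count(*) from $tablename"
    val q2 = s"select count(*) from $tablename where $filter"
    val q3 = s"select * from $tablename where $filter and $condition"

    spark.sql(q2).show() // likewise for q1 and q3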
  • I don't think it's totally clear what your question is. Could you add a few lines at the end which indicate exactly what you are looking for? Commented Oct 8, 2018 at 18:36
  • select tablename, filter, condition from all_rules where rulename="IncorrectAge" — here IncorrectAge is my rule name, and I am taking three attributes (tablename, filter, condition). The first query uses only one attribute, e.g. select count(*) from VDP_Vendor_List (tablename). The second query uses two attributes, e.g. select count(*) from VDP_Vendor_List where gender=Male (tablename and filter). Commented Oct 8, 2018 at 18:44
  • 1
    Welcome to Stack Overflow! Other users marked your question for low quality and need for improvement. I re-worded/formatted your input to make it easier to read/understand. Please review my changes to ensure they reflect your intentions. But I think your question is still not answerable. You should edit your question now, to add missing details (see minimal reproducible example ). Feel free to drop me a comment in case you have further questions or feedback for me. Commented Oct 9, 2018 at 3:59
  • Please: A) never put more infos into comment, they should go into the question. B) try to come up with a minimal question that only contains the information that is relevant to your problem. Commented Oct 9, 2018 at 4:00

2 Answers


You can pass arguments from spark-submit to your application:

bin/spark-submit --class allrules something.jar tablename filter condition

Then, in your main function, you will have your parameters:

def main(args: Array[String]): Unit = {
  // args(0), args(1), ... hold your parameters
}
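A minimal sketch of how those arguments could drive the rule lookup from the question (the object name and SQL here follow the question's setup and are assumptions, not a fixed API):

    import org.apache.spark.sql.SparkSession

    object allrules {
      def main(args: Array[String]): Unit = {
        val ruleName = args(0) // e.g. "IncorrectAge", passed on the spark-submit command line
        val spark = SparkSession.builder()
          .appName("Spark Hive")
          .enableHiveSupport()
          .getOrCreate()
        // Interpolate the rule name into the metadata lookup.
        spark.sql(s"select tablename, filter, condition from all_rules where rule_name = '$ruleName'")
          .show()
      }
    }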


You can pass your arguments to your driver class like this:

import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object DriverClass {
  val log = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .config("spark.sql.warehouse.dir", "path")
      .enableHiveSupport()
      .getOrCreate()

    if (args == null || args.isEmpty || args.length != 2) {
      log.error("Invalid number of arguments passed.")
      log.error("Arguments Usage: <Rule Name> <Rule Type>")
      log.error("Stopping the flow")
      System.exit(1)
    }

    val ruleName: String = args(0).trim()
    val ruleType: String = args(1).trim()

    // Note the s prefix: without it, $ruleName and $ruleType are not interpolated.
    val processSQL: String = s"select tablename, filter, condition from all_rules where $ruleName='$ruleType'"
    val metadataDF = spark.sql(processSQL)

    // Collect the single matching metadata row back to the driver.
    val (tblnm, fltr, cndtn) = metadataDF.rdd
      .map(f => (f.get(0).toString(), f.get(1).toString(), f.get(2).toString()))
      .collect()(0)

    val finalSql_1 = s"select count(*) from $tblnm"                 // tablename only
    val finalSql_2 = s"select count(*) from $tblnm where $fltr"     // tablename and filter
    val finalSql_3 = s"select * from $tblnm where $fltr and $cndtn" // tablename, filter, condition

    spark.sql(finalSql_1).show()
    spark.sql(finalSql_2).show()
    spark.sql(finalSql_3).show()
  }
}
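For completeness, the matching submit command might look like this (the jar name is a placeholder); with the code above, the first argument is the metadata column to filter on and the second is its value:

    spark-submit --class DriverClass --master yarn rules-app.jar rule_name IncorrectAge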

