
I have several Spark DataFrames (we can call them table A, table B, etc.). I want to add a column to table A only, based on the result of a query against one of the other tables, but that table changes every time depending on the value of one of table A's fields, so the query has to be parametric. Below is an example to make the problem clear:

Every table has the column OID and a column TableName with the name of the current table, plus other columns.

    This is the fixed query to be performed on Tab A to add the new column:

    Select $ColumnName from $TableName where OID=$oids

    Tab A
    |   oids|TableName  |ColumnName | other fields|New Column: ValueOidDb
    ================================================================
    |    2  |  Book      | Title     |      x      |result query:harry potter
    |    8  |  Book      | Isbn      |      y      |result query: 556 
    |    1  |  Author    | Name      |      z      |result query:Tolkien
    |    4  |  Category  |Description|      b      |result query: Commedy


    Tab Book
    |   OID |TableName   |Title       |Isbn  |other fields|
    ================================================================
    |    2  |  Book      |harry potter| 123  | x          |
    |    8  |  Book      | hobbit     | 556  | y          | 
    |    21 |  Book      | etc        | 8942 | z          |
    |    5  |  Book      | etc2       | 984  | b          |

   Tab Author
    |   OID |TableName     |Name        |nationality |other fields|
    ================================================================
    |    5  |  Author      |J.Rowling   | eng        | x          |
    |    2  |  Author      |Geor. Martin| us         | y          | 
    |    1  |  Author      | Tolkien    | eng        | z          |
    |    13 |  Author      | Dan Brown  | us         | b          |


   Tab Category
   |   OID | TableName    |Description |
   =====================================
   |    12 |  Category    | Fantasy    | 
   |    4  |  Category    | Commedy    |  
   |    9  |  Category    | Thriller   | 
   |    7  |  Category    | Action     |  

I tried with this UDF:

    // UDF: look up a single value in another table, parameterised by the row's fields
    def setValueOid = (oid: Int, tableName: String, tableColumn: String) => {
      try {
        sqlContext.sql(s"SELECT $tableColumn FROM $tableName WHERE OID = $oid").first().toString()
      } catch {
        case _: java.lang.NullPointerException => "error"
      }
    }

    sqlContext.udf.register("setValueOid", setValueOid)

    val FinalRtxf = sqlContext.sql("SELECT all the columns of TAB A,"
                  + " setValueOid(oid, Table, AttributeDatabaseColumn) as ValueOidDb"
                  + " FROM TAB A")

I put the code in a try/catch because otherwise it throws a NullPointerException, but it still doesn't work: it always returns "error". If I call the function directly, outside of a SQL query, just passing some parameters by hand, it works perfectly:

          val res = setValueOid(8, "BOOK", "ISBN")
          res: String = [0977326403 ]

I read here that it is not possible to run a query inside a UDF: Trying to execute a spark sql query from a UDF

So how can I solve my problem? I don't know how to make a parametric join. I tried this:

       %sql
         Select  all attributes TAB A,    
         FROM TAB A  as a
         join (Select $AttributeDatabaseColumn ,TableName  from $Table where  OID=$oid) as b
         on a.Table=b.TableName 

but it gave me this exception:

    org.apache.spark.sql.AnalysisException: cannot recognize input near '$' 'AttributeDatabaseColumn' ',' in select clause; line 3 pos 1
      at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:318)
      at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
      at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)

1 Answer


One option:

  • transform each Book, Author, Category to a form:

    root
     |-- oid: integer (nullable = false)
     |-- tableName: string (nullable = true)
     |-- properties: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
    

    For example, the first record in Book:

    val book = Seq((2L, "Book", 
      Map("title" -> "harry potter", "Isbn" -> "123", "other field" -> "x")
    )).toDF("oid", "tableName", "properties")
    
    +---+---------+---------------------------------------------------------+
    |oid|tableName|properties                                               |
    +---+---------+---------------------------------------------------------+
    |2  |Book     |Map(title -> harry potter, Isbn -> 123, other field -> x)|
    +---+---------+---------------------------------------------------------+
    
  • union Book, Author, Category as properties.

    val properties = book.union(author).union(category)
    
  • join with base table:

    val comb = properties.join(table, Seq("oid", "tableName"))
    
  • use case when ... based on tableName to add the new column from the properties field (a minimal end-to-end sketch of these steps is shown below).
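
Putting the four steps together, here is a minimal sketch, assuming Spark 2.x and that every lookup table carries OID and TableName columns as in the example. The names toProperties, tableA, book, author and category are hypothetical placeholders for your actual DataFrames, and instead of spelling out a case when branch per table it simply indexes the properties map with the ColumnName column, which gives the same result:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    // Hypothetical helper: reshape any lookup table (Book, Author, Category, ...)
    // into the (oid, tableName, properties) form without listing its columns by hand.
    def toProperties(df: DataFrame): DataFrame = {
      val valueCols = df.columns.filterNot(c => c == "OID" || c == "TableName")
      // builds map(lit("Title"), col("Title"), lit("Isbn"), col("Isbn"), ...)
      val kvPairs = valueCols.flatMap(c => Seq(lit(c), col(c).cast("string")))
      df.select(
        col("OID").as("oid"),
        col("TableName").as("tableName"),
        map(kvPairs: _*).as("properties"))
    }

    val properties = toProperties(book)
      .union(toProperties(author))
      .union(toProperties(category))

    // join with table A on (oid, tableName); the using-columns are plain strings
    val comb = tableA
      .withColumnRenamed("oids", "oid")
      .withColumnRenamed("TableName", "tableName")
      .join(properties, Seq("oid", "tableName"), "left")

    // pull the requested attribute out of the map, keyed by the ColumnName column
    val result = comb.withColumn("ValueOidDb", expr("properties[ColumnName]"))

The map lookup is an exact string match, so the values stored in ColumnName have to match the lookup tables' real column names.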


2 Comments

I'm new to Spark. How can I transform each DataFrame (Book, Author, etc.) into that form? Also, in this application the DataFrames are Book, Author, etc., but my program will run in different applications and the DataFrames may change over time (only TABLE A will always remain), so I would like a general approach, not one tied to this specific example, because I don't know in advance what Book, Author, etc. will be. Is this possible? Thanks
Also, those tables (Book, Author, etc.) have thousands of rows; I can't build that map manually, it would be insane.
