I am using Spark JDBC to read data from an MS SQL database, but I am getting some strange results.

For example, below is the code I use to read records from my MS SQL database. Note that the tables I am reading from are continuously receiving new records.

    import org.apache.spark.sql.functions.trim
    import sqlContext.implicits._ // enables the $"col" syntax

    // Extract data from the JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
      .load()

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")

    // Add trimmed copies of two columns
    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COl1"))
      .withColumn("ID-COL2", trim($"COl2"))

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")

I get two different counts each time I run the program: updateJdbcDF.count() is always greater than jdbcTable.count().

Can somebody explain why this is happening? It is causing a lot of problems in my use case. How can I keep the row count of the jdbcTable DataFrame fixed once it has been created? I tried jdbcTable.cache(), but no luck.

The record count just keeps growing whenever I run any operation on another DataFrame derived from jdbcTable. Is the jdbcTable query executed again every time I use a DataFrame derived from it?

  • Is the difference constant? Or are you getting different counts for both statements every time? Commented Sep 21, 2017 at 8:39
  • @philantrovert No, the difference is not constant; I am getting a different count each time. Commented Sep 21, 2017 at 8:47
  • Well, if the tables you are reading from are continuously receiving inserts and your query does not define a fixed range predicate, then the number of rows in the table is different each time Spark accesses it. So what you are seeing (changing counts) is only to be expected, isn't it? Commented Sep 21, 2017 at 9:56
  • @GPI, OK, so if a fixed range predicate is absent, then whenever I use any DataFrame derived from jdbcTable, Spark will read from the DB again and will always give me a higher count, right? That is in fact what I observe in my code: jdbcTable is the first DF, and when I add some columns to make another DF such as updateJdbcDF, the count increases because Spark has re-read the table. Commented Sep 21, 2017 at 10:22
  • @nilesh1212 this is a "grey" area. Spark will "do its best" not to recompute any RDD / DataFrame when it can avoid doing so (and even allows you to cache them). But it will go back to the datastores (be they databases through JDBC, Parquet files on HDFS, ...) if it has to recompute a part of its execution graph that has not been cached. This clashes with a basic assumption one tends to make: RDDs (and by extension DataFrames) are supposed to be immutable, but if they are backed by dynamic data they cannot be, and one needs to code according to the nature of the source. Commented Sep 21, 2017 at 12:40

1 Answer

I was able to fix this issue by calling jdbcTable.cache(). Now no DataFrame derived from jdbcTable gives me a higher count than jdbcTable.count(), and all calculations are correct. Thanks for the explanation, @GPI.

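    import org.apache.spark.sql.functions.trim
    import sqlContext.implicits._ // enables the $"col" syntax

    // Extract data from the JDBC source
    val jdbcTable = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdcbUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" ->
          s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t"))
      .load()

    // cache() is lazy: it only marks the DataFrame for caching
    jdbcTable.cache()

    // First action: runs the JDBC query and populates the cache
    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}")

    // Derived DataFrames now read from the cached rows
    val updateJdbcDF = jdbcTable
      .withColumn("ID-COL1", trim($"COl1"))
      .withColumn("ID-COL2", trim($"COl2"))

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}")

    /*
     * MORE DATA PROCESSING HERE
     */

    // Release the cached data once all processing is done
    jdbcTable.unpersist()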
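
One caveat: cache() is lazy, so the snapshot is only taken when the first action runs (the count() in STEP 1), and Spark can still go back to the database if cached partitions are lost, for example when an executor dies. As GPI's comment points out, giving the query a fixed range predicate means a re-read selects the same ID range. Below is a minimal sketch of that idea, not something tested against this schema. The names BoundedJdbcRead, boundedSubquery, loadSnapshot, selectAndJoins, lowerExclusive and upperInclusive are made up for illustration, and it assumes that tbl.ID only ever grows and that rows inside the range are not inserted late or updated.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical helper, for illustration only.
object BoundedJdbcRead {

  // Builds the "dbtable" subquery with a fixed upper bound so that every
  // re-execution of the plan asks the database for the same ID range.
  // `selectAndJoins` stands in for the SELECT ... JOIN part of the real query.
  def boundedSubquery(selectAndJoins: String,
                      lowerExclusive: Long,
                      upperInclusive: Long): String =
    s"($selectAndJoins WHERE tbl.ID > $lowerExclusive AND tbl.ID <= $upperInclusive) as t"

  // Loads the bounded range, marks it for persistence and forces one action
  // so the data is materialized before any derived DataFrame is used.
  def loadSnapshot(sqlContext: SQLContext,
                   jdbcUrl: String,
                   selectAndJoins: String,
                   lowerExclusive: Long,
                   upperInclusive: Long): DataFrame = {
    val df = sqlContext.read.format("jdbc").options(
      Map(
        "url" -> jdbcUrl,
        "driver" -> "net.sourceforge.jtds.jdbc.Driver",
        "dbtable" -> boundedSubquery(selectAndJoins, lowerExclusive, upperInclusive)))
      .load()

    // Keep partitions in memory and spill to disk instead of dropping them.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.count() // persist() is lazy; this action fills the cache
    df
  }
}
```

The upper bound has to be chosen once on the driver before the read (for instance the current Unix time, if that is what tbl.ID holds) and can then be stored as the next run's lastExtractUnixTime. Call unpersist() on the returned DataFrame when processing is finished, as in the code above.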