
I am working on a project with Spark and Scala. I am new to both, but with a lot of help from Stack Overflow I have finished all the data processing and stored the processed data in MySQL. Now I am facing one last problem and I don't understand how to tackle it. The first time I processed the data I stored the dataframe with this method, and at that point the table was empty:

      df.write.mode("append").jdbc("dburl", "tablename", "dbproperties"); 
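For context, the full call looks roughly like this (a minimal sketch; the JDBC URL, credentials, and table name are placeholders):

    import java.util.Properties

    val dbUrl = "jdbc:mysql://localhost:3306/mydb"        // placeholder
    val dbProperties = new Properties()
    dbProperties.setProperty("user", "username")          // placeholder
    dbProperties.setProperty("password", "password")      // placeholder
    dbProperties.setProperty("driver", "com.mysql.jdbc.Driver")

    // First run: the table is empty, so append simply inserts every row
    df.write.mode("append").jdbc(dbUrl, "tablename", dbProperties)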

Let's say my processed data looks like this in the database:

      id      name       eid      number_of_visitis    last_visit_date
      1       John       C110     12                   2016-01-13 00:00:00
      2       Root       C111     24                   2016-04-27 00:00:00
      3       Michel     C112     8                    2016-07-123 00:00:00
      4       Jonny      C113     45                   2016-06-10 00:00:00

Now the person named 'Root' with eid 'C111' visits the office 2 more times on '2016-08-30 00:00:00'. After processing this new data I need to update only this person's record in the database. How do I do that? The updated table should look like this:

      id      name       eid      number_of_visitis    last_visit_date
      1       John       C110     12                   2016-01-13 00:00:00
      2       Root       C111     26                   2016-08-30  00:00:00
      3       Michel     C112     8                    2016-07-123 00:00:00
      4       Jonny      C113     45                   2016-06-10 00:00:00

I have millions of rows in this table. If I load the full table into a Spark dataframe and update the desired record, it will take a lot of time, and it also makes no sense to load the full table when I only want to update one row. I tried this code, but it added a new row to the table instead of updating the existing one:

       df.write.mode("append").jdbc("dburl", "tablename", "dbproperties");

Is there any way to do this in Spark?

I have seen this on the Internet; can I do something like this for the update?

val numParallelInserts = 10
val batchSize = 1000

new CoalescedRDD(sessions, numParallelInserts) mapPartitionsWithSplit { (split, iter) => Iterator((split, iter)) } foreach { case (split, iter) =>
  val db = connect()

  val sql = "INSERT INTO sessions (id, ts) VALUES (?, ?)"
  val stmt = db.prepareStatement(sql)

  iter.grouped(batchSize).zipWithIndex foreach { case (batch, batchIndex) =>
    batch foreach { session =>
      stmt.setString(1, session.id)
      stmt.setString(2, TimestampFormat.print(session.ts))
      stmt.addBatch()
    }
    stmt.executeBatch()
    db.commit();
    logInfo("Split " + (split + 1) + "/" + numParallelInserts + " inserted batch " + batchIndex + " with " + batch.size + " elements")
  }

  db.close();
}
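From what I understand, the same per-partition JDBC idea could be adapted to my case with foreachPartition and MySQL's INSERT ... ON DUPLICATE KEY UPDATE. This is only an untested sketch: it assumes eid has a unique index, that the dataframe of newly processed rows is called newVisitsDF and holds the incremental visit counts per eid, and it reuses the placeholder connection details from above.

    import java.sql.DriverManager

    val batchSize = 1000
    val upsertSql =
      """INSERT INTO tablename (name, eid, number_of_visitis, last_visit_date)
        |VALUES (?, ?, ?, ?)
        |ON DUPLICATE KEY UPDATE
        |  number_of_visitis = number_of_visitis + VALUES(number_of_visitis),
        |  last_visit_date = VALUES(last_visit_date)""".stripMargin

    newVisitsDF.coalesce(10).foreachPartition { rows =>
      // One connection per partition; this block runs on the executors
      val conn = DriverManager.getConnection(dbUrl, "username", "password") // placeholders
      conn.setAutoCommit(false)
      val stmt = conn.prepareStatement(upsertSql)
      try {
        rows.grouped(batchSize).foreach { batch =>
          batch.foreach { row =>
            stmt.setString(1, row.getAs[String]("name"))
            stmt.setString(2, row.getAs[String]("eid"))
            stmt.setLong(3, row.getAs[Long]("number_of_visitis"))
            stmt.setTimestamp(4, row.getAs[java.sql.Timestamp]("last_visit_date"))
            stmt.addBatch()
          }
          stmt.executeBatch()
          conn.commit()
        }
      } finally {
        stmt.close()
        conn.close()
      }
    }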
  • Have you tried with "overwrite" mode? Commented Sep 1, 2016 at 19:46
  • overwrite recreates the table without the exact data types, deletes all the older data, and inserts only the newly processed data. Commented Sep 2, 2016 at 4:47

1 Answer

You can try using SQL to do this: store the updated (and even new) data in a temporary table and then merge the temporary table into the main table.

One way to do that is the following (a rough Spark-side sketch of the whole flow follows the steps):

  1. Update all the records in the main table using the temporary table

    update main_table set visits = main_table.visits + temp_table.visits from temp_table where main_table.eid = temp_table.eid;

  2. Delete all records from the temporary table that already exist in the main table (that leaves only the genuinely new records in the temporary table)

    delete from temp_table using main_table where main_table.eid = temp_table.eid;

  3. Insert all records from temporary table into main table

    insert into main_table select * from temp_table;

  4. Drop the temporary table

    drop table temp_table;
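For completeness, here is a rough Spark-side sketch of the whole flow. It is untested: the table names, connection details, and dbUrl value are assumptions, and the UPDATE ... FROM / DELETE ... USING syntax is Postgres/Redshift style, so on MySQL you would use the equivalent UPDATE ... JOIN / DELETE ... JOIN forms instead.

    import java.sql.DriverManager
    import java.util.Properties

    val dbUrl = "jdbc:postgresql://host:5439/db"   // placeholder
    val props = new Properties()
    props.setProperty("user", "username")          // placeholder
    props.setProperty("password", "password")      // placeholder

    // 1) Stage the newly processed rows in a temporary table
    newDF.write.mode("overwrite").jdbc(dbUrl, "temp_table", props)

    // 2) Merge the staging table into the main table over a plain JDBC connection
    val conn = DriverManager.getConnection(dbUrl, "username", "password")
    try {
      val stmt = conn.createStatement()
      stmt.executeUpdate(
        """update main_table
          |set visits = main_table.visits + temp_table.visits
          |from temp_table
          |where main_table.eid = temp_table.eid""".stripMargin)
      stmt.executeUpdate(
        "delete from temp_table using main_table where main_table.eid = temp_table.eid")
      stmt.executeUpdate("insert into main_table select * from temp_table")
      stmt.executeUpdate("drop table temp_table")
      stmt.close()
    } finally {
      conn.close()
    }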


3 Comments

This is at the DB level and has more steps; I want to do it in the shortest possible way. I have millions of rows, so copying, deleting, and inserting will take time. I have updated the question, please take another look so you can see what I want to do.
I am using this on a Redshift database with literally billions of rows.
Any update on this as to how one should proceed? I am facing a similar situation and I am new to Spark and DB2.
