
I have two files with the following structure:

File 1

gnk_id, matchId, timestamp

File 2

gnk_matchid, matchid

I want to update the value of gnk_id in file 1 with the value of matchid in file 2 if file1.gnk_id = file2.gnk_matchid.

For this I created two DataFrames in Spark. Can we update values in Spark? If not, is there any workaround that will produce the updated final file?

UPDATE

I did something like this:

case class GnkMatchId(gnk: String, gnk_matchid: String)
case class MatchGroup(gnkid: String, matchid: String, ts: String)

val gnkmatchidRDD = sc.textFile("000000000001").map(_.split(',')).map(x => (x(0),x(1)) )

val gnkmatchidDF = gnkmatchidRDD.map( x => GnkMatchId(x._1,x._2) ).toDF()

val matchGroupMr = sc.textFile("part-00000").map(_.split(',')).map(x => (x(0),x(1),x(2)) ).map( f => MatchGroup(f._1,f._2,f._3.toString) ).toDF()

val matchgrp_joinDF = matchGroupMr.join(gnkmatchidDF,matchGroupMr("gnkid") === gnkmatchidDF("gnk_matchid"),"left_outer")

matchgrp_joinDF.map(x =>
  if (x.getAs[String]("gnk_matchid").length != 0) {
    MatchGroup(x.getAs[String]("gnk_matchid"), x.getAs[String]("matchid"), x.getAs[String]("ts"))
  } else {
    MatchGroup(x.getAs[String]("gnkid"), x.getAs[String]("matchid"), x.getAs[String]("ts"))
  }
).toDF().show()

But the last step fails with a NullPointerException.


4 Answers


A DataFrame is based on an RDD, so you can't update its values in place.

But you can use withColumn to "update" values by adding a new column (or replacing an existing one).

In your case, you can do it with a join and withColumn, using a UDF:

// df1: your File1
// +------+-------+---+
// |gnk_id|matchId| ts|
// +------+-------+---+
// |     1|     10|100|
// |     2|     20|200|
// +------+-------+---+

// df2: your File2
// +-----------+-------+
// |gnk_matchid|matchid|
// +-----------+-------+
// |          1|   1000|
// |          3|   3000|
// +-----------+-------+

// UDF: choose the value of df2.matchid when the join matched, otherwise keep df1.gnk_id
// (needs import org.apache.spark.sql.functions.udf and import spark.implicits._ for the $"..." syntax)
val myUDF = udf[Integer, Integer, Integer]((df2_matchid: Integer, df1_gnk_id: Integer) => {
  if (df2_matchid == null) df1_gnk_id
  else df2_matchid
})

df1.as("df1").join(df2.as("df2"), $"gnk_id" === $"gnk_matchid", "left_outer")
  .select($"df1.*", $"df2.matchid" as "matchid2")
  .withColumn("gnk_id", myUDF($"matchid2", $"gnk_id"))
  .drop($"matchid2")
  .show()

Here's the output:

+------+-------+---+
|gnk_id|matchId| ts|
+------+-------+---+
|  1000|     10|100|
|     2|     20|200|
+------+-------+---+
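
The UDF is not strictly required. As a variant sketch (assuming the same df1 and df2 as above), the built-in coalesce function expresses the same fallback logic:

// Variant without a UDF: coalesce takes df2.matchid when the join matched,
// otherwise falls back to df1.gnk_id.
import org.apache.spark.sql.functions.coalesce

df1.as("df1").join(df2.as("df2"), $"gnk_id" === $"gnk_matchid", "left_outer")
  .select(
    coalesce($"df2.matchid", $"df1.gnk_id") as "gnk_id",
    $"df1.matchId",
    $"df1.ts")
  .show()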

It's probably a join you are looking for. Assuming you have your DataFrames in file1 and file2, you can try the following:

val result = file1
  .join(file2, file1("gnk_id") === file2("gnk_matchid"))
  .select(
    file2("matchid").as("gnk_id"),
    file1("matchId"),
    file1("timestamp")
  )
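
Note that this is an inner join, so rows of file1 that have no match in file2 are dropped. If you want to keep them with their original gnk_id (which is what the question seems to ask for), one variant sketch is a left outer join plus coalesce:

// Variant sketch: keep unmatched file1 rows, falling back to the original gnk_id.
import org.apache.spark.sql.functions.coalesce

val resultKeepingUnmatched = file1
  .join(file2, file1("gnk_id") === file2("gnk_matchid"), "left_outer")
  .select(
    coalesce(file2("matchid"), file1("gnk_id")).as("gnk_id"),
    file1("matchId"),
    file1("timestamp")
  )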


It depends on whether the data source you're using supports that or not.

  • With Hive you can.
  • With plain files you just need to filter out the lines, make the changes, and write the result back (a minimal sketch follows below).
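
A minimal sketch of that read-modify-write-back approach for the two files in the question (the file paths and output location are illustrative assumptions; the column names follow the question):

// Sketch: read both files, derive the updated gnk_id, and write a new output file.
// Spark cannot modify its input in place, so the result goes to a separate path.
// Add .option("header", "true") to the reads if your files have header rows.
import org.apache.spark.sql.functions.when

val file1 = spark.read.csv("File1.csv").toDF("gnk_id", "matchId", "timestamp")
val file2 = spark.read.csv("File2.csv").toDF("gnk_matchid", "matchid")

val updated = file1
  .join(file2, file1("gnk_id") === file2("gnk_matchid"), "left_outer")
  .select(
    when(file2("matchid").isNotNull, file2("matchid"))
      .otherwise(file1("gnk_id"))
      .as("gnk_id"),
    file1("matchId"),
    file1("timestamp"))

updated.write.mode("overwrite").csv("File1_updated")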

Hope it helps.

2018/04/11 Updated link.

3 Comments

Your first link is dead, and I also question whether you can do updates using Spark SQL on a Hive table, because there is an open ticket to support updates for Hive (transactional) tables: issues.apache.org/jira/browse/SPARK-15348
From this book: the Hive solution is just to concatenate the files; it does not alter or change records. It is possible to update data in Hive using the ORC format. With transactional tables in Hive, together with insert, update and delete, it does the "concatenate" for you automatically at regular intervals. Currently this works only with tables stored as ORC. Alternatively, use HBase with Phoenix as the SQL layer on top. Hive was originally not designed for updates because it was purely warehouse-focused, but the most recent versions can do updates, deletes, etc. in a transactional way.
It is indeed possible to use Hive to do updates on a Hive transactional table in ORC format. However, Spark SQL does not support that: it's not currently possible to do updates using Spark SQL on a Hive transactional table, as documented in the ticket I mentioned issues.apache.org/jira/browse/SPARK-15348

The simplest way to achieve this: the code below re-reads the dimension data folder for every batch, but keep in mind that new dimension data values (country names in my case) have to arrive as a new file.

Below is a solution for a stream + batch join:

package com.databroccoli.streaming.dimensionupateinstreaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

object RefreshDimensionInStreaming {

  def main(args: Array[String]) = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    var countryDf: Option[DataFrame] = None: Option[DataFrame]

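    // Re-read the dimension folder so newly arrived dimension files are picked up,
    // swapping in the fresh DataFrame and unpersisting the previous one.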
    def updateDimensionDf() = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")

      if (countryDf != None) {
        countryDf.get.unpersist()
      }

      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2"))

      countryDf.get.show()
    }

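    // For every micro-batch: show the batch, refresh the dimension DataFrame,
    // then left-join the batch with the refreshed dimension.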
    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)

        updateDimensionDf()

        batchDF
          .join(
            countryDf.get,
            expr(
              """
      countrycode_2 = countrycode 
      """
            ),
            "leftOuter"
          )
          .show

      }
      .start()
      .awaitTermination()

  }

}
