
In Scala Spark, I need to efficiently replace {0} in the Description column with the value from the States column, as shown in the output.
I would appreciate an answer that does not use a Spark UDF.

Input : Input DF

Output : Output DF

  • I have added a solution without a UDF, please check. Commented Jun 6, 2020 at 11:53
  • @Srinivas I have null values in the States column, and this code replaces Description with null whenever States is null. If States is null, the Description value should be left unchanged; I will add this condition to the question. Please suggest how to handle it. Commented Jun 9, 2020 at 9:02
  • I have updated the code as per your requirement, please check and upvote if it helps. Commented Jun 9, 2020 at 9:15

5 Answers


Without UDF.

Use def regexp_replace(e: org.apache.spark.sql.Column,pattern: org.apache.spark.sql.Column,replacement: org.apache.spark.sql.Column)

scala>  val df = Seq(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur")).toDF("description","states")
df: org.apache.spark.sql.DataFrame = [description: string, states: string]

scala> df.show(false)
+-----------------------------------------------------+--------------+
|description                                          |states        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+

pattern - lit("\\{0\\}")

scala> df
.withColumn("description",
            regexp_replace(
                      $"description",
                      lit("\\{0\\}"),
                      $"states"
                   )
           )
.show(false)

+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+
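The braces must be escaped because regexp_replace interprets its pattern as a Java regular expression, in which { and } are quantifier metacharacters. A minimal plain-Scala sketch of the same escaping (no Spark needed):

```scala
// regexp_replace uses Java regex semantics, so the braces in "{0}" must be
// escaped to match the literal placeholder text.
val description = "{0} is the 4th biggest state of India"
val pattern = "\\{0\\}".r // matches the literal text "{0}"

println(pattern.replaceAllIn(description, "Andhra Pradesh"))
// Andhra Pradesh is the 4th biggest state of India
```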

Handle null in the states column. (The example below assumes df also contains an extra row ("Sample Data with null", null).)

scala> df.withColumn("description",when($"states".isNotNull,regexp_replace($"description",lit("\\{0\\}"),$"states")).otherwise($"description")).show(false)
+---------------------------------------------------------+--------------+
|description                                              |states        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
|Sample Data with null                                    |null          |
+---------------------------------------------------------+--------------+
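The when/otherwise branch encodes simple per-row logic: substitute only when a state value is present. A plain-Scala sketch of that logic (the fillState name is illustrative, not from the answer):

```scala
// Per-row equivalent of when($"states".isNotNull, ...).otherwise($"description"):
// replace the placeholder only when the state value is non-null.
def fillState(description: String, state: String): String =
  Option(state) match {
    case Some(s) => description.replace("{0}", s)
    case None    => description // null state: leave the description unchanged
  }

println(fillState("{0} is the 4th biggest state of India", "Andhra Pradesh"))
// Andhra Pradesh is the 4th biggest state of India
println(fillState("Sample Data with null", null))
// Sample Data with null
```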




You can define a UDF with the udf function from org.apache.spark.sql.functions:

val f = udf((format: String, data: String) => format.replace("{0}", data))

You can replace an existing column's value with a new one using df.withColumn:

df.withColumn("Description", f(df("Description"), df("States")))



You could follow this approach

input

+-----------------------------------------------------+--------------+
|Description                                          |States        |
+-----------------------------------------------------+--------------+
|{0} is the 4th biggest state of India                |Andhra Pradesh|
|The {0} remains the most beutiful state of India     |Maharashtra   |
|This state {0} often termed as 'Switzerland of India'|Manipur       |
+-----------------------------------------------------+--------------+

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkColumnSubstring {

  // build the spark session
  val spark = SparkSession
    .builder()
    .appName("SparkColumnSubstring")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","SparkColumnSubstring") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    val data = List(("{0} is the 4th biggest state of India","Andhra Pradesh"),("The {0} remains the most beutiful state of India","Maharashtra"),("This state {0} often termed as 'Switzerland of India'","Manipur"))

    try {
      // input dataframe
      val dataDF = sc.parallelize(data).toDF("Description","States")
      dataDF.show(truncate = false)

      // transforming the data
      val dataSub = dataDF.map(r => (r(0).toString.replace("{0}", r(1).toString),r(1).toString)).toDF("Description", "States")

      dataSub.show(truncate = false)

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

expected output

+---------------------------------------------------------+--------------+
|Description                                              |States        |
+---------------------------------------------------------+--------------+
|Andhra Pradesh is the 4th biggest state of India         |Andhra Pradesh|
|The Maharashtra remains the most beutiful state of India |Maharashtra   |
|This state Manipur often termed as 'Switzerland of India'|Manipur       |
+---------------------------------------------------------+--------------+

Hope this helps.



EDIT 1: Since you are using a DataFrame, you could use:

df.map(each => {
  val col1 = each.getString(0)
  val col2 = each.getString(1)
  val newCol = col1.replace("{0}", col2)
  // return the changed value
  (newCol, col2)
}).toDF("Description","States").show

If you have a large dataset, it may be better to use a UDF.

You could also use the RDD approach, applying the replacement to each value. Please see the code below:

scala> val input = List(("{0} is One","1"), ("{0} is Two", "2"))
input: List[(String, String)] = List(({0} is One,1), ({0} is Two,2))

scala> val rdd = sc.parallelize(input)
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:26


scala> rdd.collect().foreach(println)
({0} is One,1)                                                                  
({0} is Two,2)

// Replace each {0} with the value in the 2nd column
scala> rdd.map(each => each._1.replace("{0}",each._2)).collect().foreach(println)
1 is One
2 is Two
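Note that String.replace here performs a literal (non-regex) substitution, so unlike the regexp_replace approach, the braces in {0} need no escaping. A self-contained sketch of the same per-element transformation, using a plain List instead of an RDD:

```scala
// String.replace matches literally, so "{0}" needs no escaping here.
val pairs = List(("{0} is One", "1"), ("{0} is Two", "2"))
val replaced = pairs.map { case (template, value) => template.replace("{0}", value) }

replaced.foreach(println)
// 1 is One
// 2 is Two
```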



Try this-

1. Load the test data

    val df = spark.range(1)
      .withColumn("Description", lit("{0} is the 4th biggest"))
      .withColumn("States", lit("Andhra Pradesh"))

    df.show(false)
    df.printSchema()
    /**
      * +---+----------------------+--------------+
      * |id |Description           |States        |
      * +---+----------------------+--------------+
      * |0  |{0} is the 4th biggest|Andhra Pradesh|
      * +---+----------------------+--------------+
      *
      * root
      * |-- id: long (nullable = false)
      * |-- Description: string (nullable = false)
      * |-- States: string (nullable = false)
      */

2. Without UDF

    df.withColumn("Description", expr("case when States is null then Description else replace(Description, '{0}', States) end"))
      .show(false)

    /**
      * +---+---------------------------------+--------------+
      * |id |Description                      |States        |
      * +---+---------------------------------+--------------+
      * |0  |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
      * +---+---------------------------------+--------------+

3. With UDF

    val replace1 = udf((s: String, replace: String) => java.text.MessageFormat.format(s, replace))
    df.withColumn("Description", replace1($"Description", $"States"))
      .show(false)

    /**
      * +---+---------------------------------+--------------+
      * |id |Description                      |States        |
      * +---+---------------------------------+--------------+
      * |0  |Andhra Pradesh is the 4th biggest|Andhra Pradesh|
      * +---+---------------------------------+--------------+
      */
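One caveat with the MessageFormat-based UDF worth noting: MessageFormat treats single quotes as escape characters, so descriptions like the question's 'Switzerland of India' would lose their quotes (and anything inside the quotes, including a {0}, is left unsubstituted). A small plain-JVM sketch:

```scala
import java.text.MessageFormat

// Plain substitution works as expected.
println(MessageFormat.format("{0} is the 4th biggest", "Andhra Pradesh"))
// Andhra Pradesh is the 4th biggest

// Single quotes delimit literal text in MessageFormat patterns: the quotes
// themselves are dropped from the output.
println(MessageFormat.format("This state {0} termed as 'Switzerland of India'", "Manipur"))
// This state Manipur termed as Switzerland of India
```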

5 Comments

"It will be more appreciable if you answer this without using spark udf." - did you read this comment? Also, why not use the built-in function regexp_replace then?
@UninformedUser, thanks for bringing this to my notice. I have added a version without a UDF.
@SomeshwarKale thanks for the answer. I have null values in the States column, and this code replaces Description with null whenever States is null. If States is null, the Description value should be left unchanged; I will add this condition to the question. Please suggest how to handle it.
Use expr("case when States is null then Description else replace(Description, '{0}', States) end")
