
I have a DataFrame with 3 columns: Number (Integer), Name (String), Color (String). Below is the result of df.show after reading a CSV with a repartition option.

val df = sparkSession.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", ",").option("encoding", "UTF-8").load(fileName).repartition(5).toDF()

+------+------+------+
|Number|  Name| Color|
+------+------+------+
|     4|Orange|Orange|
|     3| Apple| Green|
|     1| Apple|   Red|
|     2|Banana|Yellow|
|     5| Apple|   Red|
+------+------+------+

My objective is to create a list of strings, one per row, by replacing the tokens in a common dynamic string, which I pass as a parameter to the method, with the column values. For example: commonDynamicString = "Column.Name with Column.Color color"

In this string, my tokens are Column.Name and Column.Color. For every row, I need to replace each token with the value of the corresponding column. Note: this string can change dynamically, so hardcoding the replacements won't work.

I don't want to use RDDs unless no other option is available with DataFrames.

Below are the approaches I tried, but I couldn't achieve my objective.

Option 1:

df.foreach { t =>
  val finalValue = commonString.replace("Column.Number", t.getAs[Any]("Number").toString)
    .replace("Column.Name", t.getAs[String]("Name"))
    .replace("Column.Color", t.getAs[String]("Color"))

  println("finalValue: " + finalValue)
}

With this approach, finalValue prints as expected. However, foreach returns Unit and runs on the executors, so I cannot build a ListBuffer here or pass the final strings as a list to another function; Spark throws an error when I try. A driver-side sketch of what I am after is shown below.
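
A minimal sketch of the driver-side version I have in mind (it assumes the data fits in driver memory and that commonString uses the Column. tokens described above):

val finalValues: List[String] = df.collect().map { t =>
  // replace each token with the row's column value; collect() pulls all
  // rows to the driver, so this only works for data that fits in memory
  commonString
    .replace("Column.Number", t.getAs[Any]("Number").toString)
    .replace("Column.Name", t.getAs[String]("Name"))
    .replace("Column.Color", t.getAs[String]("Color"))
}.toList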

Option 2: This is the direction I am considering, but I would need some guidance to understand whether foldLeft, window, or any other Spark function can be used to create a 4th column called "Final" with withColumn, together with a UDF that extracts all the tokens using the regex pattern "Column\.\w+" and performs the replacements. A rough sketch of this UDF idea follows the expected output below.

+------+------+------+------------------------+
|Number|  Name| Color|Final                   |
+------+------+------+------------------------+
|     4|Orange|Orange|Orange with Orange color|
|     3| Apple| Green|Apple with Green color  |
|     1| Apple|   Red|Apple with Red color    |
|     2|Banana|Yellow|Banana with Yellow color|
|     5| Apple|   Red|Apple with Red color    |
+------+------+------+------------------------+
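
A rough sketch of the UDF idea (hypothetical; buildFinal is an illustrative helper, and the two sample columns are hardcoded here even though the real template is dynamic):

import org.apache.spark.sql.functions.{col, udf}

val tokenPattern = """Column\.(\w+)""".r

// replace every Column.<name> token with the matching field value;
// tokens without a matching field are left untouched
def buildFinal(template: String) = udf { (name: String, color: String) =>
  val values = Map("Name" -> name, "Color" -> color)
  tokenPattern.replaceAllIn(template, m => values.getOrElse(m.group(1), m.matched))
}

df.withColumn("Final", buildFinal(commonDynamicString)(col("Name"), col("Color"))).show(false)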

Can someone help me with this problem, and also let me know whether I am thinking in the right direction for using Spark to handle large datasets?

Thanks!

1 Answer

If I understand your requirement correctly, you can create a method, say, parseStatement, which takes a String-type statement and returns a Column, with the following steps:

  1. Parse the input statement to count the number of tokens
  2. Generate a Regex pattern in the form of ^(.*?)(token1)(.*?)(token2) ... (.*?)$
  3. Apply pattern matching to assemble a colList consisting of lit(g1), col(g2), lit(g3), col(g4), ..., where the gN are the extracted Regex groups
  4. Concatenate the Column-type items

Here's the sample code:

import spark.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def parseStatement(stmt: String): Column = {
  val token = "Column."
  val tokenPattern = """Column\.(\w+)"""
  val literalPattern = "(.*?)"

  // Step 1: count the `Column.` tokens in the statement
  val colCount = stmt.sliding(token.length).count(_ == token)

  // Step 2: build ^(.*?)Column\.(\w+)(.*?)Column\.(\w+) ... (.*?)$ by
  // interleaving literal groups (even positions) and token groups (odd)
  val pattern = (0 to colCount * 2).map{
    case i if (i % 2 == 0) => literalPattern
    case _ => tokenPattern
  }.mkString

  // Step 3: extract the Regex groups; even-indexed groups are literals
  // (lit), odd-indexed groups are column names (col)
  val colList = ("^" + pattern + "$").r.findAllIn(stmt).
    matchData.toList.flatMap(_.subgroups).
    zipWithIndex.map{
      case (g, i) if (i % 2 == 0) => lit(g)
      case (g, i) => col(g)
  }

  // Step 4: concatenate the Column-type items
  concat(colList: _*)
}

val df = Seq(
  (4, "Orange", "Orange"),
  (3, "Apple", "Green"),
  (1, "Apple", "Red"),
  (2, "Banana", "Yellow"),
  (5, "Apple", "Red")
).toDF("Number", "Name", "Color")

val statement = "Column.Name with Column.Color color"

df.withColumn("Final", parseStatement(statement)).
  show(false)
// +------+------+------+------------------------+
// |Number|Name  |Color |Final                   |
// +------+------+------+------------------------+
// |4     |Orange|Orange|Orange with Orange color|
// |3     |Apple |Green |Apple with Green color  |
// |1     |Apple |Red   |Apple with Red color    |
// |2     |Banana|Yellow|Banana with Yellow color|
// |5     |Apple |Red   |Apple with Red color    |
// +------+------+------+------------------------+

Note that concat takes Column-type parameters, hence the need for col() for column values and lit() for literals.
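
For the sample statement, the assembled expression is therefore equivalent to:

// what parseStatement("Column.Name with Column.Color color") builds:
concat(lit(""), col("Name"), lit(" with "), col("Color"), lit(" color"))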


Comments

Hi Leo, thanks for the suggestion, but in my case the string will change every time, so I cannot hardcode any values. Can you provide a generalized expression for both the column names and the lit values? Also, consider it to be like an executable statement: what's the best way to use Spark to get all the values in the Final column and execute each of those statements?
Let's say you have a select statement that needs to be built dynamically: select * from tablename where name = Column.Name and color = Column.Color. Another iteration could be an update statement where the query structure changes, but I still need to replace some tokens with their respective values. For example, if I see Column.Name in the query, I need to replace only Column.Name with the Name column value; if the query doesn't contain Column.Color, I won't derive the Color column. I could use the regex defined in my original question to get the values for the tokens and replace them. What I am struggling with is how to do this part within a DataFrame.
I've updated the solution based on my understanding of your clarified requirement.
Thanks Leo for your guidance. I think the above approach gives me some idea of how to create a new DataFrame with a new column using a function. I will test it against some scenarios before marking the answer as correct. I have one more question: if the Final column holds a list of 1 million queries instead of simple statements, and I want to execute them against a database, what is the optimized way of using a DataFrame to execute each of them? This may be a different SO topic; however, I was thinking of creating a list of strings from the DataFrame and iterating over each query to execute it.
Using the SQL statement select Column.id, Column.name from product where Column.id > 100; as an example, the number of tokens is 3, hence the generated Regex pattern will consist of interleaving Regex groups: 3 tokenPatterns and 4 literalPatterns. The extracted Regex groups will be: (select )(id)(, )(name)( from product where )(id)( > 100;)
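To make that concrete, here is a small illustration (hypothetical product data; it assumes the parseStatement method from the answer is in scope):

// hypothetical DataFrame matching the SQL-template example above
val productDF = Seq((101, "pencil"), (205, "eraser")).toDF("id", "name")

val sqlTemplate = "select Column.id, Column.name from product where Column.id > 100;"

productDF.withColumn("Final", parseStatement(sqlTemplate)).show(false)
// first row's Final: select 101, pencil from product where 101 > 100;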
