3

I have a data frame in below format:

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |           10800 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            7000 | 

I need to convert this data frame to below, basically, I need to create an entry per hour so if WatchTime(sec) is more than 3600 seconds I need to create a new entry for the next hour

|u_name|Date        |Hour |  Content_id|WatchTime(sec)   |
|user1 | 2019-07-28 |  21 |        100 |            3600 |
|user1 | 2019-07-28 |  22 |        100 |            3600 |
|user1 | 2019-07-28 |  23 |        100 |            3600 |
|user2 | 2019-07-28 |  20 |        101 |            3600 | 
|user3 | 2019-07-28 |  21 |        202 |            3600 | 
|user3 | 2019-07-28 |  22 |        202 |            3400 |

This can be achieved someway using SQL but I am using Scala and what is the efficient way to achieve this.

3 Answers 3

5

You can achieve this in spark 2.4+ with the following transformations:

  • Split the WatchTime in an array of 3600 seconds with sequence higher-order function
  • Explode the array to generate the new rows
  • Adjust the Hour and WatchTime values for each rows
  • Remove all rows with a zero WatchTime
val result = df
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))
   .withColumn("offset", explode('stamps))
   .withColumn("Hour", 'Hour + ('offset/3600).cast("int"))
   .withColumn("WatchTime", 'WatchTime - 'offset)
   .withColumn("WatchTime", when('WatchTime <= 3600, 'WatchTime).otherwise(3600))
   .filter('WatchTime > 0)
   .drop("stamps","offset")

result.show()
+------+-------------------+----+----------+---------+
|u_name|               Date|Hour|Content_id|WatchTime|
+------+-------------------+----+----------+---------+
| user1|2019-07-28 00:00:00|  21|       100|     3600|
| user1|2019-07-28 00:00:00|  22|       100|     3600|
| user1|2019-07-28 00:00:00|  23|       100|     3600|
| user2|2019-07-28 00:00:00|  20|       101|     3600|
| user3|2019-07-28 00:00:00|  21|       202|     3600|
| user3|2019-07-28 00:00:00|  22|       202|     3400|
+------+-------------------+----+----------+---------+

This algorithm may generate hours higher than 23. If you need accurate Date and Hour information, I'd advice you to use single unix timestamp column combining the start date and hour since it will let you do time manipulation and proper conversion to date and hour when needed.

It would look like this:

val result = df
   .withColumn("StartDateTime", unix_timestamp('Date) + ('Hour * 3600 ))
   .withColumn("stamps", sequence(lit(0), 'WatchTime, lit(3600)))  
   .withColumn("offset", explode('stamps))
   .withColumn("StartDateTime", from_unixtime('StartDateTime + 'offset))
   .withColumn("WatchTime", when('WatchTime - 'offset>3600,3600).otherwise('WatchTime - 'offset))
   .filter('WatchTime > 0)
   .select('u_name, 'content_id, 'StartDateTime, 'WatchTime)

result.show
+------+----------+-------------------+---------+
|u_name|content_id|      StartDateTime|WatchTime|
+------+----------+-------------------+---------+
| user1|       100|2019-07-28 21:00:00|     3600|
| user1|       100|2019-07-28 22:00:00|     3600|
| user1|       100|2019-07-28 23:00:00|     3600|
| user2|       101|2019-07-28 20:00:00|     3600|
| user3|       202|2019-07-28 21:00:00|     3600|
| user3|       202|2019-07-28 22:00:00|     3400|
+------+----------+-------------------+---------+
Sign up to request clarification or add additional context in comments.

3 Comments

This works, Thank you so much :) can you update how do we do this for datetime column so that it can handle date changes aswell.
any alternative for sequence in spark 2.3
in spark 2.3, you'll need to implement a sequence UDF or to directly flatMap your input DataFrame and implement the logic in scala
0

I would do something like that :

// Get max for loop
val max = df
  .agg(max(floor($"WatchTime(sec)" / 3600)).as("max"))
  .select($"max")
  .first
  .getInt(0)

// Union all
val newDf = (0 to max)
  .map { i =>
    df.filter($"WatchTime(sec)" > i * 3600)
      .withColumn("Hour", $"Hour" + i)
      .withColumn(
        "WatchTime(sec)",
        when($"WatchTime(sec)" - i * 3600 > 3600, 3600)
          .otherwise($"WatchTime(sec)" - i * 3600)
      )
  }
  .reduceLeft { (df1, df2) =>
    df1.union(df2)
  }
  .orderBy($"u_name", $"Date", $"Hour")


It's just a sugestion on how to do it. There surely is some more efficient ways.

Comments

0

We can achieve the outcome purely on Dataset like following,

//Define a case class
case class UserStat(uname: String, date: java.util.Date, hour: Int, contentId: String, watchTimeInSec: Int)

//Define a variable offset
 val offset: Int = 3600

userStatDs.flatMap(userStat => {
  val remainingWatchTimeInSec = userStat.watchTimeInSec % offset
  val remainingWatchTimeInSecCount = if (remainingWatchTimeInSec == 0) 0 else 1
  val totalIterationCount = (userStat.watchTimeInSec / offset) + remainingWatchTimeInSecCount

  if (userStat.watchTimeInSec <= offset) List(userStat)
  else {
    (0 until totalIterationCount)
      .map(index => {
        if ((userStat.watchTimeInSec / offset) == index)
          userStat.copy(hour = userStat.hour + index, watchTimeInSec = remainingWatchTimeInSec)
        else
          userStat.copy(hour = userStat.hour + index, watchTimeInSec = offset)
      })
  }
})

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.