I have a DataFrame with the columns session_start and session_stop. If the start and stop fall on different dates, I need to split the row into two rows. For example, given this df:

session_start session_stop
01-05-2021 23:11:40 02-05-2021 02:13:25

The output df should break this into two rows:

session_start session_stop
01-05-2021 23:11:40 01-05-2021 23:59:59
02-05-2021 00:00:00 02-05-2021 02:13:25

All other columns should keep the same values in both rows.

2 Answers

You can use a flatMap operation on your DF.

The function you use in the flatMap will produce either one or two row(s).
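
A minimal sketch of such a function, shown on a plain Scala Seq so it runs without a cluster. The Session case class and the name splitAtMidnight are made up for illustration, and the sketch assumes a session crosses at most one midnight. On a typed Dataset the call would presumably look like df.as[Session].flatMap(splitAtMidnight) with import spark.implicits._ in scope; in that case any other columns you want to keep would have to be fields of the case class too.

```scala
import java.sql.Timestamp

// Hypothetical row type; a real one would also carry the other columns.
case class Session(session_start: Timestamp, session_stop: Timestamp)

// Returns one row when start and stop share a date, otherwise two rows
// split at midnight (assumes the session crosses at most one midnight).
def splitAtMidnight(s: Session): Seq[Session] = {
  val start = s.session_start.toLocalDateTime
  val stop  = s.session_stop.toLocalDateTime
  if (start.toLocalDate == stop.toLocalDate) Seq(s)
  else Seq(
    Session(s.session_start, Timestamp.valueOf(start.toLocalDate.atTime(23, 59, 59))),
    Session(Timestamp.valueOf(stop.toLocalDate.atStartOfDay), s.session_stop)
  )
}

val sessions = Seq(
  Session(Timestamp.valueOf("2021-05-01 23:11:40"), Timestamp.valueOf("2021-05-02 02:13:25")),
  Session(Timestamp.valueOf("2021-05-03 10:00:00"), Timestamp.valueOf("2021-05-03 11:00:00"))
)

// flatMap flattens the per-row Seq results into a single collection.
val split = sessions.flatMap(splitAtMidnight)
split.foreach(println)
```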

1 Comment

I have created a method which takes the start and stop times and returns a list with two rows of start and stop. But how should I call flatMap to use that method?

I did it without the flatMap function. I created a UDF, generateOverlappedSessionsFromTimestampRanges, which does the conversion, and used it as shown below.

// UDF: splits a session that crosses midnight into two (start, stop) pairs.
// Assumes the session spans at most two calendar dates.
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

val generateOverlappedSessionsFromTimestampRanges = udf { (localStartTimestamp: Timestamp, localEndTimestamp: Timestamp) =>
    val startDate = localStartTimestamp.toLocalDateTime.toLocalDate
    val endDate = localEndTimestamp.toLocalDateTime.toLocalDate

    if (startDate.isBefore(endDate)) {
      // First row ends at 23:59:59 on the start date;
      // second row begins at 00:00:00 on the end date.
      List(
        (localStartTimestamp, Timestamp.valueOf(startDate.atTime(23, 59, 59))),
        (Timestamp.valueOf(endDate.atStartOfDay), localEndTimestamp)
      )
    } else {
      // Start and end fall on the same date: keep the session as a single row.
      List((localStartTimestamp, localEndTimestamp))
    }
  }

// Unit test case for the above UDF
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import spark.implicits._ // needed for toDF outside notebooks; assumes a SparkSession named spark

val timestamps: Seq[(Timestamp, Timestamp)] = Seq(
  (Timestamp.valueOf("2020-02-10 22:07:25.000"),
   Timestamp.valueOf("2020-02-11 02:07:25.000"))
)
val timestampsDf = timestamps.toDF("local_session_start_timestamp", "local_session_stop_timestamp")
val output = timestampsDf
        .withColumn("to_be_explode", generateOverlappedSessionsFromTimestampRanges(
          col("local_session_start_timestamp"),
          col("local_session_stop_timestamp")))
        // explode produces one output row per (start, stop) pair returned by the UDF
        .withColumn("exploded_session_time", explode(col("to_be_explode")))
        .withColumn("new_local_session_start", col("exploded_session_time._1"))
        .withColumn("new_local_session_stop", col("exploded_session_time._2"))
        .drop("to_be_explode", "exploded_session_time")
display(output) // Databricks notebook helper; use output.show(false) elsewhere

// Applying the UDF to the df from the question; all other columns are kept as-is in both rows
df.withColumn("to_be_explode", generateOverlappedSessionsFromTimestampRanges(df("session_start"), df("session_stop")))
        .withColumn("exploded_session_time", explode(col("to_be_explode")))
        .withColumn("session_start", col("exploded_session_time._1"))
        .withColumn("session_stop", col("exploded_session_time._2"))
        .drop("to_be_explode", "exploded_session_time")
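
The UDF above covers a session that crosses one midnight. If sessions can run across several days, a variant along these lines might help. This is only a sketch: splitByDay is a made-up name, it keeps the 23:59:59 end-of-day convention from the question, and it is plain Scala so it can be tried without Spark. It could presumably be wrapped with udf(splitByDay _) and used in place of the UDF above.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime

// Splits [start, stop] into one (start, stop) pair per calendar day touched.
def splitByDay(start: Timestamp, stop: Timestamp): List[(Timestamp, Timestamp)] = {
  val stopDate = stop.toLocalDateTime.toLocalDate

  // Walk forward one day at a time until the current piece starts on the stop date.
  @annotation.tailrec
  def loop(from: LocalDateTime, acc: List[(Timestamp, Timestamp)]): List[(Timestamp, Timestamp)] =
    if (!from.toLocalDate.isBefore(stopDate))
      ((Timestamp.valueOf(from), stop) :: acc).reverse
    else {
      val endOfDay = from.toLocalDate.atTime(23, 59, 59)
      val nextDay  = from.toLocalDate.plusDays(1).atStartOfDay
      loop(nextDay, (Timestamp.valueOf(from), Timestamp.valueOf(endOfDay)) :: acc)
    }

  loop(start.toLocalDateTime, Nil)
}

// A session touching three calendar days yields three pieces.
val pieces = splitByDay(
  Timestamp.valueOf("2021-05-01 23:11:40"),
  Timestamp.valueOf("2021-05-03 02:13:25")
)
pieces.foreach(println)
```

A session that starts and stops on the same date comes back as a single pair, so the explode step would leave such rows unchanged.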

1 Comment

Please share the body of your UDF for other people who may look at your answer ;)
