I have a DataFrame containing rows like the ones below. I need to split each row into a month-wise series based on pa_start_date and pa_end_date, adding two new columns, period_start_date and period_end_date.

The input DataFrame df is:

    p_id pa_id  p_st_date   p_end_date     pa_start_date   pa_end_date  
    p1   pa1    2-Jan-18      5-Dec-18     2-Mar-18        8-Aug-18       
    p1   pa2    3-Jan-18      8-Dec-18     6-Mar-18        10-Nov-18   
    p1   pa3    1-Jan-17      1-Dec-17     9-Feb-17        20-Apr-17  

The expected output is:

    p_id pa_id  p_st_date   p_end_date pa_start_date pa_end_date period_start_date period_end_date
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    2-Mar-18          31-Mar-18
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    1-Apr-18          30-Apr-18
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    1-May-18          31-May-18
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    1-Jun-18          30-Jun-18
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    1-Jul-18          31-Jul-18
    p1   pa1    2-Jan-18    5-Dec-18   2-Mar-18      8-Aug-18    1-Aug-18          31-Aug-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   6-Mar-18          31-Mar-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Apr-18          30-Apr-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-May-18          31-May-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Jun-18          30-Jun-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Jul-18          31-Jul-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Aug-18          31-Aug-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Sep-18          30-Sep-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Oct-18          31-Oct-18
    p1   pa2    3-Jan-18    8-Dec-18   6-Mar-18      10-Nov-18   1-Nov-18          30-Nov-18
    p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17   9-Feb-17          28-Feb-17
    p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17   1-Mar-17          31-Mar-17
    p1   pa3    1-Jan-17    1-Dec-17   9-Feb-17      20-Apr-17   1-Apr-17          30-Apr-17

2 Answers

I solved this by creating a UDF like the one below.

Given pa_start_date and the number of months between pa_start_date and pa_end_date, this UDF returns an array of dates, one for each month from the start month through the end month.

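    import java.sql.Date
    import org.apache.spark.sql.functions.udf
    import org.joda.time.LocalDate

    // Returns `l` dates as yyyy-MM-dd strings, one per month, starting at `d`.
    def udfFunc: (Date, Long) => Array[String] = { (d, l) =>
        var t = LocalDate.fromDateFields(d)
        val dates = new Array[String](l.toInt)
        for (i <- 0 until l.toInt) {
            dates(i) = t.toString("yyyy-MM-dd")
            t = t.plusMonths(1) // same day in the next month (clamped to the month's length)
        }
        dates
    }
    val my_udf = udf(udfFunc)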
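
To make the UDF's result concrete, here is a Spark-free sketch of the same idea using java.time rather than Joda-Time. The name monthlyDates is made up for illustration; it is not part of the code above.

```scala
import java.time.LocalDate

// Hypothetical stand-in for the UDF body: `n` dates, one month apart, starting at `start`.
def monthlyDates(start: LocalDate, n: Int): Array[String] =
  Array.tabulate(n)(i => start.plusMonths(i).toString) // LocalDate.toString is ISO yyyy-MM-dd

val dates = monthlyDates(LocalDate.of(2018, 3, 2), 6)
println(dates.mkString(", "))
// 2018-03-02, 2018-04-02, 2018-05-02, 2018-06-02, 2018-07-02, 2018-08-02
```

Only the month of each element matters afterwards, because the period bounds are derived from it with last_day and trunc.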

And the final dataframe is created as below.

    // Assumes `ss` is the SparkSession and `path` is the location of the CSV file.
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import ss.implicits._

    val fmt = "dd-MMM-yy"
    val keys = Seq("p_id", "pa_id", "p_st_date", "p_end_date", "pa_start_date", "pa_end_date")
    val paStart = to_date($"pa_start_date", fmt)
    val paEnd = to_date($"pa_end_date", fmt)

    val df = ss.read.format("csv").option("header", true).load(path)
        .select(keys.map(col) :+ my_udf(paStart, ceil(months_between(paEnd, paStart))).alias("udf"): _*) // array of dates from the UDF
        .withColumn("after_divide", explode($"udf")) // one row per date in the array
        .withColumn("period_end_date", date_format(last_day($"after_divide"), fmt)) // last day of that date's month
        .drop("udf")
        // helper column: only the first row of each group keeps the real start date
        .withColumn("row_number", row_number().over(Window.partitionBy(keys.map(col): _*).orderBy($"after_divide".asc)))
        .withColumn("period_start_date", date_format(when($"row_number" === 1, $"after_divide").otherwise(trunc($"after_divide", "month")), fmt))
        .drop("after_divide", "row_number") // drop the helper columns that are not needed in the output

And here is the output.

+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|p_id|pa_id|p_st_date|p_end_date|pa_start_date|pa_end_date|period_end_date|period_start_date|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      28-Feb-17|        09-Feb-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      31-Mar-17|        01-Mar-17|
|  p1|  pa3| 1-Jan-17|  1-Dec-17|     9-Feb-17|  20-Apr-17|      30-Apr-17|        01-Apr-17|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Mar-18|        06-Mar-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-May-18|        01-May-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Aug-18|        01-Aug-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Sep-18|        01-Sep-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      31-Oct-18|        01-Oct-18|
|  p1|  pa2| 3-Jan-18|  8-Dec-18|     6-Mar-18|  10-Nov-18|      30-Nov-18|        01-Nov-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Mar-18|        02-Mar-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Apr-18|        01-Apr-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-May-18|        01-May-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      30-Jun-18|        01-Jun-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Jul-18|        01-Jul-18|
|  p1|  pa1| 2-Jan-18|  5-Dec-18|     2-Mar-18|   8-Aug-18|      31-Aug-18|        01-Aug-18|
+----+-----+---------+----------+-------------+-----------+---------------+-----------------+
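
One caveat, as far as I can tell: ceil(months_between(end, start)) counts elapsed months rather than calendar months touched, so it can come up one short when the end day-of-month is not later than the start day-of-month. For example, 20-Feb-17 to 9-Apr-17 gives 2, although February, March and April are all involved. Below is a minimal Spark-free sketch that counts calendar months instead, using java.time. splitByMonth is a hypothetical helper, not part of the code above, and it could be wrapped in a UDF.

```scala
import java.time.{LocalDate, YearMonth}
import java.time.temporal.ChronoUnit

// Hypothetical helper: one (period_start, period_end) pair per calendar month
// touched by [start, end]. As in the question, every period ends on the last
// day of its month, and only the first period keeps the real start day.
def splitByMonth(start: LocalDate, end: LocalDate): Seq[(LocalDate, LocalDate)] = {
  val first = YearMonth.from(start)
  val months = ChronoUnit.MONTHS.between(first, YearMonth.from(end)).toInt + 1
  (0 until months).map { i =>
    val ym = first.plusMonths(i)
    val periodStart = if (i == 0) start else ym.atDay(1)
    (periodStart, ym.atEndOfMonth)
  }
}

val periods = splitByMonth(LocalDate.of(2017, 2, 20), LocalDate.of(2017, 4, 9))
periods.foreach(println)
// (2017-02-20,2017-02-28)
// (2017-03-01,2017-03-31)
// (2017-04-01,2017-04-30)
```

An end date before the start date yields an empty result here; whether that is the right behaviour depends on the data.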

Here is how I did it using an RDD and a helper function.

I kept the data in a file:

/tmp/pdata.csv
p_id,pa_id,p_st_date,p_end_date,pa_start_date,pa_end_date
p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18
p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18
p1,pa3,1-Jan-17,1-Dec-17,9-Feb-17,20-Apr-17

Spark Scala code:

    // Written for spark-shell, where `spark` (the SparkSession) is predefined.
    import java.text.SimpleDateFormat
    import java.util.{Calendar, Date, GregorianCalendar}
    import scala.collection.mutable.ListBuffer

    val ipt = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/tmp/pdata.csv")
    val format = new SimpleDateFormat("dd-MMM-yy")
    format.format(new Date()) // test date

    // Returns one "period_start,period_end" string per month from startdate to enddate.
    def generateDates(startdate: Date, enddate: Date): ListBuffer[String] = {
        val dateList = new ListBuffer[String]()
        val calendar = new GregorianCalendar()
        calendar.setTime(startdate)
        // "Sept" is what produces the 01-Sept-2018 row in the output below
        val monthName = Array("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sept", "Oct", "Nov", "Dec")
        // "-<month>-<year>" suffix for the month the calendar currently points at
        def monthYear = "-" + monthName(calendar.get(Calendar.MONTH)) + "-" + calendar.get(Calendar.YEAR)
        // first period: the actual start day up to the last day of that month
        dateList += s"${calendar.get(Calendar.DAY_OF_MONTH)}$monthYear,${calendar.getActualMaximum(Calendar.DAY_OF_MONTH)}$monthYear"
        // step from the 1st of each following month, so that an end day earlier
        // than the start day cannot cause the last month to be skipped
        calendar.set(Calendar.DAY_OF_MONTH, 1)
        calendar.add(Calendar.MONTH, 1)
        while (!calendar.getTime.after(enddate)) {
            dateList += s"01$monthYear,${calendar.getActualMaximum(Calendar.DAY_OF_MONTH)}$monthYear"
            calendar.add(Calendar.MONTH, 1)
        }
        dateList
    }

    val oo = ipt.rdd.map(x => (x(0).toString, x(1).toString, x(2).toString, x(3).toString, x(4).toString, x(5).toString))
    // emit one output tuple per generated period
    oo.flatMap { pp =>
        generateDates(format.parse(pp._5), format.parse(pp._6))
            .map(x => (pp._1, pp._2, pp._3, pp._4, pp._5, pp._6, x))
    }.collect().foreach(println)

The generateDates function builds the concatenated start and end dates for each month and collects them in a ListBuffer; flatMap then emits one row per entry. I used monthName to format the month as in your expected output. The output came out as below:

(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,2-Mar-2018,31-Mar-2018)
(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,01-Apr-2018,30-Apr-2018)
(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,01-May-2018,31-May-2018)
(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,01-Jun-2018,30-Jun-2018)
(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,01-Jul-2018,31-Jul-2018)
(p1,pa1,2-Jan-18,5-Dec-18,2-Mar-18,8-Aug-18,01-Aug-2018,31-Aug-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,6-Mar-2018,31-Mar-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Apr-2018,30-Apr-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-May-2018,31-May-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Jun-2018,30-Jun-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Jul-2018,31-Jul-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Aug-2018,31-Aug-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Sept-2018,30-Sept-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Oct-2018,31-Oct-2018)
(p1,pa2,3-Jan-18,8-Dec-18,6-Mar-18,10-Nov-18,01-Nov-2018,30-Nov-2018)
(p1,pa3,1-Jan-17,1-Dec-17,9-Feb-17,20-Apr-17,9-Feb-2017,28-Feb-2017)
(p1,pa3,1-Jan-17,1-Dec-17,9-Feb-17,20-Apr-17,01-Mar-2017,31-Mar-2017)
(p1,pa3,1-Jan-17,1-Dec-17,9-Feb-17,20-Apr-17,01-Apr-2017,30-Apr-2017)
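
The output above has "Sept" and four-digit years, whereas the question uses dates like 1-Sep-18. Here is a small sketch, assuming java.time is available (Java 8+), that lets a formatter produce the day, month name and year instead of a hand-written array. The pattern d-MMM-yy and Locale.ENGLISH are my choices to mirror the question's format, not something taken from the code above.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.Locale

// Pattern chosen to mirror the question's dates, e.g. 2-Mar-18
val fmt = DateTimeFormatter.ofPattern("d-MMM-yy", Locale.ENGLISH)

val start = LocalDate.parse("2-Mar-18", fmt)             // two-digit years are read as 20xx
val monthEnd = start.withDayOfMonth(start.lengthOfMonth) // last day of the same month

println(s"${fmt.format(start)},${fmt.format(monthEnd)}") // 2-Mar-18,31-Mar-18
```

The same formatter can both parse the input columns and format the generated period dates, which keeps the two in one consistent format.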

I am happy to explain more if anyone has questions. I may also have read the file in a clumsy way; that can be improved as well.
