
Find the previous month's sale for each city in a Spark DataFrame

+----+--------+----+
|City|   Month|Sale|
+----+--------+----+
|  c1|JAN-2017|  49|
|  c1|FEB-2017|  46|
|  c1|MAR-2017|  83|
|  c2|JAN-2017|  59|
|  c2|MAY-2017|  60|
|  c2|JUN-2017|  49|
|  c2|JUL-2017|  73|
+----+--------+----+

The required output is:

+----+--------+----+-------------+
|City|   Month|Sale|previous_sale|
+----+--------+----+-------------+
|  c1|JAN-2017|  49|         NULL|
|  c1|FEB-2017|  46|           49|
|  c1|MAR-2017|  83|           46|
|  c2|JAN-2017|  59|         NULL|
|  c2|MAY-2017|  60|           59|
|  c2|JUN-2017|  49|           60|
|  c2|JUL-2017|  73|           49|
+----+--------+----+-------------+

Please help me

2 Answers


You can use the lag function to get the previous value.

If you want to sort by month, you first need to convert the month strings to proper dates, e.g. "JAN-2017" to "01-01-2017". Something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import spark.implicits._

val df = spark.sparkContext.parallelize(Seq(
  ("c1", "JAN-2017", 49),
  ("c1", "FEB-2017", 46),
  ("c1", "MAR-2017", 83),
  ("c2", "JAN-2017", 59),
  ("c2", "MAY-2017", 60),
  ("c2", "JUN-2017", 49),
  ("c2", "JUL-2017", 73)
)).toDF("city", "month", "sales")

val window = Window.partitionBy("city").orderBy("month")

df.withColumn("previous_sale", lag($"sales", 1).over(window)).show

Output:

+----+--------+-----+-------------+
|city|   month|sales|previous_sale|
+----+--------+-----+-------------+
|  c1|FEB-2017|   46|         null|
|  c1|JAN-2017|   49|           46|
|  c1|MAR-2017|   83|           49|
|  c2|JAN-2017|   59|         null|
|  c2|JUL-2017|   73|           59|
|  c2|JUN-2017|   49|           73|
|  c2|MAY-2017|   60|           49|
+----+--------+-----+-------------+

Note that the months above sort alphabetically, not chronologically. You can use this UDF to build a default date (01/month/year), which sorts correctly even across different years:

val fullDate = udf((value: String) => {
  val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                    "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
  val Array(mon, year) = value.split("-")
  // Date.valueOf avoids the deprecated Date(year, month, day) constructor,
  // whose year is offset by 1900 and whose month is zero-based.
  java.sql.Date.valueOf(java.time.LocalDate.of(year.toInt, months.indexOf(mon) + 1, 1))
})

df.withColumn("month", fullDate($"month")).show()
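The same conversion-then-lag logic can be sanity-checked outside Spark in plain Scala. This is only a sketch of the idea; `monthKey` is a hypothetical helper, not part of the answer above:

```scala
// Month names in calendar order, used to derive a sortable key.
val monthNames = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                      "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")

// Turn "JAN-2017" into "2017-01", which sorts chronologically as a string.
def monthKey(value: String): String = {
  val Array(mon, year) = value.split("-")
  f"$year-${monthNames.indexOf(mon) + 1}%02d"
}

val rows = Seq(("c1", "JAN-2017", 49), ("c1", "FEB-2017", 46), ("c1", "MAR-2017", 83))

// Sort chronologically, then pair each sale with its predecessor,
// which is what lag(1) over the window produces.
val sorted = rows.sortBy(r => monthKey(r._2))
val withPrev = sorted.zip(None +: sorted.map(r => Option(r._3)))
```

With the rows sorted this way, `withPrev` pairs JAN with no predecessor, FEB with 49, and MAR with 46, matching the required output in the question.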

Hope this helps!


15 Comments

Hi Shankar, this is working fine, but I need code without using Window.
I hope that you will help me.
I think this is the best possible way; it may not be as easy without using Window.
I think this is the best option we have; I don't know why your PM is not accepting it. I can give you the logic for a join-based approach: first create a new column with an increasing number per city (c1, c2, ...), then join the frame with itself on that increasing id minus 1.
Find the max date and min date for each city and compute all the months between them.
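The index-and-self-join idea from the comment above can be sketched in plain Scala (in Spark the same shape is a self-join on an index column; `previousSales` is a hypothetical helper, not code from either answer):

```scala
case class Sale(city: String, month: String, sale: Int)

// Window-free sketch: number the rows within each city, then "left-join"
// each (city, idx) row to the (city, idx - 1) row to pick up the previous sale.
def previousSales(rows: Seq[Sale]): Seq[(Sale, Option[Int])] =
  rows.groupBy(_.city).toSeq.sortBy(_._1).flatMap { case (_, cityRows) =>
    // Map from per-city index to row (groupBy preserves within-group order).
    val indexed = cityRows.zipWithIndex.map { case (r, i) => (i, r) }.toMap
    cityRows.zipWithIndex.map { case (r, i) =>
      (r, indexed.get(i - 1).map(_.sale)) // join on idx - 1; None for the first row
    }
  }

val data = Seq(
  Sale("c1", "JAN-2017", 49), Sale("c1", "FEB-2017", 46), Sale("c1", "MAR-2017", 83),
  Sale("c2", "JAN-2017", 59), Sale("c2", "MAY-2017", 60),
  Sale("c2", "JUN-2017", 49), Sale("c2", "JUL-2017", 73)
)
```

On the question's data this reproduces the required previous_sale column (None, 49, 46 for c1; None, 59, 60, 49 for c2), assuming the rows arrive already in month order.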
package com.incedo.pharma

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

class SecondTask {
  def previousMonthSale() {
    val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
    import ss.implicits._

    val df = ss.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "|")
      .load("taskTwoData.csv")

    // Parse the date column and extract the month number.
    val df1 = df.withColumn("Timestamp", unix_timestamp(df("Date"), "MM/dd/yyyy").cast("timestamp"))
    val df2 = df1.withColumn("Month", month(df1("Timestamp")))

    // Total sale per city per month.
    val df3 = df2.groupBy("City", "Month")
      .agg(sum(col("Sale")).alias("Current_Month_Total_Sale"))
      .orderBy("City", "Month")

    // Self-join each month with the previous one (Month - 1) instead of using a Window.
    val df4 = df3.withColumn("pre_month", df3("Month") - 1)
    val df5 = df4.alias("a")
      .join(df3.alias("b"), $"a.pre_month" === $"b.Month" && $"a.City" === $"b.City", "left_outer")
      .select($"a.City", $"a.Month", $"a.Current_Month_Total_Sale",
        $"b.Current_Month_Total_Sale".alias("Previous_Month_Total_Sale"))
      .na.fill(0, Seq("Previous_Month_Total_Sale"))

    // Month-over-month change as a percentage of the current month.
    val df6 = df5.withColumn("Percent_Sale_Change",
      round(((df5("Current_Month_Total_Sale") - df5("Previous_Month_Total_Sale")) /
        df5("Current_Month_Total_Sale")) * 100, 2))

    // Each month's sale relative to the city's best month.
    val df7 = df6.groupBy("City").max("Current_Month_Total_Sale").orderBy("City")
    val df8 = df6.join(df7, Seq("City"))
    val df9 = df8.withColumn("Percent_Sale_By_Max_Sale",
        round(df8("Current_Month_Total_Sale") / df8("max(Current_Month_Total_Sale)"), 2))
      .drop("max(Current_Month_Total_Sale)")

    df9.show()
  }
}

object taskTwo {
  def main(arr: Array[String]) {
    new SecondTask().previousMonthSale()
  }
}
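The Percent_Sale_Change formula in df6 can be checked with plain arithmetic. A minimal sketch, with hypothetical numbers rather than values from taskTwoData.csv (`percentChange` mirrors the column expression, including the two-decimal rounding that `round(..., 2)` applies):

```scala
// (current - previous) / current * 100, rounded to 2 decimal places.
def percentChange(current: Double, previous: Double): Double =
  BigDecimal((current - previous) / current * 100)
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
    .toDouble
```

For example, a current-month total of 83 against a previous total of 46 gives a change of 44.58 percent, and a previous total filled with 0 (the `na.fill` default) always gives 100 percent.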

2 Comments

Hi Shankar, please see my solution.
Hi Shankar, please see the problem at this URL: stackoverflow.com/questions/45883039/… — please guide me.
