
I have a dataframe:

id  lat  long  lag_lat  lag_long  detector  lag_interval  gpsdt  lead_gpsdt
 1   12    13       12        13         1     [1.5,3.5]    4         4.5
 1   12    13       12        13         1     null         4.5       5
 1   12    13       12        13         1     null         5         5.5
 1   12    13       12        13         1     null         5.5       6
 1   13    14       12        13         2     null         6         6.5
 1   13    14       13        14         2     null         6.5      null
 2   13    14       13        14         2     [0.5,1.5]    2.5       3.5
 2   13    14       13        14         2     null         3.5       4
 2   13    14       13        14         2     null         4        null

So I want to apply a condition while aggregating after a groupBy on col("id") and col("detector"): if lag_interval in that group has any non-null value, then in the aggregation I want two columns, one being

min("lag_interval.col1") and the other max("lead_gpsdt")

If the above condition is not met, then I want

min("gpsdt"), max("lead_gpsdt")

This is the approach I am using to get the data, but I still need to add the condition:

df.groupBy("detector","id").agg(first("lat-long").alias("start_coordinate"),
    last("lat-long").alias("end_coordinate"),struct(min("gpsdt"), max("lead_gpsdt")).as("interval"))

expected output:

id  interval  start_coordinate  end_coordinate
 1  [1.5,6]   [12,13]           [13,14]
 1  [6,6.5]   [13,14]           [13,14]
 2  [0.5,4]   [13,14]           [13,14]

**For more explanation:**

if we look at one of the groups that groupBy("id","detector") takes out, we have to check whether any value in col("lag_interval") in that group is non-null. If so, we need to use an aggregation like min("lag_interval.col1"), max("lead_gpsdt"). This condition applies to the set of data below:

id  lat  long  lag_lat  lag_long  detector  lag_interval  gpsdt  lead_gpsdt
 1   12    13       12        13         1     [1.5,3.5]    4         4.5
 1   12    13       12        13         1     null         4.5       5
 1   12    13       12        13         1     null         5         5.5
 1   12    13       12        13         1     null         5.5       6

and if all values of col("lag_interval") are null in that group of data, then we need the aggregation output min("gpsdt"), max("lead_gpsdt"). This condition applies to the set of data below:

id  lat  long  lag_lat  lag_long  detector  lag_interval  gpsdt  lead_gpsdt
 1   13    14       12        13         2     null         6         6.5
 1   13    14       13        14         2     null         6.5      null
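A rough sketch of how I think the group-level check could be written (using count here is my own assumption, since count only counts non-null values):

import org.apache.spark.sql.functions._

// sketch (assumption): count("lag_interval") counts only non-null values,
// so the when() branch picks min("lag_interval.col1") for groups that have
// at least one non-null lag_interval, and min("gpsdt") otherwise
df.groupBy("id", "detector")
  .agg(
    when(count("lag_interval") > 0, min("lag_interval.col1"))
      .otherwise(min("gpsdt")).as("start"),
    max("lead_gpsdt").as("end")
  )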
  • Is that the output for the given input? Can you elaborate a little more? Commented Sep 24, 2018 at 12:39
  • @RameshMaharjan yes sir, I have updated the question with a more elaborate explanation of the output Commented Sep 24, 2018 at 12:48
  • is lag_interval a struct column? can you also share the schema of the input dataframe? Commented Sep 24, 2018 at 12:55
  • @RameshMaharjan yes, it's a struct Commented Sep 24, 2018 at 12:58
  • what are first("lat-long") and last("lat-long")? there is no such column as lat-long, is there? see my answer below if it helps Commented Sep 24, 2018 at 13:35

1 Answer


The conditional dilemma that you have can be solved with the simple when inbuilt function, as suggested below. Since min ignores nulls, min("lag_interval.col1") is null only when every lag_interval in the group is null:

import org.apache.spark.sql.functions._

df.groupBy("id","detector")
  .agg(
    struct(
      // fall back to min("gpsdt") when the whole group has null lag_interval
      when(isnull(min("lag_interval.col1")), min("gpsdt"))
        .otherwise(min("lag_interval.col1")).as("min"),
      max("lead_gpsdt").as("max")
    ).as("interval")
  )

which should give you output as

+---+--------+----------+
|id |detector|interval  |
+---+--------+----------+
|2  |2       |[0.5, 4.0]|
|1  |2       |[6.0, 6.5]|
|1  |1       |[1.5, 6.0]|
+---+--------+----------+

and I guess you must already have an idea of how to do first("lat-long").alias("start_coordinate") and last("lat-long").alias("end_coordinate"), as you attempted.
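Since the input has no literal lat-long column, here is a minimal sketch of the full aggregation, assuming the coordinate pair is meant to be struct("lat", "long") (that representation is my assumption, not something stated in the question):

import org.apache.spark.sql.functions._

// assumption: the lat-long pair is built as struct("lat", "long");
// first/last are order-sensitive, so this presumes the rows within each
// group already arrive in the intended order
df.groupBy("id", "detector")
  .agg(
    first(struct("lat", "long")).alias("start_coordinate"),
    last(struct("lat", "long")).alias("end_coordinate"),
    struct(
      when(isnull(min("lag_interval.col1")), min("gpsdt"))
        .otherwise(min("lag_interval.col1")).as("min"),
      max("lead_gpsdt").as("max")
    ).as("interval")
  )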

I hope the answer is helpful


3 Comments

sir, what are your views on persisting a dataframe, and how do you use it again?
you can use persist or cache. Both save the data so that whenever you perform an action on a persisted or cached dataframe, it is not recomputed from scratch. Using the cached dataframe works the same as for any other computation.
okay, got it. I am not able to figure out how to use a cached/persisted dataframe for further use.
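A minimal sketch of that, assuming the aggregated dataframe from the answer above (the variable names are illustrative):

import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

// cache the aggregated result so later actions reuse it instead of
// recomputing the groupBy from scratch
val aggregated = df.groupBy("id", "detector")
  .agg(max("lead_gpsdt").as("max_lead_gpsdt"))
  .persist(StorageLevel.MEMORY_AND_DISK) // or simply .cache()

aggregated.count() // the first action materializes the cache
aggregated.show()  // later actions read from the cached data

aggregated.unpersist() // release the storage when done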
