
I have a HashMap in which I stored these values:

Map(862304021470656 -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),866561010400483 -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0))

This is the DataFrame:

+---------------+---------+---------+----------+----------+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+-----+
|             id|       lt|       ln|       evt|    lstevt|  s|  d|agl|chg| d1| d2| d3| d4|ebt|ibt|port| a1| a2| a3| a4|nos|dfrmd|
+---------------+---------+---------+----------+----------+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+-----+
|862304021470656|25.284158|82.435973|1540980095|1540980095|  0| 39|298|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|
|862304021470656|25.284158|82.435973|1540980105|1540980105|  0|  0|298|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|
|862304021470656|25.284724|82.434222|1540980155|1540980155| 14| 47|289|  0|  0|  1|  1|  2|  0|  5|  97| 11| -1| -1| 22|  0|    0|
|866561010400483|25.284858|82.433831|1544980165|1540980165| 12| 42|295|  0|  0|  1|  1|  2|  0|  5|  97| 12| -1| -1| 22|  0|    0|
+---------------+---------+---------+----------+----------+---+---+---+---+---+---+---+---+---+---+----+---+---+---+---+---+-----+

I want to filter the DataFrame by comparing the evt column with the element at index 4 of the list: keep only the rows whose evt value is greater than that element. The key in the map corresponds to the id column of the DataFrame.
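As a minimal plain-Scala sketch of the intended predicate (no Spark involved), here it is applied to the id and evt values from the table above. The `Event` case class, the `IntendedFilter` object and the `keepRow` helper are hypothetical names used only for illustration.

```scala
object IntendedFilter {
  // Hypothetical row type holding only the two columns the filter needs.
  case class Event(id: Long, evt: Long)

  val listMap: Map[Long, List[Double]] = Map(
    862304021470656L -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),
    866561010400483L -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0)
  )

  // (id, evt) pairs copied from the DataFrame shown above.
  val rows: List[Event] = List(
    Event(862304021470656L, 1540980095L),
    Event(862304021470656L, 1540980105L),
    Event(862304021470656L, 1540980155L),
    Event(866561010400483L, 1544980165L)
  )

  // Keep a row only if its id is a map key, the list has an element at index 4,
  // and evt is strictly greater than that element.
  def keepRow(e: Event): Boolean =
    listMap.get(e.id).flatMap(_.lift(4)).exists(limit => e.evt > limit)

  val kept: List[Event] = rows.filter(keepRow)
}
```

The first row is dropped because 1540980095 is not greater than 1.540980096E9. The other three rows pass.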

  • How big is your HashMap? Is it a good candidate for broadcasting (say, less than ~300 MB)? And what about rows that do not have a matching key in the HashMap? – Commented Dec 30, 2018 at 11:58

2 Answers


Here's one way, using a UDF to fetch the threshold value from the map and compare it with evt:

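import org.apache.spark.sql.functions._
import spark.implicits._  // needed for toDF and the $"col" syntax (assumes a SparkSession named spark)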

val df = Seq(
  (862304021470656L, 25.284158, 82.435973, 1540980095),
  (862304021470656L, 25.284158, 82.435973, 1540980105),
  (862304021470656L, 25.284724, 82.434222, 1540980155),
  (866561010400483L, 25.284858, 82.433831, 1544980165)
).toDF("id", "lt", "ln", "evt")

val listMap = Map(
  862304021470656L -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0),
  866561010400483L -> List(0.0, 1.0, 1.0, 2.0, 1.543622306E9, 78.0204, 10.005262, 56.0)
)

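// Returns a UDF that looks up the value at index evtIdx in the list stored for the given id.
// A missing key or a too-short list yields Double.MaxValue, so evt can never exceed it.
def evtLimit(m: Map[Long, List[Double]], evtIdx: Int) = udf(
  (id: Long) => m.get(id) match {
    case Some(ls) => if (evtIdx < ls.size) ls(evtIdx) else Double.MaxValue
    case None     => Double.MaxValue
  }
)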

df.where($"evt" > evtLimit(listMap, 4)($"id")).show
// +---------------+---------+---------+----------+
// |             id|       lt|       ln|       evt|
// +---------------+---------+---------+----------+
// |862304021470656|25.284158|82.435973|1540980105|
// |862304021470656|25.284724|82.434222|1540980155|
// |866561010400483|25.284858|82.433831|1544980165|
// +---------------+---------+---------+----------+

Note that the UDF returns Double.MaxValue when the id has no matching key in the provided Map or when the list has no element at the requested index, so such rows are filtered out. That default can be revised to suit specific business requirements.
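If rows without a usable threshold should simply be dropped, an `Option` expresses that more directly than a `Double.MaxValue` sentinel. The plain-Scala sketch below compares the two lookups. `LimitLookup`, `sentinelLimit`, `optionalLimit`, `keepSentinel` and `keepOptional` are hypothetical names, and the id `1L` is a made-up key that is absent from the map.

```scala
object LimitLookup {
  val listMap: Map[Long, List[Double]] = Map(
    862304021470656L -> List(0.0, 0.0, 0.0, 0.0, 1.540980096E9, 74.365111, 22.302669, 0.0)
  )

  // Mirrors the UDF above: missing key or too-short list -> Double.MaxValue.
  def sentinelLimit(m: Map[Long, List[Double]], id: Long, idx: Int): Double =
    m.get(id) match {
      case Some(ls) if idx < ls.size => ls(idx)
      case _                         => Double.MaxValue
    }

  // Option-based variant: None means "no threshold available".
  def optionalLimit(m: Map[Long, List[Double]], id: Long, idx: Int): Option[Double] =
    m.get(id).flatMap(_.lift(idx))

  // Both predicates reject rows that have no usable threshold.
  def keepSentinel(id: Long, evt: Long): Boolean = evt > sentinelLimit(listMap, id, 4)
  def keepOptional(id: Long, evt: Long): Boolean = optionalLimit(listMap, id, 4).exists(evt > _)
}
```

In Spark, the analogous change would be to return a nullable value from the lookup. A comparison against null evaluates to null, and `where` drops those rows.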


2 Comments

Hi sir, please tell me how to get the filtered DataFrame where the id value is not a key of the HashMap.
@experiment, if I understand your question correctly, you can use the built-in method isin, like df.where(!$"id".isin(listMap.keys.toArray: _*)).
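To show what that negated `isin` selects, here is a plain-Scala sketch of the same "id not among the map keys" test. `KeysComplement` is a hypothetical name, and the id `123456789012345L` is made up so that at least one row falls outside the map.

```scala
object KeysComplement {
  val mapKeys: Set[Long] = Set(862304021470656L, 866561010400483L)

  // The first two ids appear in the question's map; the third is hypothetical.
  val ids: List[Long] = List(862304021470656L, 866561010400483L, 123456789012345L)

  // Local equivalent of df.where(!$"id".isin(listMap.keys.toArray: _*)).
  val notInMap: List[Long] = ids.filterNot(mapKeys.contains)
}
```

With a very large key set, a `left_anti` join against a DataFrame of the keys is a common Spark alternative to a long `isin` list.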

You can get this with a simple join:

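import spark.implicits._
import org.apache.spark.sql.functions._
val df = ... //your main Dataframe
val map = Map(..your data here..).toSeq.toDF("id", "list")  // a Map has no toDF; convert it to a Seq of (id, list) pairs first
val join = df.join(map, "id").filter(size($"list") >= 5 /* <-- just in case: ensures index 4 exists; size() works on arrays, length() only on strings */)
val res = join.filter($"evt" > $"list"(4))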

4 Comments

Thanks for your effort, but I wanted to avoid joins for the comparison because they are heavy operations.
If your map is small enough, Spark will broadcast it and there will be no shuffle.
It will have at least 1 lakh (100,000) entries; would it be feasible then?
@experiment 1 lakh? You mean 1 KB? You can set Spark's broadcast threshold via spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB.
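For rough arithmetic on the 1 lakh (100,000) entries mentioned above, assume each entry holds one 8-byte Long key plus eight 8-byte Double values, which is the shape of the question's lists. That gives a raw payload of 7,200,000 bytes, below the 10 MB (10,485,760-byte) default. Spark's own size estimate includes row and array overhead, so the figure it compares against the threshold will be larger. Treat the result as a lower bound. `BroadcastEstimate` is a hypothetical name.

```scala
object BroadcastEstimate {
  val entries: Long = 100000L      // 1 lakh entries
  val keyBytes: Long = 8L          // one Long id per entry
  val valuesPerEntry: Long = 8L    // list length in the question's map
  val bytesPerDouble: Long = 8L

  // Raw payload only; ignores JVM and Spark row/array overhead.
  val rawBytes: Long = entries * (keyBytes + valuesPerEntry * bytesPerDouble)

  // Default of spark.sql.autoBroadcastJoinThreshold: 10 MB = 10 * 1024 * 1024 bytes.
  val defaultThreshold: Long = 10L * 1024 * 1024
}
```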
