I have two RDDs: points and pointsWithinEps. Each element of points represents an (x, y) coordinate. Each element of pointsWithinEps represents two points and the distance between them: ((x, y), distance). I want to loop over all points and, for every point, keep only those elements of pointsWithinEps whose first coordinate (x) is that point. So I do the following:
points.foreach(p =>
  val distances = pointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  if (distances.count() > 3) {
    // do some other actions
  }
)
But this syntax is not valid. As far as I understand, it is not allowed to create variables inside a Spark foreach. Should I do something like this?
for (i <- 0 until points.count().toInt) {
  val p = points.take(i + 1).drop(i).head // take the i-th point
  val distances = pointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  if (distances.count() > 3) {
    // do some other actions
  }
}
Or is there a better way to do this? The complete code is hosted here: https://github.com/timasjov/spark-learning/blob/master/src/DBSCAN.scala
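One alternative that avoids launching a distributed filter per point (a sketch, not a definitive implementation; everything except the two RDDs from above is an assumed name): key pointsWithinEps by its first point and count neighbours in a single pass:

import org.apache.spark.SparkContext._ // enables pair-RDD operations on older Spark versions

val neighbourCounts = pointsWithinEps
  .map { case ((a, b), distance) => (a, 1) } // key each tuple by the point it originates from
  .reduceByKey(_ + _)                        // eps-neighbour count per point

// Points with more than 3 neighbours, matching the threshold above:
val corePoints = neighbourCounts
  .filter { case (_, count) => count > 3 }
  .keys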
EDIT:
points.foreach({ p =>
  val pointNeighbours = pointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  println(pointNeighbours)
})
Right now I have the code above, but it throws a NullPointerException (on pointsWithinEps). How can it be fixed, and why is pointsWithinEps null (before the foreach it contains elements)?
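For what it's worth, Spark does not support referencing one RDD inside another RDD's closure: the function passed to foreach is serialized to the executors, where pointsWithinEps is not a usable RDD, and this typically surfaces as exactly this kind of NullPointerException. A minimal sketch of a driver-side workaround, assuming pointsWithinEps is small enough to collect:

// Materialize the distance tuples on the driver once; only safe if they fit in memory.
val localPointsWithinEps = pointsWithinEps.collect()

points.collect().foreach { p =>
  val pointNeighbours = localPointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  println(pointNeighbours.mkString(", "))
}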
So for every point p in points, you want all ((x, y), distance) tuples from pointsWithinEps that originate at the same point (x)?
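If that reading is correct, one way to express it without a driver-side loop is a join on the first point (a sketch; byFirstPoint and neighbours are made-up names, and the element types are assumed from the question):

// Key each distance tuple by the point it originates from.
val byFirstPoint = pointsWithinEps.map { case ((a, b), distance) => (a, (b, distance)) }

// Attach every point to its neighbours; points without any match are dropped by the join.
val neighbours = points
  .map(p => (p, ()))
  .join(byFirstPoint)
  .map { case (p, (_, neighbour)) => (p, neighbour) }
  .groupByKey() // (point, Iterable[(otherPoint, distance)])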