
I have two RDDs: points and pointsWithinEps. Each element of points represents an x, y coordinate. pointsWithinEps represents two points and the distance between them: ((x, y), distance). I want to loop over all points and, for every point, keep only the elements of pointsWithinEps whose first coordinate (x) equals that point. So I do the following:

    points.foreach(p =>
      val distances = pointsWithinEps.filter{
        case((x, y), distance) => x == p
      }
      if (distances.count() > 3) {
//        do some other actions
      }
    )

But this syntax is not valid. As far as I understand, it is not allowed to create variables inside a Spark foreach. Should I do something like this instead?

    for (i <- 0 to points.count().toInt) {
      val p = points.take(i + 1).drop(i) // take the point
      val distances = pointsWithinEps.filter {
        case ((x, y), distance) => x == p
      }
      if (distances.count() > 3) {
        // do some other actions
      }
    }

Or is there a better way to do this? The complete code is hosted here: https://github.com/timasjov/spark-learning/blob/master/src/DBSCAN.scala

EDIT:

Right now I have the following code, but it throws a NullPointerException (on pointsWithinEps). How can it be fixed, and why is pointsWithinEps null inside the foreach (before the foreach it contains elements)?

    points.foreach({ p =>
      val pointNeighbours = pointsWithinEps.filter {
        case ((x, y), distance) => x == p
      }
      println(pointNeighbours)
    })

  • Do I understand correctly that for each point (x, y) in points, you want all ((x, y), distance) tuples from pointsWithinEps that originate at the same x? Commented Oct 26, 2014 at 20:19
  • Yes. Basically, for each point I want to find which other points are its neighbours (points that are within epsilon). In my case that is the point itself compared against x in the ((x, y), distance) structure. The code is on GitHub, so you can, for example, run it and inspect the exact values in the debugger. Commented Oct 26, 2014 at 20:22

2 Answers

2

To collect all distance records that start at a given coordinate, a simple distributed way of doing it is to key them by that first coordinate x and group them by that key, like this:

    val pointsWithinEpsByX = pointsWithinEps.map { case ((x, y), distance) => (x, ((x, y), distance)) }
    val xCoordinatesWithDistance = pointsWithinEpsByX.groupByKey()

Then left-join the RDD of points with the result of the previous transformation:

    val pointsWithCoordinatesWithDistance = points.leftOuterJoin(xCoordinatesWithDistance)
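
Putting the pieces together, here is a minimal self-contained sketch of that pipeline. It assumes a point is a (Double, Double) pair and that pointsWithinEps holds ((point, point), distance) records as described in the question; the object name, sample data, and local master are placeholders, points is keyed by itself so the join key matches the grouping key, and the "more than 3 neighbours" filter mirrors the count() > 3 check from the question:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD functions: groupByKey, leftOuterJoin

    object NeighboursByPoint {
      type Point = (Double, Double)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("NeighboursByPoint").setMaster("local[*]"))

        // Placeholder data with the same shape as in the question
        val points = sc.parallelize(Seq[Point]((1.0, 1.0), (5.0, 5.0)))
        val pointsWithinEps = sc.parallelize(Seq(
          (((1.0, 1.0), (1.5, 1.2)), 0.54),
          (((1.0, 1.0), (0.8, 0.9)), 0.22)))

        // Key each distance record by its first point, then group by that key
        val pointsWithinEpsByX = pointsWithinEps.map { case ((x, y), distance) => (x, ((x, y), distance)) }
        val xCoordinatesWithDistance = pointsWithinEpsByX.groupByKey()

        // Key each point by itself so the key types line up, then left-join
        val pointsWithCoordinatesWithDistance =
          points.map(p => (p, p)).leftOuterJoin(xCoordinatesWithDistance)

        // Keep only points with more than 3 neighbouring distance records
        val corePoints = pointsWithCoordinatesWithDistance.filter {
          case (_, (_, neighbours)) => neighbours.exists(_.size > 3)
        }

        corePoints.collect().foreach(println)
        sc.stop()
      }
    }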

9 Comments

The compiler shows that there is no groupByKey method, only groupBy. The same goes for the leftOuterJoin method. I am using Spark 1.1.0 pre-built for Hadoop 1.X
Also, should I put it into a foreach loop?
These are functions available on RDDs of (key, value) pairs through an implicit conversion. Import org.apache.spark.SparkContext._ at the top of your program to use these functions. - Also, there's no need for a loop. This functional pipeline gets the job done by applying transformations and grouping to the whole dataset.
Oooh, now I have the groupByKey method, but I still do not have the leftOuterJoin method; I do not understand where the leftOuterJoin method is. Oooh, I understood: points is not of an RDD pair type after reading from the file, and I cannot apply leftOuterJoin to it. How can it be converted?
I assume that points is of the form (x, y)? The implicit conversion is applied to RDDs of Tuple2 elements, i.e. "pairs" like (x, y). (A short sketch follows these comments.)
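
To illustrate the last two comments, a hypothetical sketch: it assumes points was read from the file as an RDD of (Double, Double) pairs and reuses xCoordinatesWithDistance from the answer above. Keying each point by itself gives a pair RDD, and with the import in scope leftOuterJoin becomes available.

    import org.apache.spark.SparkContext._ // implicit conversion that adds leftOuterJoin, groupByKey, ...

    // Assumed shape: points is RDD[(Double, Double)] after parsing the input file
    val pointsKeyed = points.map(p => (p, p)) // key each point by itself
    val joined = pointsKeyed.leftOuterJoin(xCoordinatesWithDistance)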
0

Declaring variables means you have a block, not just an expression, so you need to use braces {}, e.g.

    points.foreach({ p => ... })
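
For example, wrapping the body from the question in braces makes the snippet compile (a sketch only; as the question's edit shows, nesting the filter on pointsWithinEps inside the foreach still fails at runtime with a NullPointerException, which the join-based approach in the other answer avoids):

    points.foreach({ p =>
      val distances = pointsWithinEps.filter {
        case ((x, y), distance) => x == p
      }
      if (distances.count() > 3) {
        // do some other actions
      }
    })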

1 Comment

Thanks! But can you also help with the functionality that I described?
