
I'm working with Spark 2.1.1 and Scala 2.11.8.

I'm executing my code in the spark-shell. This is the code I'm running:

val read_file1 = sc.textFile("Path to file 1");

val uid = read_file1.map(line => line.split(",")).map(array => array.map(arr => {
   if(arr.contains(":")) (array(2).split(":")(0), arr.split(":")(0))
   else (array(2).split(":")(0), arr)}))

val rdd1 = uid.map(array => array.drop(4)).flatMap(array => array.toSeq).map(y=>(y,1)).reduceByKey(_+_)

The output of this code is:

(( v67430612_serv78i, fb_201906266952256),1)
(( v74005958_serv35i, fb_128431994336303),1)

However, when I execute the following on the two RDDs' outputs:

uid2.map(x => ((x._1, x._2), x._3)).join(rdd1).map(y => ((y._1._1, y._1._2, y._2._1), y._2._2))

I get the error:

java.lang.UnsupportedOperationException: empty collection

Why am I getting this error?

Here are samples of the input files:

File 1 :

2017-05-09 21:52:42 , 1494391962 , p69465323_serv80i:10:450 , 7 , fb_406423006398063:396560, guest_861067032060185_android:671051, fb_100000829486587:186589, fb_100007900293502:407374, fb_172395756592775:649795
2017-05-09 21:52:42 , 1494391962 , z67265107_serv77i:4:45 , 2:Re , fb_106996523208498:110066, fb_274049626104849:86632, fb_111857069377742:69348, fb_127277511127344:46246

File 2 :

fb_100008724660685,302502,-450,v300430479_serv73i:10:450,switchtable,2017-04-30 00:00:00    
fb_190306964768414,147785,-6580,r308423810_serv31i::20,invite,2017-04-30 00:00:00

I just noted this: when I execute

rdd1.take(10).foreach(println) or rdd1.first()

I also get this message before the output:

WARN Executor: Managed memory leak detected; size = 39979424 bytes, TID = 11

I don't know if this might have anything to do with the problem.

Another note: this error only occurs when I do

res.first()

for

uid2.map(x => ((x._1, x._2), x._3)).join(rdd1).map(y => ((y._1._1, y._1._2, y._2._1), y._2._2))

On doing

res.take(10).foreach(println)

I don't get any output, but no error is returned either.
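
As far as I understand, first() on an empty RDD throws exactly this exception, while take(n) just returns an empty array, which would explain why res.take(10) prints nothing while res.first() fails. Here is a minimal sketch of that difference (the RDD name is made up; any empty RDD behaves the same way):

// Illustrative empty pair RDD with the same element type as res
val emptyPairs = sc.emptyRDD[((String, String), Int)]

emptyPairs.take(10).foreach(println)   // prints nothing, no error
emptyPairs.first()                     // java.lang.UnsupportedOperationException: empty collection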

  • Can you share the whole exception stack trace? Commented Jul 3, 2017 at 8:13
  • @stefanobaghino I don't have the whole stack trace; I get the ... 48 elided message. Please let me know how to get the whole thing. Commented Jul 3, 2017 at 8:17
  • So you only get a single line that says java.lang.UnsupportedOperationException: empty collection? Commented Jul 3, 2017 at 8:18
  • @stefanobaghino I get this: java.lang.UnsupportedOperationException: empty collection at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.first(RDD.scala:1367) ... 48 elided Commented Jul 3, 2017 at 8:21
  • If you have a CSV file, why don't you use spark-csv to read the files? Commented Jul 3, 2017 at 8:22

2 Answers


You forgot to trim the spaces in the tuples created from the split lines, so nothing was joined because the keys didn't match. When you then called first() on the resulting empty RDD, the exception was thrown.
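
To see why that produces an empty join, here is a minimal sketch using one of the keys from your output: a key with a leading space is a different string from the same key without it, so join finds no matches until you trim.

val left = sc.parallelize(Seq((" v67430612_serv78i", 1)))      // key with a leading space
val right = sc.parallelize(Seq(("v67430612_serv78i", "x")))    // same key, no space

left.join(right).collect()                                     // Array() -- nothing matches
left.map{ case (k, v) => (k.trim, v) }.join(right).collect()   // Array((v67430612_serv78i,(1,x)))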

You can use the following solution. It works for me:

val read_file1 = sc.textFile("Path to file 1");

// Same parsing as before, but with .trim so stray spaces around the commas
// don't end up inside the tuples used as join keys
val uid = read_file1.map(line => line.split(",")).map(array => array.map(arr => {
   if(arr.contains(":")) (array(2).split(":")(0).trim, arr.split(":")(0).trim)
   else (array(2).split(":")(0).trim, arr.trim)}))

val rdd1 = uid.map(array => array.drop(4)).flatMap(array => array.toSeq).map(y=>(y,1)).reduceByKey(_+_)

val read_file2 = sc.textFile("Path to File 2");

// (id, fb id, value) triples from file 2, also trimmed so they match rdd1's keys
val uid2 = read_file2.map(line => {val arr = line.split(","); (arr(3).split(":")(0).trim, arr(0).trim, arr(2).trim)});

val res = uid2.map(x => ((x._1, x._2), x._3)).join(rdd1).map(y => ((y._1._1, y._1._2, y._2._1), y._2._2))
res.take(10).foreach(println)

2 Comments

  • @RameshMaharjan Can the size of the file have something to do with this? Because the problem is still there. :(
  • Here's a link to a screenshot of this: drive.google.com/open?id=0B_VulQELUNoFNlh6dG5QUFY1VWs

You get an empty collection after the join; this happens when there are no corresponding keys in the RDDs. Either the keys are not trimmed, they are sliced incorrectly, or there are simply no matches at all. I suggest checking whether there are matching keys in your files/RDDs, checking that the data was extracted correctly, and checking whether you really want an inner join rather than a left or right outer join.
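
For example, here is a quick sketch for checking whether the key sets overlap at all, and for listing left-side records that find no match. It reuses uid2 and rdd1 from the other answer; leftOuterJoin is used here purely for diagnosis, not as a fix.

// How many join keys do the two sides share? 0 means the inner join must be empty.
val leftKeys = uid2.map(x => (x._1, x._2))
val rightKeys = rdd1.keys
println(leftKeys.intersection(rightKeys).count())

// Keep every left-side record and show the ones with no match on the right.
val unmatched = uid2.map(x => ((x._1, x._2), x._3)).leftOuterJoin(rdd1).filter(_._2._2.isEmpty)
unmatched.take(10).foreach(println)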
