
Using a Spark DataFrame, I need to convert the row values into columns, partition by user ID, and create a CSV file.


val someDF = Seq(
  ("user1", "math","algebra-1","90"),
  ("user1", "physics","gravity","70"),
  ("user3", "biology","health","50"),
  ("user2", "biology","health","100"),
  ("user1", "math","algebra-1","40"),
  ("user2", "physics","gravity-2","20")
).toDF("user_id", "course_id","lesson_name","score")

someDF.show(false)

+-------+---------+-----------+-----+
|user_id|course_id|lesson_name|score|
+-------+---------+-----------+-----+
|  user1|     math|  algebra-1|   90|
|  user1|  physics|    gravity|   70|
|  user3|  biology|     health|   50|
|  user2|  biology|     health|  100|
|  user1|     math|  algebra-1|   40|
|  user2|  physics|  gravity-2|   20|
+-------+---------+-----------+-----+

val result = someDF.groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

result.show(false)

+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user1|     math|       90|   null|     null|  null|
|  user2|  biology|     null|   null|     null|   100|
|  user2|  physics|     null|   null|       20|  null|
|  user1|  physics|     null|     70|     null|  null|
+-------+---------+---------+-------+---------+------+


With the above code I'm able to convert the row values (lesson_name) into column names. But I need to save the output as CSV, course-wise.

The expected CSV output should be in the format below.

biology.csv // Expected output

+-------+---------+------+
|user_id|course_id|health|
+-------+---------+------+
|  user3|  biology|    50|
|  user2|  biology|   100|
+-------+---------+------+

physics.csv // Expected output

+-------+---------+---------+-------+
|user_id|course_id|gravity-2|gravity|
+-------+---------+---------+-------+
|  user2|  physics|       20|   null|
|  user1|  physics|     null|     70|
+-------+---------+---------+-------+

**Note: each course's CSV should contain only its own lesson names; it should not contain lesson names from other, non-relevant courses.**

Currently I'm only able to get the CSV in the format below, using this code:

result.write
  .partitionBy("course_id")
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(somepath)


e.g.:

biology.csv // Wrong output: it contains lesson columns from non-relevant courses (algebra-1, gravity, gravity-2)
+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user2|  biology|     null|   null|     null|   100|
+-------+---------+---------+-------+---------+------+

Can anyone help me solve this problem?

2 Answers


Just filter by course before you pivot:

val result = someDF
  .filter($"course_id" === "physics")
  .groupBy("user_id", "course_id")
  .pivot("lesson_name")
  .agg(first("score"))

+-------+---------+-------+---------+
|user_id|course_id|gravity|gravity-2|
+-------+---------+-------+---------+
|user2  |physics  |null   |20       |
|user1  |physics  |70     |null     |
+-------+---------+-------+---------+
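
If there are many courses, the same idea generalizes: loop over the distinct course_id values and filter before each pivot. A rough sketch, not the answer's original code, assuming a spark-shell session (`spark` in scope) and a hypothetical base path /tmp/output:

import org.apache.spark.sql.functions._
import spark.implicits._   // in spark-shell, `spark` is the active SparkSession

// Collect the distinct course ids to the driver (assumes the number of courses is small enough for that).
val courses = someDF.select("course_id").distinct().as[String].collect()

courses.foreach { course =>
  // Pivoting after the filter means only this course's own lesson names become columns.
  val perCourse = someDF
    .filter($"course_id" === course)
    .groupBy("user_id", "course_id")
    .pivot("lesson_name")
    .agg(first("score"))

  perCourse
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(s"/tmp/output/$course")   // hypothetical path: one directory per course
}

Because each pivot runs on the filtered subset, every output only contains that course's lesson columns. Note that Spark writes a directory of part files per course, so producing a file literally named biology.csv would require a rename step outside Spark.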


2 Comments

Why are you hardcoding "physics"? What if I have some 1,000 course_ids? How do we handle that case?
I want to create a course-wise CSV file. Let's say I have 1,000 courses and each course has about 5 lessons; how do you handle that? @Andrew

I'm assuming you mean you'd like to save the data into separate directories by course_id. You can use this approach.

import org.apache.spark.sql.functions._
import spark.implicits._

val someDF = Seq(
  ("user1", "math","algebra-1","90"),
  ("user1", "physics","gravity","70"),
  ("user3", "biology","health","50"),
  ("user2", "biology","health","100"),
  ("user1", "math","algebra-1","40"),
  ("user2", "physics","gravity-2","20")
).toDF("user_id", "course_id","lesson_name","score")

val result = someDF.groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

// Collect the distinct course ids to the driver.
val eventNames = result.select($"course_id").distinct().collect()
val eventlist  = eventNames.map(x => x(0).toString)

for (eventName <- eventlist) {
  val course = result.where($"course_id" === lit(eventName))

  // Remove the all-null columns: mark each cell 0/1 (null/non-null) and take the max per column.
  val row = course
    .select(course.columns.map(c => when(col(c).isNull, 0).otherwise(1).as(c)): _*)
    .groupBy().max(course.columns.map(c => c): _*)
    .first

  // Keep only the columns whose max is 1, i.e. columns with at least one non-null value.
  val colKeep = row.getValuesMap[Int](row.schema.fieldNames)
    .map { c => if (c._2 == 1) Some(c._1) else None }
    .flatten.toArray

  // The aggregated field names look like "max(colname)", so strip the "max(" prefix and ")" suffix.
  val final_df = course.select(row.schema.fieldNames.intersect(colKeep)
    .map(c => col(c.drop(4).dropRight(1))): _*)

  final_df.show()

  final_df.coalesce(1).write.mode("overwrite").format("csv").option("header", "true").save(s"${eventName}")
}


+-------+---------+------+
|user_id|course_id|health|
+-------+---------+------+
|  user3|  biology|    50|
|  user2|  biology|   100|
+-------+---------+------+

+-------+---------+-------+---------+
|user_id|course_id|gravity|gravity-2|
+-------+---------+-------+---------+
|  user2|  physics|   null|       20|
|  user1|  physics|     70|     null|
+-------+---------+-------+---------+

+-------+---------+---------+
|user_id|course_id|algebra-1|
+-------+---------+---------+
|  user1|     math|       90|
+-------+---------+---------+
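
As an aside, the all-null columns can also be detected with a single aggregation instead of the max-over-isNull trick. A rough alternative sketch for one course slice (the "biology" literal just stands in for the loop variable eventName):

import org.apache.spark.sql.functions._
import spark.implicits._

val course = result.where($"course_id" === "biology")   // "biology" stands in for eventName

// count(col) ignores nulls, so a zero count marks a column that is entirely null for this course.
val counts = course.select(course.columns.map(c => count(col(c)).alias(c)): _*).first()
val keep   = course.columns.filter(c => counts.getAs[Long](c) > 0)

val final_df = course.select(keep.map(col): _*)

This keeps the original column names, so there is no need to strip the "max(" prefix afterwards.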

If it solves your purpose, please accept the answer. Happy Hadoop!

16 Comments

What if I have another column (batch_id) along with course_id? Let's say I want to save by both course_id and batch_id: val someDF = Seq( ("user1", "math","algebra-1","90","b1"), ("user1", "physics","gravity","70","b1"), ("user3", "biology","health","50","b2"), ("user2", "biology","health","100","b2"), ("user1", "math","algebra-1","40","b1"), ("user2", "physics","gravity-2","20","b3") ).toDF("user_id", "course_id","lesson_name","score","batch_id")
I want to create a CSV file on both batch_id and course_id; the output should look like this: +-------+---------+---------+-------+---------+------+--------+ |user_id|course_id|algebra-1|gravity|gravity-2|health|batch_id| +-------+---------+---------+-------+---------+------+--------+ | user2| physics| null| null| 20| null| b3|
Won't that be a performance impact when we have a huge number of course_ids and batch_ids?
This is a totally different question, so please ask it as a new question. And if this solves your problem, please accept the answer.
And also add a proper DataFrame with an expected output.
