I have an input file, test-reading.csv:
id,sku,price
"100002701--425370728",100002701,12159
"100002701--510892030",100002701,11021
"100002701-235195215",100002701,12330
"100002701-110442364",100002701,9901
"100002701-1963746094",100002701,11243
I wrote the following source code as a minimal, complete, and verifiable example of the problem I'm facing.
There is a ReadingRecord class used to read the CSV file and a WritingRecord class used to write the output. They happen to be almost identical here, but in the real program they are quite different because they represent the input and output structures.
The remaining code starts Spark, reads the CSV, maps each ReadingRecord to a WritingRecord, and writes an output CSV.
The question is: why does this Spark program stop writing the CSV output when I uncomment the for loop inside the flatMapGroups call?
import java.util.ArrayList
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Input structure: one record per CSV row.
case class ReadingRecord(var id: String, var sku: Integer, var price: Integer) {
  def toWritingRecord(): WritingRecord = {
    new WritingRecord(this.id, this.sku, this.price)
  }
}
// Output structure.
case class WritingRecord(var id: String, var sku: Integer, var price: Integer)
// Kryo-based encoder for ReadingRecord.
object ReadingRecordEncoders {
  implicit def ReadingRecordEncoder: org.apache.spark.sql.Encoder[ReadingRecord] =
    org.apache.spark.sql.Encoders.kryo[ReadingRecord]
}
object WritingTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[8]")
      .setAppName("writing-test")
      .set("spark.executor.memory", "1gb")
      .set("spark.num.executors", "8")
      .set("spark.executor.heartbeatInterval", "120")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    import ReadingRecordEncoders._
    // Read the CSV, convert each row to a ReadingRecord, and group by sku.
    val data = spark.read.option("header", "true")
      .option("delimiter", ",")
      .option("inferSchema", "true")
      .csv("test-reading.csv")
      .map(r => {
        println(r)
        new ReadingRecord(r(0).asInstanceOf[String], r(1).asInstanceOf[Integer], r(2).asInstanceOf[Integer])
      }).groupByKey(r1 => r1.sku)
    val data1 = data.flatMapGroups((a: Integer, b: Iterator[ReadingRecord]) => {
      var list = new ArrayList[ReadingRecord]
      try {
        // Uncommenting this loop is what makes the CSV output stop being written.
        // for (o <- b) {
        //   list.add(o)
        // }
      } finally {
        list.clear()
        list = null
      }
      // Map every ReadingRecord in the group to a WritingRecord.
      b.map(f => f.toWritingRecord)
    })
    data1.printSchema()
    // Write the result as CSV with a header row.
    data1.write
      .format("csv")
      .option("header", "true")
      .save("output.csv")
  }
}