
I have an input file, test-reading.csv:

id,sku,price
"100002701--425370728",100002701,12159
"100002701--510892030",100002701,11021
"100002701-235195215",100002701,12330
"100002701-110442364",100002701,9901
"100002701-1963746094",100002701,11243

I wrote the following source code to have a minimal, complete, and verifiable example of the problem I'm facing.

There is a ReadingRecord class used to read the CSV file and a WritingRecord class used to write the output. Here they happen to be almost identical, but in the real program they are quite different because they represent the input and output structures.

The remaining code starts Spark, reads the CSV, maps each ReadingRecord to a WritingRecord, and writes an output CSV.

The question is: why does this Spark program stop writing the CSV output when I uncomment the for loop inside the flatMapGroups method?

import java.util.ArrayList

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class ReadingRecord(var id: String, var sku: Integer, var price: Integer) {
  def toWritingRecord(): WritingRecord = {
    new WritingRecord(this.id, this.sku, this.price)
  }
}

case class WritingRecord(var id: String, var sku: Integer, var price: Integer)

object ReadingRecordEncoders {
  implicit def ReadingRecordEncoder: org.apache.spark.sql.Encoder[ReadingRecord] =
    org.apache.spark.sql.Encoders.kryo[ReadingRecord]
}

object WritingTest {

  def main(args: Array[String]) {

    val conf = new SparkConf()
      .setMaster("local[8]")
      .setAppName("writing-test")
      .set("spark.executor.memory", "1gb")
      .set("spark.num.executors", "8")
      .set("spark.executor.heartbeatInterval", "120")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    import spark.implicits._
    import ReadingRecordEncoders._

    // Read the CSV (header + inferred schema), map each row to a ReadingRecord,
    // and group the records by sku.
    val data = spark.read.option("header", "true")
      .option("delimiter", ",")
      .option("inferSchema", "true")
      .csv("test-reading.csv")
      .map(r => {
        println(r)
        new ReadingRecord(r(0).asInstanceOf[String], r(1).asInstanceOf[Integer], r(2).asInstanceOf[Integer])
      }).groupByKey(r1 => r1.sku)

    // For each sku group, emit one WritingRecord per ReadingRecord; the
    // commented-out loop is the code the question is about.
    val data1 = data.flatMapGroups((a: Integer, b: Iterator[ReadingRecord]) => {
      var list = new ArrayList[ReadingRecord]
      try {
        //        for (o <- b) {
        //          list.add(o)
        //        }
      } finally {
        list.clear()
        list = null
      }

      b.map(f => f.toWritingRecord)
    })

    data1.printSchema()

    data1.write
      .format("csv")
      .option("header", "true")
      .save("output.csv")
  }
}

1 Answer


With the commented-out code re-enabled, you are trying to reuse the iterator b: the for loop traverses it to fill the list, so by the time b.map(f => f.toWritingRecord) runs, b is already exhausted, every group produces zero output rows, and nothing is written. An Iterator is modified when it is used:

It is of particular importance to note that, unless stated otherwise, one should never use an iterator after calling a method on it.

See the Iterator documentation.
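
The effect is easy to reproduce with a plain Scala iterator:

val it = Iterator(1, 2, 3)
it.foreach(println)      // the first traversal consumes the iterator
it.map(_ * 2).toList     // List() -- nothing is left to map over

The usual fix is to materialize each group exactly once and make every further pass over the materialized collection instead of over b. A minimal sketch, reusing the question's own data, ReadingRecord, and WritingRecord definitions:

val data1 = data.flatMapGroups((a: Integer, b: Iterator[ReadingRecord]) => {
  // Materialize the group once; b must not be traversed again after this line.
  val records = b.toList

  // Any number of passes over the materialized list is safe.
  for (o <- records) {
    println(o)
  }

  records.map(_.toWritingRecord())
})

Alternatively, keep a single pass: build whatever you need while iterating b once and return the results of that same traversal, so the iterator is never consumed twice.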
