
First of all, I am a C# developer and very new to Scala. We are trying to use Spark to query SQL Server and Cassandra; this is a little proof-of-concept program.

import java.io.{File, PrintWriter}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

var output: StringBuilder = new StringBuilder();
try {
  //var output: StringBuilder = new StringBuilder();
  var config = new Config(args);
  val sparkConf = new SparkConf(true)
    .set("spark.cassandra.connection.host", config.cassanrdaClusterIp)
    .set("spark.akka.heartbeat.interval", "100")
  //var output: StringBuilder = new StringBuilder();
  val sparkContext: SparkContext = new SparkContext(config.sparkConnectionString, "springcm-spark-webserver", sparkConf)
  val sqlContext = new CassandraSQLContext(sparkContext)
  val sqlConnectionString = "jdbc:sqlserver://" + config.sqlConnectionString;
  if (args(0) == "DocHistoryReport") {
    val docHistoryReport = new DocHistoryReport(sparkContext, sqlContext, sqlConnectionString, config.cassanrdaKeyspace)
    //var output: StringBuilder = new StringBuilder();
    var result = docHistoryReport.Execute(config.accountId, config.userId, config.startDate, config.endDate, config.dateBucketType);
    result.collect();
    var file: File = new File("result.csv");
    //var output: StringBuilder = new StringBuilder();
    if (!file.exists()) {
      file.createNewFile();
    }
    val pw = new PrintWriter(file);
    result.foreach(row => {
      output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
    })
    pw.write(output.toString());
    pw.flush();
    pw.close();
  } else {
    throw new IllegalArgumentException("Unsupported report type " + args(0));
  }
}

The code creates a Spark context, runs a simple report and writes the result to a file. Note that the variable output is initialized in several places in the code, but all except one are commented out. If I initialize output anywhere other than where it is currently declared, result.csv comes out empty, and output appears to be reinitialized several times during the execution of the foreach loop, wiping the result.

Could someone please explain what is going on and why the location of the variable initialization matters? Thanks.

1 Answer

I would be extremely surprised if this actually depends on the initialization location: an empty file is the expected result in any case.

result.foreach(row => {
  output.append(row.toString().stripPrefix("[").stripSuffix("]") + sys.props("line.separator"));
})

is very close to this example from the documentation:

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

and has the same problem: the argument of foreach (row => ...) is serialized and sent to each worker, and deserializing it creates a fresh copy of output on each worker instead of a reference to the original StringBuilder, which lives in a different process or on a different machine. Reading the "Understanding closures" section of the Spark programming guide, where that example comes from, should help you understand more.

Possible solutions:

  1. Use collect to get all elements of the RDD onto the driver and call foreach on the result (in fact your program already calls collect, but throws the result away); see the first sketch after this list.

  2. Put the code for writing to the file inside the foreach call (obviously it then has to be a file which can be accessed from the worker machines, e.g. on HDFS).

  3. Use an Accumulator; see the last sketch below.

  4. Use map or mapPartitions to build up the strings you want separately on each worker (getting an RDD[String]) and apply either of the above solutions to the result; the second sketch below combines this with saveAsTextFile.
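
A minimal sketch of solution 1, assuming result is the RDD of rows returned by DocHistoryReport.Execute as in your question. Once collect has brought the rows into the driver JVM as a plain array, mutating a local StringBuilder is safe again:

import java.io.{File, PrintWriter}

val rows = result.collect()     // all rows now live in the driver JVM
val output = new StringBuilder
rows.foreach { row =>
  // this is Array.foreach, not RDD.foreach: no serialization, no copies
  output.append(row.toString.stripPrefix("[").stripSuffix("]"))
  output.append(sys.props("line.separator"))
}
val pw = new PrintWriter(new File("result.csv"))
try pw.write(output.toString) finally pw.close()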
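
Solutions 4 and 2 combine naturally: build the strings on the workers with map, then let Spark do the writing. The output path here is hypothetical and must be on storage every worker can reach (e.g. HDFS); note that saveAsTextFile produces one part-file per partition under that directory, not a single CSV:

val lines = result.map(row => row.toString.stripPrefix("[").stripSuffix("]"))
lines.saveAsTextFile("hdfs:///reports/doc-history")  // hypothetical path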
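
And a minimal sketch of solution 3, shown on the documentation's counter example rather than your CSV case, since accumulators suit commutative aggregation (sums, counts) far better than building one large string. This uses the Spark 1.x accumulator API, which matches the CassandraSQLContext era (Spark 2.x replaced it with sc.longAccumulator):

// Right: workers add into the accumulator, the driver reads .value
val counter = sc.accumulator(0)
val rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: " + counter.value)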
