
Here is my Java Spark code. This is the CSV data format:

"f_name","l_name","job","gender","age","salary"
"Elsdon","Jaycob","Java programmer","male",43,2000
"Tamsen","Brittany","Java programmer","female",23,1500
"Floyd","Donny","Java programmer","male",33,1800

And I generated the Person class, which holds the above data:

public class Person implements Serializable {

    private String firstName;
    private String lastName;
    private String job;
    private String gender;
    private int salary;
    private int age;

    public Person(String firstName, String lastName, String job, String gender, int age, int salary) {

        this.firstName = firstName;
        this.lastName = lastName;
        this.job = job;
        this.gender = gender;
        this.age = age;
        this.salary = salary;
    }

    // ... getters and setters.
}

The code below tries to generate a JavaRDD with the Spark Java client:

SparkConf sc = new SparkConf().setAppName("SparkTest").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> rdd_text = jsc.textFile("file:///" + srcDir + srcFile);

String[] header = rdd_text.map(line -> line.split(",")).first();
System.out.println(header[4]); // "age" is printed
JavaRDD<Person> persons = rdd_text.filter(line -> line.split(",")[4] != header[4]).map(
    line -> {
        String[] info = line.split(",");

        System.out.println(info[4]); // 43, 23, 33, "age" are printed

        Person p = new Person(info[0], info[1], info[2], info[3],
                Integer.parseInt(info[4]), Integer.parseInt(info[5]));

        return p;
    });

System.out.println(persons.collect());

The System.out.println(info[4]) line prints:

43
23
33
"age"

Then it throws the following exception:

java.lang.NumberFormatException: For input string: ""age""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:569)
    at java.lang.Integer.parseInt(Integer.java:615)
    at com.aaa.spark.JavaClient.lambda$2(JavaClient.java:33)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:936)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

I have no idea which line contains the faulty code, or why. System.out.println(info[4]) even prints the "age" string value.

1 Answer

You read the file as a plain text file, not as CSV:

 jsc.textFile("file:///" + srcDir + srcFile);

The file's first line, the header (with the "age" value), is also processed by Integer.parseInt(info[4]), and that is the reason for the error. Your filter does not remove the header, because line.split(",")[4] != header[4] compares String references with !=, which is true for two distinct String objects even when their contents match; string contents must be compared with equals().
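For completeness, here is a minimal sketch of how the manual approach could work if you keep textFile and split; the variable name headerAge is illustrative, and the only behavioral change from your code is the equals() comparison:

String headerAge = header[4]; // the header's fifth field, "age" including its quotes

JavaRDD<Person> persons = rdd_text
        .filter(line -> !line.split(",")[4].equals(headerAge)) // compare contents, not references
        .map(line -> {
            String[] info = line.split(",");
            return new Person(info[0], info[1], info[2], info[3],
                    Integer.parseInt(info[4]), Integer.parseInt(info[5]));
        });

Note that a naive split(",") keeps the surrounding quotes on the string fields and breaks on commas inside quoted values, which is another reason to prefer a real CSV parser.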

Spark has dedicated methods for parsing CSV; you can use them:

https://github.com/databricks/spark-csv
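A minimal sketch of that package's usage, assuming Spark 1.x with the spark-csv dependency on the classpath (the options shown follow the package's README; the path comes from your question):

SQLContext sqlContext = new SQLContext(jsc);
DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("header", "true")      // treat the first line as column names
        .option("inferSchema", "true") // detect that age and salary are integers
        .load("file:///" + srcDir + srcFile);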

Recent Spark versions have CSV parsing out of the box; please check the documentation.
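For Spark 2.x and later, a minimal sketch with the built-in reader (only srcDir and srcFile come from your question; the rest is standard SparkSession API):

SparkSession spark = SparkSession.builder()
        .appName("SparkTest")
        .master("local[*]")
        .getOrCreate();

Dataset<Row> df = spark.read()
        .option("header", "true")      // the header line becomes column names and is never parsed as data
        .option("inferSchema", "true") // age and salary come back as integers
        .csv("file:///" + srcDir + srcFile);

df.printSchema();
df.show();

The built-in reader also strips the surrounding quotes from the string fields, which split(",") does not.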
