
I'm trying to prepare a library (written in Java) to run on Apache Spark. Since the library has hundreds of classes and is still in active development, I do not want to make them all serializable one by one. Instead I searched for another method and found this, but again it does not resolve the serialization issue.

Here is the code sample:

    List<Integer> data = Arrays.asList(1,2,3,4,5,6);
    JavaRDD<Integer> distData = sc.parallelize(data);
    JavaRDD<Year4D> years = distData.map(y -> func.call(y));
    List<Year4D> years1 = years.collect();

where func is a Function that generates a 4-digit year using Year4D:

    private static Function<Integer, Year4D> func = new Function<Integer, Year4D>() {
        public Year4D call(Integer arg0) throws Exception {
            return new Year4D(arg0);
        }
    };

and Year4D does not implement Serializable:

    public class Year4D {
        private int year = 0;

        public Year4D(int year) {
            if (year < 1000) year += (year < 70) ? 2000 : 1900;
            this.year = year;
        }

        public String toString() {
            return "Year4D [year=" + year + "]";
        }
    }

This produces an "object not serializable" exception for Year4D:

    Job aborted due to stage failure: Task 6.0 in stage 0.0 (TID 6) had a not serializable result...

By the way, if I replace the action collect() with foreach(func), it works.

So my question is: why does collect() not work?

And if this approach is not good, what is the best practice for handling a Java library that contains tons of complex classes?

PS: @Tzach said that Year4D isn't wrapped correctly, so it is actually not serialized. What, then, is the correct implementation?

2 Answers


Solution 1 (which you will not use, since it is easier to modify each of the classes to implement Serializable): create wrapper classes that implement Serializable and override their writeObject and readObject methods:

    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class Year4DWrapper implements Serializable {

        private Year4D year4d;

        public Year4DWrapper(Year4D year4d) {
            this.year4d = year4d;
        }

        public Year4D getYear4D() {
            return year4d;
        }

        // called instead of default serialization: write only the primitive state
        // (assumes Year4D exposes a getYear() accessor)
        private void writeObject(ObjectOutputStream os) throws IOException {
            os.writeInt(year4d.getYear());
        }

        // rebuild the non-serializable Year4D from the primitive state
        private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
            year4d = new Year4D(is.readInt());
        }
    }
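A minimal sketch of how the wrapper would be used in the original pipeline; note that the RDD now holds the wrapper type, so only Year4DWrapper instances cross the wire:

    JavaRDD<Year4DWrapper> years = distData.map(y -> new Year4DWrapper(new Year4D(y)));
    // collect() now succeeds: each element is serialized via writeObject/readObject
    List<Year4DWrapper> wrapped = years.collect();
    Year4D first = wrapped.get(0).getYear4D();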

Solution 2: Use Kryo to do the serialization/deserialization for you

    SparkConf conf = new SparkConf();
    // Kryo must be enabled explicitly; registering classes alone is not enough
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
    ...

    import com.esotericsoftware.kryo.Kryo;
    import org.apache.spark.serializer.KryoRegistrator;

    public class MyRegistrator implements KryoRegistrator {
        public void registerClasses(Kryo kryo) {
            kryo.register(Year4D.class);
        }
    }

It is advised that the classes contain a no-arg constructor.
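For instance, a Kryo-friendly Year4D could add a private no-arg constructor (and the getYear() accessor the wrapper above assumed); this is only a sketch, since Kryo can also instantiate classes without one via an instantiator strategy:

    public class Year4D {
        private int year = 0;

        // for Kryo's use only; never called directly
        private Year4D() { }

        public Year4D(int year) {
            if (year < 1000) year += (year < 70) ? 2000 : 1900;
            this.year = year;
        }

        public int getYear() {
            return year;
        }

        public String toString() {
            return "Year4D [year=" + year + "]";
        }
    }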

By default, most classes will end up using FieldSerializer. It essentially does what hand written serialization would, but does it automatically. FieldSerializer does direct assignment to the object's fields. If the fields are public, protected, or default access (package private) and not marked as final, bytecode generation is used for maximum speed (see ReflectASM). For private fields, setAccessible and cached reflection is used, which is still quite fast.

If you are unhappy with the serializers Kryo provides by default or you have complex classes, you can always define your own.
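For example, a hand-written Kryo serializer for Year4D might look like the following sketch (again assuming a getYear() accessor; it would be registered with kryo.register(Year4D.class, new Year4DSerializer()) inside the registrator):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    public class Year4DSerializer extends Serializer<Year4D> {
        @Override
        public void write(Kryo kryo, Output output, Year4D object) {
            // write only the primitive state
            output.writeInt(object.getYear());
        }

        @Override
        public Year4D read(Kryo kryo, Input input, Class<Year4D> type) {
            // rebuild the object from the primitive state
            return new Year4D(input.readInt());
        }
    }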


1 Comment

Solution 2 works! With Kryo there is no need for wrapping.

First, foreach() works because it iterates over each partition locally; it doesn't have to send the data from one node to another, or to the driver, so no Year4D ever has to be serialized.

If you follow the map transformation (which creates the Year4D objects) with any action / transformation that requires shuffle (e.g. groupByKey), or that requires sending the data back to the driver (like collect) - then the data must be serialized (how else would it be shared across separate Java processes?).

Now, since there's very little you can do without shuffles or collecting the data, most likely you don't really have a choice: your data must be serializable.
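A quick way to see the contrast described above (a sketch using the question's RDD; the foreach output goes to the executors' stdout, not the driver's):

    // runs inside each executor's JVM, so nothing is shipped anywhere
    years.foreach(y -> System.out.println(y));

    // must serialize every Year4D to ship it back to the driver: fails here
    List<Year4D> years1 = years.collect();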

2 Comments

As Beating Serialization in Spark suggests, Year4D is assumed to be wrapped by the serializable wrapper Function<Integer, Year4D>(). If not, there must be some way of serializing without touching Year4D; otherwise, touching lots of classes that don't implement Serializable and are under heavy active development is a very big issue.
"assumed to be wrapped" - but it isn't. What that link suggests is different from what you implemented - it suggest that the RDD will hold these wrapper objects instead of the Year4D - which means the map function's return type should be some UnserializableWrapper and not Year4D. But that idea is a good workaround for code that you can't change (3rd party lib) - for your own code, it's easier to add Serializable than to implement these wrappers.
