
I have a Java String array which contains 45 strings, which are basically column names:

String[] fieldNames = {"colname1","colname2",...}; 

Currently I am storing the above array of Strings in a static field on the Spark driver. My job is running slowly, so I am trying to refactor the code. I am using the above String array while creating a DataFrame:

DataFrame dfWithColNames = sourceFrame.toDF(fieldNames); 

I want to do the above using a broadcast variable, so that Spark does not ship the huge string array to every executor. I believe we can do something like the following to create the broadcast:

String[] brArray = sc.broadcast(fieldNames, String[].class); // gives a compilation error

DataFrame df = sourceFrame.toDF(???); // how do I use the above broadcast? Can I use it as is by passing brArray?

I am new to Spark.

3 Answers

This is a bit of an old question; however, I hope my solution helps somebody.

In order to broadcast any object (a single POJO or a collection) with Spark 2+, you first need the following method, which creates a ClassTag for you:

import scala.reflect.ClassTag;

// Builds a ClassTag from a plain Java Class, as required by SparkContext.broadcast.
private static <T> ClassTag<T> classTag(Class<T> clazz) {
    return scala.reflect.ClassManifestFactory.fromClass(clazz);
}

Next, you use the SparkContext from your SparkSession to broadcast your object, supplying the ClassTag:

sparkSession.sparkContext().broadcast(
        yourObject,
        classTag(YourObject.class));

In the case of a collection, say a java.util.List, you use the following:

sparkSession.sparkContext().broadcast(
        yourObject,
        classTag(List.class));
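
Applied back to the question's column-name array, the same pattern would look roughly like the sketch below (a minimal sketch, assuming a SparkSession named sparkSession and the classTag helper above; it reuses the question's fieldNames, sourceFrame and toDF call, and the variable name brFieldNames is illustrative):

import org.apache.spark.broadcast.Broadcast;

// Broadcast the 45 column names once from the driver.
Broadcast<String[]> brFieldNames = sparkSession.sparkContext().broadcast(
        fieldNames,
        classTag(String[].class));

// Read the array back with value() wherever it is needed,
// e.g. when naming the columns as in the question.
DataFrame dfWithColNames = sourceFrame.toDF(brFieldNames.value());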

The return value of sc.broadcast is of type Broadcast<String[]>, not String[]. When you want to access the value, you simply call value() on the variable. For your example it would be:

Broadcast<String[]> broadcastedFieldNames = sc.broadcast(fieldNames);
DataFrame df = sourceFrame.toDF(broadcastedFieldNames.value());

Note that if you are writing this in Java, you probably want to wrap the SparkContext in a JavaSparkContext. It makes everything easier, and you can then avoid having to pass a ClassTag to the broadcast function.
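
For example, a minimal sketch of that wrapping, assuming sc is your existing SparkContext and reusing the question's fieldNames and sourceFrame (the variable names are illustrative):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

// Wrap the Scala SparkContext; JavaSparkContext.broadcast() needs no ClassTag.
JavaSparkContext jsc = new JavaSparkContext(sc);
Broadcast<String[]> broadcastedFieldNames = jsc.broadcast(fieldNames);

// Retrieve the array with value() wherever it is used.
DataFrame df = sourceFrame.toDF(broadcastedFieldNames.value());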

You can read more about broadcast variables at http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

1 Comment

The JavaSparkContext explanation is good, so I don't have to include a ClassTag.
// Another option: broadcast a java.util.ArrayList and read it back inside a foreach.
ArrayList<String> dataToBroadcast = new ArrayList<>();
dataToBroadcast.add("string1");
...
dataToBroadcast.add("stringn");

// Creating the broadcast variable.
// No need to write the ClassTag code by hand; use akka.japi.Util, which is available.

Broadcast<ArrayList<String>> strngBrdCast = spark.sparkContext().broadcast(
        dataToBroadcast,
        akka.japi.Util.classTag(ArrayList.class));

// Here is the catch: when you are iterating over a Dataset,
// Spark will actually run it in distributed mode. So if you try to access
// your object directly (e.g. dataToBroadcast), it will be null,
// because you didn't ask Spark to explicitly send that outside variable to each
// machine where this foreach runs in parallel.
// That is why you need a broadcast variable (the most common use of broadcast).

someSparkDataSetWhere.foreach((row) -> {
    ArrayList<String> stringlist = strngBrdCast.value();
    ...
    ...
});
