Suppose I have a 100 MB input file containing a large number of points (lat-long pairs) in CSV format. How can I split the input file into ten 10 MB pieces in Apache Spark, or otherwise customize the split?

Note: I want to process a subset of the points in each mapper.

2 Answers

5

Spark's abstraction doesn't provide an explicit way to split the data. However, you can control the parallelism in several ways.

Assuming you use YARN with HDFS, the file is automatically split into HDFS blocks, and those blocks are processed concurrently when a Spark action runs.

Apart from HDFS parallelism, consider using a partitioner with a PairRDD. A PairRDD is an RDD of key-value pairs, and a partitioner manages the mapping from a key to a partition. The default partitioner reads spark.default.parallelism. The partitioner helps control the distribution of data as well as its locality in PairRDD-specific operations, e.g., reduceByKey.
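To make "mapping from a key to a partition" concrete, here is a minimal plain-Java sketch of hash partitioning. It deliberately does not use the Spark API: the class name HashPartitionSketch and the helpers partitionFor and partition are hypothetical, and the non-negative modulo of hashCode() only illustrates the general idea behind a hash-based partitioner, not Spark's own source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HashPartitionSketch {
    // Hypothetical helper: maps a key to a partition index in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        // hashCode() may be negative, so shift negative remainders into range.
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Groups keys into numPartitions buckets using partitionFor.
    static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String k : keys) {
            buckets.get(partitionFor(k, numPartitions)).add(k);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Sample keys are made up for illustration only.
        List<String> keys = Arrays.asList("12.97,77.59", "28.61,77.20", "19.07,72.87", "13.08,80.27");
        List<List<String>> buckets = partition(keys, 3);
        for (int i = 0; i < buckets.size(); i++) {
            System.out.println("partition " + i + ": " + buckets.get(i));
        }
    }
}
```

Equal keys always land in the same bucket, which is why key-based operations such as reduceByKey can combine values for one key without looking at every partition.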

Take a look at the following documentation on Spark data parallelism:

http://spark.apache.org/docs/1.2.0/tuning.html

3 Comments

Scenario: I have 50 points and a target point p0, and I have to find the point closest to p0 among these 50. So I decided to break the 50 points into 5 groups of 10 points and run a nearest-neighbour algorithm on each group in parallel. From my limited MapReduce knowledge, each mapper takes a single line unless customized, so each mapper would get a single point to operate on, not 10 points. How do I solve this problem?
Suztomo - There is also the minPartitions parameter on textFile() that gives some control over how many partitions to load a file into. @Chandan - If your RDD doesn't have enough partitions, try explicitly repartitioning it with RDD.repartition(N) before running your computation. More, smaller partitions will give each task (I don't think we talk about "mappers" in Spark) less work to do.
JavaRDD<String> lines = ctx.textFile("/home/hduser/Spark_programs/file.txt").cache(); lines.repartition(2); List<Partition> partitions = lines.partitions(); System.out.println(partitions.size()); This still reports only one partition.
1

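After searching through the Spark API, I found the method partitions(), which returns the list of partitions of a JavaRDD; the size of that list is the number of partitions. Note that repartition() returns a new RDD rather than modifying the one it is called on, so its result must be kept. Here the RDD is repartitioned to the desired number of partitions at creation time, as suggested by @Nick Chammas.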

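// repartition() returns a new RDD, so chain it or keep the returned reference.
JavaRDD<String> lines = ctx.textFile("/home/hduser/Spark_programs/file.txt").repartition(5);
List<Partition> partitions = lines.partitions();
System.out.println(partitions.size()); // number of partitions after repartitioning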
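
For the nearest-point scenario raised in the comments, the two-stage idea (find a local winner in each group of points, then pick the best of the winners) can be sketched in plain Java. This is only an illustration under stated assumptions: it does not use the Spark API, the class NearestPointSketch and its methods are hypothetical names, and it compares points by squared Euclidean distance, which is only an approximation for lat-long data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NearestPointSketch {
    // Squared Euclidean distance; sufficient for deciding which point is closer.
    static double dist2(double[] a, double[] b) {
        double dx = a[0] - b[0];
        double dy = a[1] - b[1];
        return dx * dx + dy * dy;
    }

    // Nearest point to target within one group (the per-group step).
    // Returns null for an empty group.
    static double[] nearest(List<double[]> group, double[] target) {
        double[] best = null;
        for (double[] p : group) {
            if (best == null || dist2(p, target) < dist2(best, target)) {
                best = p;
            }
        }
        return best;
    }

    // Splits points into consecutive groups of at most groupSize elements.
    static List<List<double[]>> split(List<double[]> points, int groupSize) {
        List<List<double[]>> groups = new ArrayList<>();
        for (int i = 0; i < points.size(); i += groupSize) {
            groups.add(points.subList(i, Math.min(i + groupSize, points.size())));
        }
        return groups;
    }

    // Two-stage search: one local winner per group, then the best of the winners.
    static double[] nearestOverall(List<double[]> points, double[] target, int groupSize) {
        List<double[]> winners = new ArrayList<>();
        for (List<double[]> g : split(points, groupSize)) {
            winners.add(nearest(g, target));
        }
        return nearest(winners, target);
    }

    public static void main(String[] args) {
        // 50 made-up points and a made-up target p0, split into 5 groups of 10.
        List<double[]> points = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            points.add(new double[] {i, 2.0 * i});
        }
        double[] p0 = {10.2, 20.1};
        System.out.println(Arrays.toString(nearestOverall(points, p0, 10)));
    }
}
```

In Spark, each group would correspond to one partition of the RDD: the per-group step maps naturally onto a per-partition operation such as mapPartitions, and the final step onto a reduce over the local winners, so a task works on all the points in its partition rather than on a single line.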
