
I am running a Spark Streaming (1.6.1) application on YARN, using the direct API to read events from a Kafka topic with 50 partitions and writing the output to HDFS. The batch interval is 60 seconds. I was receiving around 500K messages per batch, which were processed well within 60 seconds.

Suddenly Spark started receiving 15-20 million messages, which take around 5-6 minutes to process, while the batch interval is still 60 seconds. I have configured "spark.streaming.concurrentJobs=4".

So when a batch takes a long time to process, Spark runs up to 4 jobs concurrently to work through the backlog of batches, but the backlog still grows over time because the batch interval is too short for this volume of data.
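For reference, here is a minimal sketch of the kind of setup described above (Spark 1.6 direct-stream API; the app name, broker list, topic name, and output path are placeholders, not taken from the actual job):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-to-hdfs")                    // hypothetical app name
      .set("spark.streaming.concurrentJobs", "4")     // as in the question

    val ssc = new StreamingContext(conf, Seconds(60)) // 60-second batch interval

    // Placeholder Kafka settings; the topic is assumed to have 50 partitions.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("events")

    // Direct API: each batch RDD has one partition per Kafka partition (50 here).
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // One saveAsTextFile call per batch => 50 part files per output directory.
    stream.map(_._2).foreachRDD { (rdd, time) =>
      rdd.saveAsTextFile(s"hdfs:///data/events/${time.milliseconds}") // placeholder path
    }

    ssc.start()
    ssc.awaitTermination()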

I have a few doubts around this.

  1. When I receive 15-20 million messages, processing them takes around 5-6 minutes with a batch interval of 60 seconds. Yet when I check my HDFS directory, I see a new set of files created every 60 seconds, with 50 part files each. I am a little confused here: if my batch takes 5-6 minutes to process, how is it writing files to HDFS every 1 minute, given that the 'saveAsTextFile' action is called only once per batch? The total record count across the 50 part files comes to around 3.3 million.

  2. To handle the processing of 15-20 million messages, I increased my batch interval to 8-10 minutes. Now Spark starts consuming around 35-40 million messages from Kafka, and again its processing time exceeds the batch interval.

I have configured 'spark.streaming.kafka.maxRatePerPartition=50' & 'spark.streaming.backpressure.enabled=true'.
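As a small sketch of how those two settings are set programmatically, along with the arithmetic they imply (note that spark.streaming.kafka.maxRatePerPartition is a per-partition, per-second limit):

    import org.apache.spark.SparkConf

    // With 50 partitions and a 60 s batch, a per-partition limit of 50 records/s
    // caps each batch at 50 partitions * 50 rec/s * 60 s = 150,000 records,
    // whenever the limit is actually in effect.
    val conf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "50")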

  • Is that rate of 15-20M records a momentary peak, or is it constant? If the mean rate increases 40x, I would expect a corresponding increase in the cluster infrastructure as well. Commented Dec 16, 2016 at 14:06
  • @maasg the rate is constant now. My cluster is 3 nodes (one node is shared by the Kafka and Spark clients), each with 128 GB RAM and 10 cores. I have tried all possible ways but am not able to process this volume of data under 1 minute, and I need to process it within a 1-minute batch interval. Commented Dec 16, 2016 at 14:13
  • I'd use backpressure to determine the current max throughput and then scale the infrastructure accordingly. I think there's no configuration magic that can give you a 40x capacity increase unless the previous resources were heavily over-dimensioned, and from your description that doesn't seem to be the case. Commented Dec 16, 2016 at 14:25
  • @maasg I have enabled backpressure with "spark.streaming.backpressure.enabled=true" and "spark.streaming.kafka.maxRatePerPartition=50", but I did not see the backpressure algorithm trigger. I am receiving 550 JSON messages per one-minute batch, and each JSON has 24 * 50K sub-records, which I process with a Gson parser inside mapPartitions. Could you please help me determine the current max throughput? Commented Dec 16, 2016 at 14:41
  • You can do that manually by changing spark.streaming.kafka.maxRatePerPartition until you find a rate that is sustainable for your system. Commented Dec 16, 2016 at 16:33
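As a rough illustration of the arithmetic behind that advice (assuming the load is spread evenly across the 50 partitions): ingesting 20 million records within a 60-second batch needs about 20,000,000 / 60 ≈ 333,000 records/s in total, i.e. roughly 6,700 records/s per partition, so both spark.streaming.kafka.maxRatePerPartition and the cluster's actual processing rate would have to be on that order for a 1-minute batch interval to keep up.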

1 Answer


I think one thing that may be confusing you is the relationship between the length of a job and its frequency.

From what you describe, with the resources available it seems that each job ends up taking about 5 minutes to complete, while your batch frequency is 1 minute.

So every 1 minute you kick off a batch that takes about 5 minutes to complete.

As a result, you should expect HDFS to receive nothing for the first few minutes, and after that to receive output roughly every 1 minute, but with a roughly 5-minute delay from when the data went in.
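As an illustration of that pipelining (using the numbers from the question, and assuming enough jobs are allowed to run concurrently for the batches to keep overlapping):

    batch read at t = 0 min  ->  written to HDFS around t = 5 min
    batch read at t = 1 min  ->  written around t = 6 min
    batch read at t = 2 min  ->  written around t = 7 min
    ...

So after the first job finishes, a new output directory with 50 part files shows up roughly every minute, each holding only the records of the 1-minute batch that was read about 5 minutes earlier. Note that spark.streaming.concurrentJobs=4 allows at most four of these jobs to run at once, so if every job really takes 5-6 minutes the queue of pending batches keeps growing, which matches what you observed.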
