Not a question as such; I need a suggestion.

I am working with two CSV files (20 GB + 6 GB = 26 GB in total) on a 1+3 cluster (1 master, 3 slaves, each with 16 GB of RAM).

This is how I am running my operations:

from pyspark import StorageLevel

df = spark.read.csv()   # 20 GB
df1 = spark.read.csv()  # 6 GB
df_merged = df.join(df1, 'name', 'left')  # merge
df_merged.persist(StorageLevel.MEMORY_AND_DISK)  # would MEMORY_ONLY give better performance?
print('No. of records found: ', df_merged.count())  # call an action so the persist actually happens
df_merged.registerTempTable('table_satya')
query_list = [query1, query2, query3]  # SQL query strings to run
city_list = [city1, city2, city3]  # ... 8 cities in total
file_index = 0  # output files are numbered by an increasing index
for query_str in query_list:
    result = spark.sql(query_str)  # e.g. select * from table_satya where date >= '2016-01-01'
    # result.persist()  # will this increase performance?
    for city in city_list:
        df_city = result.where(result.city_name == city)
        # store as a single CSV file (pandas style)
        df_city.toPandas().to_csv('file_' + str(file_index) + '.csv', index=False)
        file_index += 1

df_merged.unpersist()  # do I even need this, or does Spark handle it internally?

Currently this takes a very long time:

# persist (triggered by count()): 34 min
# each toPandas().to_csv() call: around 2 min
# each SQL query (8 cities): around 2 * 8 = 16 min
# 3 queries: 16 * 3 = 48 min
# total: 34 + 48 = 82 min, which seriously needs optimization

Can anybody suggest how I can optimize the above process for better performance, in both time and memory?

The reason I am worried: I was doing the same work in Python with pandas (a single 64 GB machine, with the data serialized as pickle) and it finished in 8 to 12 minutes. Since my data volume keeps growing, I need to adopt a technology like Spark.

Thanks in Advance. :)

  • Anytime you serialize to disk you incur huge I/O penalties. The power of Spark is to only use memory to keep things fast. You need to persist to memory, and make sure you have enough memory to handle the data. You also want to make sure you've configured your environment correctly: Transparent Huge Pages should be off and swappiness set to 0 or 1. Commented Oct 17, 2016 at 15:24
  • Does your raw data have only the eight cities you are looking for or does it have many more? Commented Oct 17, 2016 at 15:26
  • Is this correct? result.where('city_name'==city). That would seem to be asking for result.where(False). Did you mean something like result.where("city_name='%s'" % city)? Commented Oct 17, 2016 at 15:31
  • @StevenRumbalski No, I have many more cities (approximately 90). And yes, it should be result.where(result.city_name==city); I have corrected it. Commented Oct 17, 2016 at 15:44
  • @tadamhicks How can I turn Transparent Huge Pages off and set swappiness to 0 or 1? Which settings control that? Please help. Also, does your comment mean that I would get a performance gain from df_merged.persist(StorageLevel.MEMORY_ONLY)? Commented Oct 17, 2016 at 15:50

1 Answer

I think your best bet is cutting the source data down to size. You mention that your source data has 90 cities, but you are only interested in 8 of them. Filter out the cities you don't want and keep the ones you do want in separate csv files:

import itertools
import csv

city_list = [city1, city2, city3]  # ... 8 cities in total

# Python 3: open CSV files in text mode with newline='' (use 'rb'/'wb' on Python 2)
with open('f1.csv', newline='') as f1, open('f2.csv', newline='') as f2:
    r1, r2 = csv.reader(f1), csv.reader(f2)
    header = next(r1)
    next(r2)  # discard headers in second file
    city_col = header.index('city_name')
    city_files = []
    city_writers = {}
    try:
        # open one output file per wanted city and write the header to each
        for city in city_list:
            f = open(city + '.csv', 'w', newline='')
            city_files.append(f)
            writer = csv.writer(f)
            writer.writerow(header)
            city_writers[city] = writer
        # stream both inputs once, routing each row to its city's file
        for row in itertools.chain(r1, r2):
            city_name = row[city_col]
            if city_name in city_writers:
                city_writers[city_name].writerow(row)
    finally:
        for f in city_files:
            f.close()

After this, iterate over each city, create a DataFrame for that city, and then run your three queries in a nested loop. Each DataFrame should have no problem fitting in memory, and the queries should run quickly since they are running over a much smaller data set.
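
The loop structure described above might be sketched roughly as follows. This is only an illustration that uses the standard library's csv module in place of a DataFrame. The function name run_queries_per_city, the predicate-style queries (plain Python functions standing in for the SQL strings), and the file_N.csv output naming are assumptions, not part of the original answer. With pandas or Spark, the inner step would be a DataFrame filter or a spark.sql call instead.

```python
import csv
import os


def run_queries_per_city(city_list, queries, in_dir=".", out_dir="."):
    """For each per-city CSV, apply every query and write one result file each.

    `queries` is a list of functions that take a row dict and return
    True or False (a hypothetical stand-in for the SQL query strings).
    Returns the output paths in the order they were written.
    """
    written = []
    file_index = 0
    for city in city_list:
        # Load one city's rows; this is the small, pre-filtered data set.
        with open(os.path.join(in_dir, city + ".csv"), newline="") as f:
            reader = csv.DictReader(f)
            header = reader.fieldnames
            rows = list(reader)
        # Nested loop: run every query against the rows already in memory.
        for query in queries:
            out_path = os.path.join(out_dir, "file_%d.csv" % file_index)
            with open(out_path, "w", newline="") as out:
                writer = csv.DictWriter(out, fieldnames=header)
                writer.writeheader()
                writer.writerows(row for row in rows if query(row))
            written.append(out_path)
            file_index += 1
    return written
```

Note that the city loop is the outer one here, so each city's file is read once and reused for all queries. The output files are therefore numbered city by city rather than query by query as in the question.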
