1

I am currently reading all of the text files in a public AWS bucket that contains hundreds of CSV files. I read all of the CSV files at once and then turn them into an RDD and start massaging the data so that it can be stored in Cassandra. Processing all of the textfiles is taking over two hours and a half and this is too long for just 100GB of data. Is there anything I can do to my code below to make it faster?

I appreciate any suggestions. I've also tried reading this https://robertovitillo.com/2015/06/30/spark-best-practices/ but I'm confused with how to implement some of the things mentioned like "Using the right level of parallelism." I also tried storing my RDD in cache by doing rdd.cache, but that still took over two hours.

conf = SparkConf() \
   .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
rdd = sc.textFile("s3a://gdelt-open-data/events/*")

def rddCleaning(rd,timeframe):

 def check_valid(tup):
    try:
        int(tup[1])
        int(tup[4])
        float(tup[5])
        float(tup[6])
        return True
    except ValueError:
        return False


 def fillin(tup):
    if tup[2] == "" and tup[3] != "":
        return ((tup[0],tup[1],tup[3],tup[4],tup[5],tup[6]))
    else:
        return ((tup[0],tup[1],tup[2],tup[4],tup[5],tup[6]))

 def popular_avg(curr_tup):
    lst = curr_tup[1]
    dictionary = curr_tup[2]
    dict_matches = {}
    for tup in lst:
    event_type = tup[0]
        dict_matches[event_type] = dictionary[event_type]
    return ((curr_tup[0],lst,dict_matches,curr_tup[3]))

 def merge_info(tup):
    main_dict = tup[1]
    info_dict = tup[2]
    for key in info_dict:
        main_dict[key].update(info_dict[key])
    main_dict["TotalArticles"] = {"total":tup[3]}
    return ((tup[0],main_dict))

 def event_todict(tup):
    lst = tup[1]
    dict_matches = {}
    for event_tup in lst:
        dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]}
    return ((tup[0],dict_matches,tup[2],tup[3]))

 def sum_allevents(tup):
    type_lst = tup[1]
    total_mentions = 0
    for event in type_lst:
            total_mentions += event[1]
    return ((tup[0],type_lst,tup[2],total_mentions))

actionGeo_CountryCode = 51
time = 0
actor1Type1Code = 12
actor2Type1Code = 22
numArticles = 33
goldsteinScale = 30
avgTone = 34

if timeframe == "SQLDATE":
time = 1
elif timeframe == "MonthYear":
time = 2
else:
time = 3




rdd_reduce  = rd.map(lambda x: x.split('\t')) \
        .map(lambda y: ((y[actionGeo_CountryCode],
                                 y[time],
                                 y[actor1Type1Code],
                                 y[actor2Type1Code],
                                 y[numArticles],
                                 y[goldsteinScale],
                                 y[avgTone]))) \
        .filter(check_valid) \
        .map(lambda c: ((c[0],int(c[1]),c[2],c[3],int(c[4]),int(float(c[5])),int(float(c[6]))))) \
        .map(fillin) \
                .filter(lambda r: r[0] in tofullname and r[2] in toevent and  r[2] != "" and r[0] != "") \
                .map(lambda t: ((tofullname[t[0]],t[1],toevent[t[2]],t[3],t[4],t[5]))) \
                .map(lambda f: (((f[0],f[1],f[2]),(f[3],f[4],f[5],1)))) \
        .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
        .map(lambda s: ((s[0],(s[1][0],s[1][1]/s[1][3],s[1][2]/s[1][3]))))


rdd_format = rdd_reduce.map(lambda t:((t[0][0],t[0][1]),
                                      ([(t[0][2],t[1][0])],
                                      [(t[0][2],{"GoldsteinScaleAvg":t[1][1],
                                                "ToneAvg":t[1][2]})]))) \
           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
           .map(lambda v: (v[0],
                                       sorted(v[1][0],key=itemgetter(1),reverse=True),
                                       v[1][1])) \
               .map(sum_allevents) \
                       .map(lambda f: ((f[0],f[1][:5],dict(f[2]),f[3]))) \
                       .map(popular_avg) \
               .map(event_todict) \
                       .map(merge_info) \
           .map(lambda d: ((d[0][0],d[0][1],d[1])))

return rdd_format




daily_rdd = rddCleaning(rdd,"SQLDATE")
print(daily_rdd.take(6));
monthly_rdd = rddCleaning(rdd,"MonthYear")
print(monthly_rdd.take(6));
yearly_rdd = rddCleaning(rdd,"Year")
print(yearly_rdd.take(6));

Here is a picture of my pyspark running: enter image description here

Edits made after suggestions: I made the following changes to my code and it improved the performance, but it is still taking a long time. Is this happening because every time I call df it is reading all of the files from my S3 bucket all over again? Should I put some of my df and temporary tables in cache? Here is my code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType, IntegerType
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra

sc = SparkContext()
sqlContext = SQLContext(sc)

customSchema = schema = StructType([
        StructField('GLOBALEVENTID',StringType(),True),
        StructField('SQLDATE',StringType(),True),
        StructField('MonthYear',StringType(),True),
        StructField('Year',StringType(),True),
        StructField('FractionDate',StringType(),True),
        StructField('Actor1Code',StringType(),True),
        StructField('Actor1Name',StringType(),True),
        StructField('Actor1CountryCode',StringType(),True),
        StructField('Actor1KnownGroupCode',StringType(),True),
        StructField('Actor1EthnicCode',StringType(),True),
        StructField('Actor1Religion1Code',StringType(),True),
        StructField('Actor1Religion2Code',StringType(),True),
        StructField('Actor1Type1Code',StringType(),True),
        StructField('Actor1Type2Code',StringType(),True),
        StructField('Actor1Type3Code',StringType(),True),
        StructField('Actor2Code',StringType(),True),
        StructField('Actor2Name',StringType(),True),
        StructField('Actor2CountryCode',StringType(),True),
        StructField('Actor2KnownGroupCode',StringType(),True),
        StructField('Actor2EthnicCode',StringType(),True),
        StructField('Actor2Religion1Code',StringType(),True),
        StructField('Actor2Religion2Code',StringType(),True),
        StructField('Actor2Type1Code',StringType(),True),
        StructField('Actor2Type2Code',StringType(),True),
        StructField('Actor2Type3Code',StringType(),True),
        StructField('IsRootEvent',StringType(),True),
        StructField('EventCode',StringType(),True),
        StructField('EventBaseCode',StringType(),True),
        StructField('EventRootCode',StringType(),True),
        StructField('QuadClass',StringType(),True),
        StructField('GoldsteinScale',StringType(),True),
        StructField('NumMentions',StringType(),True),
        StructField('NumSources',StringType(),True),
        StructField('NumArticles',StringType(),True),
        StructField('AvgTone',StringType(),True),
        StructField('Actor1Geo_Type',StringType(),True),
        StructField('Actor1Geo_FullName',StringType(),True),
        StructField('Actor1Geo_CountryCode',StringType(),True),
        StructField('Actor1Geo_ADM1Code',StringType(),True),
        StructField('Actor1Geo_Lat',StringType(),True),
        StructField('Actor1Geo_Long',StringType(),True),
        StructField('Actor1Geo_FeatureID',StringType(),True),
        StructField('Actor2Geo_Type',StringType(),True),
        StructField('Actor2Geo_FullName',StringType(),True),
        StructField('Actor2Geo_CountryCode',StringType(),True),
        StructField('Actor2Geo_ADM1Code',StringType(),True),
        StructField('Actor2Geo_Lat',StringType(),True),
        StructField('Actor2Geo_Long',StringType(),True),
        StructField('Actor2Geo_FeatureID',StringType(),True),
        StructField('ActionGeo_Type',StringType(),True),
        StructField('ActionGeo_FullName',StringType(),True),
        StructField('ActionGeo_CountryCode',StringType(),True),
        StructField('ActionGeo_ADM1Code',StringType(),True),
        StructField('ActionGeo_Lat',StringType(),True),
        StructField('ActionGeo_Long',StringType(),True),
        StructField('ActionGeo_FeatureID',StringType(),True),
        StructField('DATEADDED',StringType(),True),
        StructField('SOURCEURL',StringType(),True)])

df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false') \
    .options(delimiter="\t") \
    .load('s3a://gdelt-open-data/events/*', schema = customSchema)

def modify_values(r,y):
   if r == '' and y != '':
    return y
   else:
        return r

def country_exists(r):
   if r in tofullname:
        return tofullname[r]
   else:
    return ''

def event_exists(r):
   if r in toevent:
    return toevent[r]
   else:
    return ''


modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists,StringType())
e_exists = udf(event_exists,StringType())
dfsub1 = df.withColumn("Actor1Type1Code",modify_val(col("Actor1Type1Code"),col("Actor2Type1Code"))) \
           .withColumn("ActionGeo_CountryCode",c_exists(col("ActionGeo_CountryCode"))) \
           .withColumn("Actor1Type1Code",e_exists(col("Actor1Type1Code")))

sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               SQLDATE, MonthYear, Year,
                               Actor1Type1Code,
                               NumArticles,
                               GoldsteinScale,
                               AvgTone
                          FROM temp
                         WHERE ActionGeo_CountryCode <> ''
                            AND Actor1Type1Code <> ''
                            AND NumArticles <> ''
                            AND GoldsteinScale <> ''
                            AND AvgTone <> ''""")

sqlContext.registerDataFrameAsTable(df2, 'temp2')
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
                               Actor1Type1Code,
                               CAST(NumArticles AS INTEGER),
                               CAST(GoldsteinScale AS INTEGER),
                               CAST(AvgTone AS INTEGER)
                          FROM temp2""")

sqlContext.registerDataFrameAsTable(df3, 'temp3')
sqlContext.cacheTable('temp3')

dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                   SQLDATE,
                                   Actor1Type1Code,
                                   SUM(NumArticles) AS NumArticles,
                                   ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                   ROUND(AVG(AvgTone),0) AS AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                      SQLDATE,
                                      Actor1Type1Code""")

dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                     MonthYear,
                                     Actor1Type1Code,
                                     SUM(NumArticles) AS NumArticles,
                                     ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                     ROUND(AVG(AvgTone),0) as AvgTone
                                FROM temp3
                                   GROUP BY ActionGeo_CountryCode,
                                    MonthYear,
                                    Actor1Type1Code""")

dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                    Year,
                                    Actor1Type1Code,
                                    SUM(NumArticles) AS NumArticles,
                                    ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                    ROUND(AVG(AvgTone),0) as AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                       Year,
                                       Actor1Type1Code""")

def rddCleaning(rd,timeframe):

    def popular_avg(curr_tup):
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
        event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return ((curr_tup[0],lst,dict_matches,curr_tup[3]))

    def merge_info(tup):
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
    main_dict["TotalArticles"] = {"total":tup[3]}
        return ((tup[0],main_dict))

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]}
        return ((tup[0],dict_matches,tup[2],tup[3]))

    def sum_allevents(tup):
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
                total_mentions += event[1]
        return ((tup[0],type_lst,tup[2],total_mentions))

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"],y[timeframe]),
                                   ([(y["Actor1Type1Code"],y["NumArticles"])],
                    [(y["Actor1Type1Code"],{"Goldstein":y["GoldsteinScale"],"ToneAvg":y["AvgTone"]})]
                   ))) \
           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
               .map(lambda v: (v[0],
                                   sorted(v[1][0],key=itemgetter(1),reverse=True),
                                   dict(v[1][1]))) \
           .map(sum_allevents) \
                   .map(popular_avg) \
           .map(event_todict) \
                   .map(merge_info) \
               .map(lambda d: ((d[0][0],d[0][1],d[1])))

    return rdd_format

print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))
6
  • For starters you loose all the udfs. None of these is necessary. Commented Feb 20, 2017 at 0:34
  • Thank you for responding! I really appreciate it. I used ufds because I was not sure how to use a python dictionary in SQL. In two of the udfs I am renaming rows based off a dictionary stored in another python file. For example, if a cell in a column has "US" then I rename it to "United States" in the udf. Can i have a dictionary variable in SQL? Commented Feb 20, 2017 at 1:11
  • You can use join with broadcast or literal map. Commented Feb 20, 2017 at 15:25
  • @zero323 I will make the changes you suggested. Is there anything else that you think is causing my spark job to take a long time to run? Are there additional configurations I could make to make my job run faster? I am using a master node and three slave nodes and processing 100GB of data is taking over three hours. Commented Feb 20, 2017 at 20:13
  • 1
    pyspark.sql.functions.create_map Commented Feb 21, 2017 at 13:09

1 Answer 1

2

The most immediate thing I can think of is to use dataframes instead of RDD. Basically RDD in python are considerably slower than in scala because of conversions between python and JVM. Also dataframes enjoy many optimizations.

It is very difficult to follow all the code here to try to suggest a conversion, however, as a basis you can use spark.read.csv to read from the csv directly to dataframe (and set a schema so that a lot of the validation would occur automatically) and the many existing functions should make it easy to write.

Sign up to request clarification or add additional context in comments.

5 Comments

If I read the csv as a dataframe then I can no longer do .map and .reduce functions right? I need the .map and .reduce to make several transformations to my data. I'm using a master and three slaves and I want to make use of all of my resources. If I use RDD then it parallelizes the tasks to the other nodes.
@CatherineAlv well let's say you go back and forth between RDDs and DataFrames if you have some specifics
A quick glance on your code seems to suggest that everything you do in .map and .reduce can be done with dataframes (by using dataframes functions). Spark dataframes (which are part of the spark sql package in pyspark.sql) are distributed just like RDD (and much more optimized). If something is missing you can generally solve it with a UDF (although python UDF are slower than scala's and definitly slower than the available functions in pyspark.sql.functions). Try looking at the programming guide for spark sql for more information
@AssafMendelson do you have additional suggestions on the updates I've made? Are there additional optimizations I could make?
take a look at the built in functions. Using builtin functions is much faster than UDF. I see for example you are defining the modify_val UDF. Instead you might use when, otherwise. see pyspark.sql.functions.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.