I'm new to Spark and DataFrames, and I'm looking for feedback on any bad or inefficient practices in my code so I can improve and learn. My program reads in a Parquet file that contains server log data about requests made to our website. I think it is pretty self-explanatory; the only parts that might not be are that we add some ETL fields for tracking, and that we cast the accessing device to one of a fixed set of categories to make reporting easier (accomplished through a SQL CASE expression).
Here is my code:
from functools import reduce
from time import time
from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, current_date, date_format, lit, when
EMR_CLIENT = client('emr')
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# for dependencies
# sc.addPyFile()
if __name__ == '__main__':
    start = time()
    df = sql_context.read.parquet(
        r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
        r'usage_data.gz.parquet')
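    # NOTE: course_data is filtered here but never used again in this script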
    course_data = df.filter(df['context_type'] == 'Course')
    request_data = df.select(
        df['user_id'],
        df['context_id'].alias('course_id'),
        date_format(df['request_timestamp'], 'MM').alias('request_month'),
        df['user_agent']
    )
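    # count of request rows per user, joined back on user_id as num_sessions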
    sesh_id_data = df.groupBy('user_id').count()
    sesh_id_data = sesh_id_data.withColumnRenamed('count', 'num_sessions')
    joined_data = request_data.join(
        sesh_id_data,
        on=request_data['user_id'] == sesh_id_data['user_id']
    ).drop(sesh_id_data['user_id'])
    etl_fields = [c.alias(a) for (a, c) in [
        ('etl_process_status', lit('DEV')),
        ('etl_datetime_local', current_date()),
        ('etl_transformation_name', lit('agg_canvas_logs_user_agent_types')),
        ('etl_pdi_version', lit(r'Apache Spark')),
        ('etl_pdi_build_version', lit(r'1.6.1')),
        ('etl_pdi_hostname', lit(r'N/A')),
        ('etl_pdi_ipaddress', lit(r'N/A')),
        ('etl_checksum_md5', lit(r'N/A'))
    ]]
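    # the LIKE patterns below are checked top to bottom, so the specific
    # app signatures (CanvasAPI, candroid, iCanvas, CanvasKit) must stay
    # ahead of the generic OS patterns that would also match those agents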
    case_stmt = r'''
        CASE
            WHEN user_agent LIKE '%CanvasAPI%' THEN 'api'
            WHEN user_agent LIKE '%candroid%' THEN 'mobile_app_android'
            WHEN user_agent LIKE '%iCanvas%' THEN 'mobile_app_ios'
            WHEN user_agent LIKE '%CanvasKit%' THEN 'mobile_app_ios'
            WHEN user_agent LIKE '%Windows NT%' THEN 'desktop'
            WHEN user_agent LIKE '%MacBook%' THEN 'desktop'
            WHEN user_agent LIKE '%iPhone%' THEN 'mobile'
            WHEN user_agent LIKE '%iPod Touch%' THEN 'mobile'
            WHEN user_agent LIKE '%iPad%' THEN 'mobile'
            WHEN user_agent LIKE '%iOS%' THEN 'mobile'
            WHEN user_agent LIKE '%CrOS%' THEN 'desktop'
            WHEN user_agent LIKE '%Android%' THEN 'mobile'
            WHEN user_agent LIKE '%Linux%' THEN 'desktop'
            WHEN user_agent LIKE '%Mac OS%' THEN 'desktop'
            WHEN user_agent LIKE '%Macintosh%' THEN 'desktop'
            ELSE 'other_unknown'
        END AS user_agent_type
    '''
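    # append the ETL audit columns, then add user_agent_type via the
    # CASE expression and drop the raw user_agent string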
    all_fields = joined_data.select('*', *etl_fields)
    final_data = all_fields.selectExpr(
        '*', case_stmt
    ).drop(all_fields['user_agent'])
    # TODO Study the below code for understanding: it builds the same
    # kind of logic as case_stmt from column expressions instead of SQL
    # text. Note that reduce wraps each pair around the previous result,
    # so the last pair in vs would be checked first; iterating in reverse
    # keeps the CASE ordering, and like() needs the % wildcards spelled out.
    # c = col('user_agent')
    # vs = [('%Android%', 'mobile'), ('%Linux%', 'desktop')]
    # expr = reduce(
    #     lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
    #     reversed(vs),
    #     'other_unknown'
    # ).alias('user_agent_type')
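    # final column order for the output file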
    final_data = final_data.select(
        'user_id',
        'course_id',
        'request_month',
        'user_agent_type',
        'num_sessions',
        'etl_process_status',
        'etl_datetime_local',
        'etl_transformation_name',
        'etl_pdi_version',
        'etl_pdi_build_version',
        'etl_pdi_hostname',
        'etl_pdi_ipaddress',
        'etl_checksum_md5'
    )
    final_data.write.parquet('outfile')
    end = time()
    print('Total run time was {time} seconds'.format(time=end - start))
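If it helps reviewers, here is the commented-out reduce approach written out in full as I understand it. This is only a sketch: the pattern list, its ordering, the % wildcards, and the names agent_patterns and user_agent_expr are my own reconstruction of the CASE branches above, reusing the reduce, col, and when imports already at the top of the script:

from functools import reduce  # already imported above
from pyspark.sql.functions import col, when  # already imported above

# mapping that mirrors the CASE branches; order matters because the
# specific app signatures must be tried before the generic OS patterns
agent_patterns = [
    ('%CanvasAPI%', 'api'),
    ('%candroid%', 'mobile_app_android'),
    ('%iCanvas%', 'mobile_app_ios'),
    ('%CanvasKit%', 'mobile_app_ios'),
    ('%Windows NT%', 'desktop'),
    ('%MacBook%', 'desktop'),
    ('%iPhone%', 'mobile'),
    ('%iPod Touch%', 'mobile'),
    ('%iPad%', 'mobile'),
    ('%iOS%', 'mobile'),
    ('%CrOS%', 'desktop'),
    ('%Android%', 'mobile'),
    ('%Linux%', 'desktop'),
    ('%Mac OS%', 'desktop'),
    ('%Macintosh%', 'desktop'),
]

c = col('user_agent')
# reduce makes the last pattern processed the outermost (first-checked)
# branch, so iterate in reverse to preserve the CASE evaluation order
user_agent_expr = reduce(
    lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
    reversed(agent_patterns),
    'other_unknown'
).alias('user_agent_type')

# this would replace the selectExpr/case_stmt step:
# final_data = all_fields.select('*', user_agent_expr).drop('user_agent')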