4
\$\begingroup\$

I'm new to Spark and DataFrames, and I'm looking for feedback on any bad or inefficient practices in my code so I can improve and learn. My program reads in a parquet file that contains server log data about requests made to our website. I think it is mostly self-explanatory; the only parts that might not be are that we add some ETL fields for tracking, and that we map the accessing device to one of a fixed set of categories to make reporting easier (accomplished through the SQL `CASE` expression).

Here is my code:

from functools import reduce
from time import time

from boto3 import client
import psycopg2 as ppg2
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, current_date, date_format, lit, when

# boto3 client for the EMR service, created as soon as the module is loaded.
# NOTE(review): nothing in the visible script references EMR_CLIENT -- confirm
# it is still needed before keeping this import-time side effect.
EMR_CLIENT = client('emr')
# Spark entry points shared by the script below: the application config
# (which sets the application name), the SparkContext built from it, and the
# SQLContext used to read the parquet input.
conf = SparkConf().setAppName('Canvas Requests Logs')
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
# Placeholder for registering extra Python dependencies on the SparkContext;
# currently disabled.
# sc.addPyFile()


if __name__ == '__main__':
    # End-to-end job: read the request logs, attach a per-user request count,
    # classify each user agent into a device category, stamp the ETL tracking
    # columns, and write the result out as parquet.
    start = time()
    df = sql_context.read.parquet(
        r'/Users/mharris/PycharmProjects/etl3/pyspark/Datasets/'
        r'usage_data.gz.parquet')

    # Keep only course-scoped requests. Previously this filter was computed
    # but the projection below read from the unfiltered frame, so every
    # context_id was emitted as course_id regardless of its context_type.
    course_data = df.filter(df['context_type'] == 'Course')
    request_data = course_data.select(
        course_data['user_id'],
        course_data['context_id'].alias('course_id'),
        date_format(
            course_data['request_timestamp'], 'MM'
        ).alias('request_month'),
        course_data['user_agent'],
    )

    # Number of log rows per user, counted over the unfiltered log.
    # NOTE(review): this counts requests rather than distinct sessions and
    # ignores the Course filter -- confirm that is what num_sessions means.
    sesh_id_data = (
        df.groupBy('user_id')
        .count()
        .withColumnRenamed('count', 'num_sessions')
    )

    # Joining on the column *name* performs an inner equi-join that keeps a
    # single user_id column, so there is no duplicate column to drop and no
    # ambiguity between two frames that both derive from `df`.
    joined_data = request_data.join(sesh_id_data, on='user_id')

    # Constant ETL bookkeeping columns appended to every output row.
    etl_fields = [
        lit('DEV').alias('etl_process_status'),
        current_date().alias('etl_datetime_local'),
        lit('agg_canvas_logs_user_agent_types').alias(
            'etl_transformation_name'),
        lit(r'Apache Spark').alias('etl_pdi_version'),
        lit(r'1.6.1').alias('etl_pdi_build_version'),
        lit(r'N/A').alias('etl_pdi_hostname'),
        lit(r'N/A').alias('etl_pdi_ipaddress'),
        lit(r'N/A').alias('etl_checksum_md5'),
    ]

    # Maps the raw user agent onto a small set of device categories. Branch
    # order matters: the first matching WHEN wins, so the app-specific
    # patterns must stay ahead of the generic OS patterns.
    case_stmt = r'''
    CASE
        WHEN user_agent LIKE '%CanvasAPI%' THEN 'api'
        WHEN user_agent LIKE '%candroid%' THEN 'mobile_app_android'
        WHEN user_agent LIKE '%iCanvas%' THEN 'mobile_app_ios'
        WHEN user_agent LIKE '%CanvasKit%' THEN 'mobile_app_ios'
        WHEN user_agent LIKE '%Windows NT%' THEN 'desktop'
        WHEN user_agent LIKE '%MacBook%' THEN 'desktop'
        WHEN user_agent LIKE '%iPhone%' THEN 'mobile'
        WHEN user_agent LIKE '%iPod Touch%' THEN 'mobile'
        WHEN user_agent LIKE '%iPad%' THEN 'mobile'
        WHEN user_agent LIKE '%iOS%' THEN 'mobile'
        WHEN user_agent LIKE '%CrOS%' THEN 'desktop'
        WHEN user_agent LIKE '%Android%' THEN 'mobile'
        WHEN user_agent LIKE '%Linux%' THEN 'desktop'
        WHEN user_agent LIKE '%Mac OS%' THEN 'desktop'
        WHEN user_agent LIKE '%Macintosh%' THEN 'desktop'
        ELSE 'other_unknown'
    END AS user_agent_type
    '''
    # TODO Study the below code for understanding
    # NOTE: folding with when(...).otherwise(acc) makes the LAST pair in `vs`
    # the first one tested, so the pairs would have to be listed in reverse
    # priority order to reproduce the CASE statement above.
    # c = col("user_agent")
    # vs = [('Android', 'mobile'), ('Linux', 'desktop')]
    # expr = reduce(
    #     lambda acc, kv: when(c.like(kv[0]), kv[1]).otherwise(acc),
    #     vs,
    #     'other_unknown'
    # ).alias('user_agent_type')

    # Explicit output schema and column order. user_agent is deliberately
    # absent, which is what removes the raw string from the result.
    output_columns = [
        'user_id',
        'course_id',
        'request_month',
        'user_agent_type',
        'num_sessions',
        'etl_process_status',
        'etl_datetime_local',
        'etl_transformation_name',
        'etl_pdi_version',
        'etl_pdi_build_version',
        'etl_pdi_hostname',
        'etl_pdi_ipaddress',
        'etl_checksum_md5',
    ]

    final_data = (
        joined_data
        .select('*', *etl_fields)
        .selectExpr('*', case_stmt)
        .select(*output_columns)
    )
    final_data.write.parquet('outfile')

    end = time()
    print('Total run time was {time}'.format(time=end - start))
\$\endgroup\$

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.