
I have a pyspark script that loads data from a TSV file, saves it as a Parquet file, and also saves it as a persistent SQL table.

When I run it line by line through the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors, but I get strange results:

1. The data is overwritten instead of appended.
2. When I run SQL queries against the table, no data is returned, even though the Parquet files are several gigabytes in size (which is what I expect).

Any suggestions?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext  # needed for SQLContext(sc) below
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
                StructField('subscriberId', StringType(), True),
                StructField('sourceIPv4Address', StringType(), True),
                StructField('destinationIPv4Address', StringType(), True),
                StructField('service',StringType(), True),
                StructField('baseService',StringType(), True),
                StructField('serverHostname', StringType(), True),
                StructField('rat', StringType(), True),
                StructField('userAgent', StringType(), True),
                StructField('accessPoint', StringType(), True),
                StructField('station', StringType(), True),
                StructField('device', StringType(), True),
                StructField('contentCategories', StringType(), True),
                StructField('incomingOctets', LongType(), True),
                StructField('outgoingOctets', LongType(), True),
                StructField('incomingShapingDrops', IntegerType(), True),
                StructField('outgoingShapingDrops', IntegerType(), True),
                StructField('qoeIncomingInternal', DoubleType(), True),
                StructField('qoeIncomingExternal', DoubleType(), True),
                StructField('qoeOutgoingInternal', DoubleType(), True),
                StructField('qoeOutgoingExternal', DoubleType(), True),
                StructField('incomingShapingLatency', DoubleType(), True),
                StructField('outgoingShapingLatency', DoubleType(), True),
                StructField('internalRtt', DoubleType(), True),
                StructField('externalRtt', DoubleType(), True),
                StructField('HttpUrl',StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv',sep='\t',header=True,schema=schema,timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date',to_date('time_stamp'))
df.write.saveAsTable('test2',mode='append',partitionBy='date',path=parquet_dir)
  • When you run spark-submit, you delegate execution to your cluster (Spark standalone? YARN?), which may be missing dependencies. See also this discussion: stackoverflow.com/questions/36910014/… Commented May 25, 2018 at 14:08
  • Looks like a duplicate of Spark can access Hive table from pyspark but not from spark-submit Commented May 26, 2018 at 21:59
  • Try inserting time.sleep(600) at the end of the code, then go to the Spark UI and check the logs. Commented May 29, 2018 at 22:44
  • @user8371915 Yes, that was the same issue! Although in my case it wasn't throwing any errors, so it was hard to understand the root cause. Thanks! Commented May 30, 2018 at 17:29

1 Answer

As @user8371915 suggested, this is the same issue as:

Spark can access Hive table from pyspark but not from spark-submit

I needed to replace

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

with

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

This resolved the issue. SQLContext uses an in-memory catalog, so tables registered through it are only visible within that session; HiveContext persists table metadata to the Hive metastore, which is why the table survives and is queryable after spark-submit finishes.
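On Spark 2.x and later, both SQLContext and HiveContext are superseded by SparkSession. A minimal sketch of the equivalent Hive-aware entry point (assuming your Spark build includes Hive support):

```python
# Sketch for Spark 2.x+: SparkSession with enableHiveSupport() replaces
# both SQLContext and HiveContext (assumes Spark was built with Hive).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('import-ipfixminute') \
    .enableHiveSupport() \
    .getOrCreate()

# The rest of the job stays the same: spark.read.load(...) and
# df.write.saveAsTable(...) now go through the Hive-aware session,
# so the table metadata lands in the Hive metastore and remains
# queryable after the spark-submit application exits.
```

With this entry point the same saveAsTable call from the question persists the table metadata across sessions, whether the script is run interactively or via spark-submit.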
