
I have Spark 1.3 set up in a VirtualBox Ubuntu 14 32-bit VM. I have loaded a CSV file into a Spark DataFrame and am attempting some operations that give me error messages I can't troubleshoot.

The PySpark code is below:

from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from datetime import *
from dateutil.parser import parse
sqlContext = SQLContext(sc)

# Load the raw CSV as an RDD of lines
elevFile = sc.textFile('file:////sharefolder/Jones Lake.csv')

# Build the schema from the header row, overriding the column types
header = elevFile.first()
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields[0].dataType = IntegerType()
fields[1].dataType = TimestampType()
fields[2].dataType = FloatType()
schema = StructType(fields)

# Strip the header row out of the data
elevHeader = elevFile.filter(lambda l: "Hour" in l)
elevHeader.collect()
elevNoHeader = elevFile.subtract(elevHeader)
print elevNoHeader.take(5)

# Parse each line into (int, datetime, float) and convert to a DataFrame
elev_df = (elevNoHeader.map(lambda k: k.split(","))
            .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
            .toDF(schema))

Everything works fine up to this point. It prints the top five rows of the new DataFrame with no problem:

print elev_df.head(5)
[Row(Hour=6, Date=datetime.datetime(1989, 9, 19, 0, 0), Value=641.6890258789062), Row(Hour=20, Date=datetime.datetime(1992, 4, 30, 0, 0), Value=633.7100219726562), Row(Hour=10, Date=datetime.datetime(1987, 7, 26, 0, 0), Value=638.6920166015625), Row(Hour=1, Date=datetime.datetime(1991, 2, 26, 0, 0), Value=634.2100219726562), Row(Hour=2, Date=datetime.datetime(1984, 7, 28, 0, 0), Value=639.8779907226562)]

But when I try to do a simple group-by and count, I get errors I can't troubleshoot.

elev_df.groupBy("Hour").count().show()

This gives an error (the top few lines of the traceback are below).

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-209-6533c596fac9> in <module>()
----> 1 elev_df.groupBy("Hour").count().show()

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n)
    271         5   Bob
    272         """
--> 273         print self._jdf.showString(n).encode('utf8', 'ignore')
    274 
    275     def __repr__(self):

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/usr/local/bin/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Any ideas on troubleshooting this further?

1 Answer

It seems like your CSV has some blank values. I can see you are replacing values, but I believe the groupBy is not accepting the blanks. The easy way is to handle the blank values in your CSV at the DataFrame level:

fillna(value, subset=None)
Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.

Parameters:
- value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
- subset – optional list of column names to consider. Columns specified in subset that do not have a matching data type are ignored. For example, if value is a string and subset contains a non-string column, then the non-string column is simply ignored.
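
For example, a minimal sketch of how that could look here (the placeholder fill values are my own assumption; pick whatever makes sense for your data):

# Replace nulls in the two numeric columns with placeholder values,
# then retry the aggregation
elev_df_clean = elev_df.fillna({'Hour': 0, 'Value': 0.0})
elev_df_clean.groupBy("Hour").count().show()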

1 Comment

Yes, it did have unexpected blank rows, so I filtered them out and it worked fine. I can now do groupBy and SQL on the DataFrame. Thanks
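
For reference, a minimal sketch of that filtering approach, assuming the blanks show up as empty or malformed lines in the raw RDD (the elevClean name is hypothetical):

# Keep only lines that are non-blank and split into exactly three fields
elevClean = elevNoHeader.filter(lambda l: l.strip() and len(l.split(',')) == 3)
elev_df = (elevClean.map(lambda k: k.split(","))
            .map(lambda p: (int(p[0]), parse(p[1]), float(p[2])))
            .toDF(schema))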
