20

I am reading a file in PySpark and forming the rdd of it. I then convert it to a normal dataframe and then to pandas dataframe. The issue that I am having is that there is header row in my input file and I want to make this as the header of dataframe columns as well but they are read in as an additional row and not as header. This is my current code:

def extract(line):
    return line


input_file = sc.textFile('file1.txt').zipWithIndex().filter(lambda (line,rownum): rownum>=0).map(lambda (line, rownum): line)

input_data = (input_file
    .map(lambda line: line.split(";"))
    .filter(lambda line: len(line) >=0 )
    .map(extract)) # Map to tuples

df_normal = input_data.toDF()
df= df_normal.toPandas()

Now when I look at the df then the header row of text file becomes the first row of dataframe and there is additional header in df with 0,1,2... as header. How can I make the first row as header?

2
  • without a sample of the dataframe to work with, I would think you could just use df_normal.toPandas('header'=1) . Or whatever row contains the header you want Commented Jan 16, 2016 at 22:08
  • 1
    Why use Spark at all here? If you assume that data fits (ignoring empty lines) on a local machine it is just a waste of time and resources. Commented Jan 17, 2016 at 8:39

3 Answers 3

27

There are a couple of ways to do that, depending on the exact structure of your data. Since you do not give any details, I'll try to show it using a datafile nyctaxicab.csv that you can download.

If your file is in csv format, you should use the relevant spark-csv package, provided by Databricks. No need to download it explicitly, just run pyspark as follows:

$ pyspark --packages com.databricks:spark-csv_2.10:1.3.0

and then

>>> from pyspark.sql import SQLContext
>>> from pyspark.sql.types import *
>>> sqlContext = SQLContext(sc)

>>> df = sqlContext.read.load('file:///home/vagrant/data/nyctaxisub.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

>>> df.count()
249999

The file has 250,000 rows including the header, so 249,999 is the correct number of actual records. Here is the schema, as inferred automatically by the package:

>>> df.dtypes
[('_id', 'string'),
 ('_rev', 'string'),
 ('dropoff_datetime', 'string'),
 ('dropoff_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'string'),
 ('pickup_latitude', 'double'),
 ('pickup_longitude', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'double'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

You can see more details in my relevant blog post.

If, for whatever reason, you cannot use the spark-csv package, you'll have to subtract the first row from the data and then use it to construct your schema. Here is the general idea, and you can again find a full example with code details in another blog post of mine:

>>> taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
>>> taxiFile.count()
250000
>>> taxiFile.take(5)
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"',
 u'"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"',
 u'"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+001,-7.39942860000000E+001,"2D73B0C44F1699C67AB8AE322433BDB7","6F907BC9A85B7034C8418A24A0A75489",5,"2013-01-11 21:46:00",+4.07577480000000E+001,-7.39649810000000E+001,1,,+3.01000000000000E+000,960,"VTS"',
 u'"2a80cfaa425dcec0861e02ae446226e4","1-aa8b16d6ae44ad906a46cc6581ffea50","2013-01-11 10:03:00",+4.07643050000000E+001,-7.39544600000000E+001,"E90018250F0A009433F03BD1E4A4CE53","1AFFD48CC07161DA651625B562FE4D06",5,"2013-01-11 09:44:00",+4.07308080000000E+001,-7.39928280000000E+001,1,,+3.64000000000000E+000,1140,"VTS"']

# Construct the schema from the header 
>>> header = taxiFile.first()
>>> header
u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'
>>> schemaString = header.replace('"','')  # get rid of the double-quotes
>>> schemaString
u'_id,_rev,dropoff_datetime,dropoff_latitude,dropoff_longitude,hack_license,medallion,passenger_count,pickup_datetime,pickup_latitude,pickup_longitude,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id'
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
>>> schema = StructType(fields)

# Subtract header and use the above-constructed schema:
>>> taxiHeader = taxiFile.filter(lambda l: "_id" in l) # taxiHeader needs to be an RDD - the string we constructed above will not do the job
>>> taxiHeader.collect() # for inspection purposes only
[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']
>>> taxiNoHeader = taxiFile.subtract(taxiHeader)
>>> taxi_df = taxiNoHeader.toDF(schema)  # Spark dataframe
>>> import pandas as pd
>>> taxi_DF = taxi_df.toPandas()  # pandas dataframe 

For brevity, here all columns end up being of type string, but in the blog post I show in detail and explain how you can further refine the desired data types (and names) for specific fields.

Sign up to request clarification or add additional context in comments.

Comments

27

The simple answer would be set header='true'

Eg:

df = spark.read.csv('housing.csv', header='true')

or

df = spark.read.option("header","true").format("csv").schema(myManualSchema).load("maestraDestacados.csv")

1 Comment

or header=True
3

One more way to do is below,

log_txt = sc.textFile(file_path)
header = log_txt.first() #get the first row to a variable
fields = [StructField(field_name, StringType(), True) for field_name in header] #get the types of header variable fields
schema = StructType(fields) 
filter_data = log_txt.filter(lambda row:row != header) #remove the first row from or else there will be duplicate rows 
df = spark.createDataFrame(filter_data, schema=schema) #convert to pyspark DF
df.show()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.