73

I'm trying to convert a Pandas DataFrame into a Spark one. DF head:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

Code:

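import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=SparkConf())
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)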

And I got an error:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
  • My first assumption is that the file contains both numbers and strings in one column and Spark gets confused by it. However, Pandas should handle that when importing. Commented May 29, 2016 at 18:21
  • Does your DF have column names? Commented May 29, 2016 at 18:23
  • Yes, it does. Should I disable them? Commented May 29, 2016 at 18:31
  • No, but it would be helpful if you added them to your DF head output. Try skipping the 11th column (the one with NAs) and rerunning your code. Commented May 29, 2016 at 18:34
  • Why don't you use spark-csv? Commented May 29, 2016 at 19:11
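The first comment's hypothesis (numbers and strings mixed in one column) can be checked in pandas before involving Spark. A minimal sketch, assuming the usual culprit is a text column with some missing cells: pandas stores those cells as float NaN next to str values, and that str/float mix matches the StringType/DoubleType pair in the error. mixed_type_columns is a hypothetical helper and the example frame is made up for illustration.

```python
import numpy as np
import pandas as pd


def mixed_type_columns(df):
    """Return the names of columns whose cells hold more than one Python type."""
    mixed = []
    for col in df.columns:
        # Distinct Python types actually stored in this column (NaN is a float).
        kinds = {type(v) for v in df[col]}
        if len(kinds) > 1:
            mixed.append(col)
    return mixed


# Hypothetical data: "flag" mixes str values with a missing value.
example = pd.DataFrame({
    "id": [10000001, 10000001, 10000002],
    "flag": ["f", np.nan, "t"],
    "status": ["OK", "OK", "PA"],
})

print(mixed_type_columns(example))  # ['flag']
```

Columns it reports are candidates for an explicit cast or an explicit Spark schema.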

7 Answers

102

I wrote this script, and it worked for my 10 pandas DataFrames:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Session used by pandas_to_spark() below
spark = SparkSession.builder.getOrCreate()

# Auxiliary functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except Exception: typo = StringType()
    return StructField(string, typo)

# Given a pandas DataFrame, return a Spark DataFrame.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

You can also see it in this gist.

With this, you just have to call:

spark_df = pandas_to_spark(pandas_df)
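To see which branch of equivalent_type() each column of the question's data would hit, you can inspect the dtypes pandas assigns. This sketch needs only pandas and assumes the question's three sample rows can stand in for the file, read through io.StringIO with header=None (the real header is not shown in the question):

```python
import io

import pandas as pd

# The question's three sample rows; header=None because no header is shown.
SAMPLE = (
    "10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543\n"
    "10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611\n"
    "10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691\n"
)
df = pd.read_csv(io.StringIO(SAMPLE), header=None)

# equivalent_type() compares each dtype against names like these.
dtype_names = [str(t) for t in df.dtypes]
for position in (0, 4, 5, 10, 11):
    print(position, df.iloc[0, position], dtype_names[position])
```

The numeric columns come back as int64 (LongType), the all-NA column as float64 (DoubleType), and the 12:35 / OK / f columns as a non-numeric dtype, which falls through to StringType.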


9 Comments

Verified this all works; I also verified that the output goes from PySpark out to Parquet and into Scala. Thanks, Gonzalo. I wouldn't begin to know how, but this seems like it would be a brilliant contribution to the open-source community, maybe as pd.to_sparkdf() or something.
Gonzalo, I just forked your gist to support ArrayType[StringType]. Thanks again. Readers, this is a great solution for going from pandas to PySpark and Scala Spark.
Warning: If you try to convert datetime objects consisting of date and time (like pd.to_datetime('2020-01-01 13:45:12')), the time information gets lost with your approach. To account for this, change DateType() to TimestampType().
Thanks for your elegant solution. Your function pandas_to_spark() runs without error for my use-case. I can display() the result. However, I cannot save the resulting dataframe, or perform simple filter statements. Could you please help me figure out why? stackoverflow.com/questions/69974539/…
Requesting to change elif f == 'float64': return FloatType() to elif f == 'float64': return DoubleType(), then add elif f == 'float32': return FloatType() after that line.
49

You need to make sure your pandas dataframe columns are appropriate for the type spark is inferring. If your pandas dataframe lists something like:

df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5062 entries, 0 to 5061
Data columns (total 51 columns):
SomeCol                    5062 non-null object
Col2                       5062 non-null object

and you're getting that error, try:

df[['SomeCol', 'Col2']] = df[['SomeCol', 'Col2']].astype(str)

Now, make sure str is actually the type you want those columns to be. Basically, when PySpark infers the schema from a pandas DataFrame, it looks at the Python values in each column, guesses a type for each, and merges the guesses. If they don't agree across all the data in the column(s) it's converting (for example, a str in one row and a float NaN in another), the conversion fails.
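Regarding the follow-up comment below: with pandas 2.x, astype(str) leaves the dtype reported as object, so df.info() looks unchanged, while the Python objects stored in the column do change. Looking at the value types is one way to confirm the cast. value_types is a hypothetical helper and the data is made up:

```python
import numpy as np
import pandas as pd


def value_types(s):
    """Names of the Python types actually stored in a Series, in row order."""
    return [type(v).__name__ for v in s]


# Hypothetical data: SomeCol mixes strings with a missing value (float NaN).
df = pd.DataFrame({"SomeCol": ["a", np.nan, "c"], "Col2": ["x", "y", "z"]})
print(df["SomeCol"].dtype, value_types(df["SomeCol"]))

df[["SomeCol", "Col2"]] = df[["SomeCol", "Col2"]].astype(str)

# The dtype shown by df.info() may not change, so check the stored values.
print(df["SomeCol"].dtype, value_types(df["SomeCol"]))
```

With pandas 2.x the second line lists only str: the NaN has become the text 'nan', which Spark will then store as a string rather than as a null.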

1 Comment

I found this very helpful. Follow-up question: When I went through and followed these steps for my own dataframe, I did not see any change to the pd.info(). How exactly is the dataframe itself changing? How could I check to see the pandas DataFrame has changed after using the .astype(str)?
49

Type-related errors can be avoided by imposing a schema as follows:

Note: a text file (test.csv) was created with the original data (as above), and hypothetical column names ("col1", "col2", ..., "col25") were inserted.

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

Contents of the pandas DataFrame:

       col1     col2    col3    col4    col5    col6    col7    col8   ... 
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

Next, create the schema:

from pyspark.sql.types import *

mySchema = StructType([ StructField("col1", LongType(), True)\
                       ,StructField("col2", IntegerType(), True)\
                       ,StructField("col3", IntegerType(), True)\
                       ,StructField("col4", IntegerType(), True)\
                       ,StructField("col5", StringType(), True)\
                       ,StructField("col6", StringType(), True)\
                       ,StructField("col7", IntegerType(), True)\
                       ,StructField("col8", IntegerType(), True)\
                       ,StructField("col9", IntegerType(), True)\
                       ,StructField("col10", IntegerType(), True)\
                       ,StructField("col11", StringType(), True)\
                       ,StructField("col12", StringType(), True)\
                       ,StructField("col13", IntegerType(), True)\
                       ,StructField("col14", IntegerType(), True)\
                       ,StructField("col15", IntegerType(), True)\
                       ,StructField("col16", IntegerType(), True)\
                       ,StructField("col17", IntegerType(), True)\
                       ,StructField("col18", IntegerType(), True)\
                       ,StructField("col19", IntegerType(), True)\
                       ,StructField("col20", IntegerType(), True)\
                       ,StructField("col21", IntegerType(), True)\
                       ,StructField("col22", IntegerType(), True)\
                       ,StructField("col23", IntegerType(), True)\
                       ,StructField("col24", IntegerType(), True)\
                       ,StructField("col25", IntegerType(), True)])

Note: True means the field is nullable.

Create the PySpark DataFrame:

df = spark.createDataFrame(pdDF,schema=mySchema)

Confirm the pandas DataFrame is now a PySpark DataFrame:

type(df)

output:

pyspark.sql.dataframe.DataFrame

Aside:

To address Kate's comment below: to impose a general (string) schema, you can do the following:

df=spark.createDataFrame(pdDF.astype(str)) 

5 Comments

Is it possible to generalize the schema creation part to just have it create all columns as a certain type? For example, just tell it that all columns as StringType (instead of assigning each column individually)
df=spark.createDataFrame(pdDF.astype(str))
Hi Grant, in the step where you created 'mySchema', did you have to type all that? Is there a way to extract the schema from an example piece of the pandas dataframe? Thanks.
Yes, I had to type it all out (copied, pasted and changed where necessary). I've found that letting the Spark DataFrame infer the schema from the pandas DataFrame (as in the original question above) is too risky. My take is that imposing the correct schema is the lowest-risk strategy. If you can't impose the required schema initially, the quick and dirty approach is to impose a string schema on everything (as shown above) and correct the types at a later stage.
Rather than type out the whole schema manually, you can run spark.createDataFrame(pandas_data_frame).schema to print the inferred schema and adjust as necessary.
18

In Spark 3 and later, you can convert a pandas DataFrame to a PySpark DataFrame in one line with spark.createDataFrame(pandasDF):

import pandas as pd

dataset = pd.read_csv("data/AS/test_v2.csv")

sparkDf = spark.createDataFrame(dataset)

If you're unsure where the spark session variable comes from, it can be created as follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

2 Comments

Thanks for this! I spent a lot of time building a converter between pandas to spark, even started a github repo for it. This sure makes it easy, at least for simple data types.
This can throw an error when it cannot infer a column's type, so it is actually safer to use Gonzalo Garcia's answer.
11

I tried this with your data and it works:

%pyspark
import pandas as pd
from pyspark.sql import SQLContext
print(sc)
df = pd.read_csv("test.csv")
print(type(df))
print(df)
sqlCtx = SQLContext(sc)
sqlCtx.createDataFrame(df).show()

2 Comments

For my data it takes like forever
See "How to Answer" and "Explaining entirely code-based answers". While this might be technically correct, it doesn't explain why it solves the problem or why it should be the selected answer. We should educate along with helping solve the problem.
2

I cleaned up / simplified the top answer a bit:

import pyspark.sql.types as ps_types


def get_equivalent_spark_type(pandas_type):
    """
        This method will retrieve the corresponding spark type given a pandas
        type.

        Args:
            pandas_type (str): pandas data type

        Returns:
            spark data type
    """
    type_map = {
        'datetime64[ns]': ps_types.TimestampType(),
        'int64': ps_types.LongType(),
        'int32': ps_types.IntegerType(),
        'float64': ps_types.DoubleType(),
        'float32': ps_types.FloatType()}
    # pandas_df.dtypes yields numpy dtype objects, which compare equal to their
    # names but don't hash like them, so look them up by their string form.
    return type_map.get(str(pandas_type), ps_types.StringType())


def pandas_to_spark(spark, pandas_df):
    """
        This method will return a spark dataframe given a pandas dataframe.

        Args:
            spark (pyspark.sql.session.SparkSession): pyspark session
            pandas_df (pandas.core.frame.DataFrame): pandas DataFrame

        Returns:
            equivalent spark DataFrame
    """
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    p_schema = ps_types.StructType([
        ps_types.StructField(column, get_equivalent_spark_type(pandas_type))
        for column, pandas_type in zip(columns, types)])

    return spark.createDataFrame(pandas_df, p_schema)
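A note on the dictionary lookup: pandas_df.dtypes returns numpy dtype objects rather than strings. They compare equal to their names but do not hash like them, so a raw dtype is not found in a string-keyed dict, and every column would silently fall back to StringType. This pandas-only sketch uses plain strings as hypothetical stand-ins for the Spark types so it runs without PySpark:

```python
import pandas as pd

# Stand-ins for the Spark types so the lookup can be shown without PySpark.
type_map = {"int64": "LongType", "float64": "DoubleType"}

df = pd.DataFrame({"a": [1, 2], "b": [0.5, 1.5], "c": ["x", "y"]})
dtypes = list(df.dtypes)

# A numpy dtype compares equal to its name...
print(dtypes[0] == "int64")   # True
# ...but does not hash like it, so dict membership fails for the raw dtype.
print(dtypes[0] in type_map)  # False
# Normalising to str first makes the lookup behave as intended.
print([type_map.get(str(t), "StringType") for t in dtypes])
# ['LongType', 'DoubleType', 'StringType']
```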

Comments

0
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import (DoubleType, FloatType, IntegerType, LongType,
                               StringType, StructField, StructType,
                               TimestampType)

spark = SparkSession.builder.getOrCreate()

# Function to convert pandas dataframe to spark dataframe
def equivalent_type(f):
  """It will define datatypes to spark dataframe by considering pandas dataframe datatypes"""
  if f == 'datetime64[ns]': return TimestampType()
  elif f == 'int64': return LongType()
  elif f == 'int32': return IntegerType()
  elif f == 'float64': return DoubleType()
  elif f == 'float32': return FloatType()
  else: return StringType()  # 'object' and any other dtype

def define_structure(string, format_type):
  """It will define columns and datatypes to spark dataframe"""
  try: typo = equivalent_type(format_type)
  except Exception: typo = StringType()
  return StructField(string, typo, True)

def pandas_to_spark(pandas_df):
  """Given pandas dataframe, it will return a spark dataframe"""
  pandas_df = pandas_df.copy()  # avoid modifying the caller's DataFrame
  columns = list(pandas_df.columns)
  types = list(pandas_df.dtypes)
  null_count = list(pandas_df.isna().sum())
  struct_list = []
  for column, typo, null_c in zip(columns, types, null_count):
    if null_c > 0:
      # Store the column as strings, keep missing values as None (Spark null),
      # and declare it as StringType so the schema matches the data.
      pandas_df[column] = [None if pd.isna(v) else str(v)
                           for v in pandas_df[column]]
      typo = 'object'
    struct_list.append(define_structure(column, typo))
  p_schema = StructType(struct_list)
  return spark.createDataFrame(pandas_df, p_schema)
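The null handling above hinges on DataFrame.isna().sum(). A small pandas-only illustration, on a made-up frame, of which columns that count flags, and of why a bare str() cast is not null-preserving: str(nan) is the literal text 'nan', which Spark would keep as a real string rather than a null.

```python
import numpy as np
import pandas as pd

# Hypothetical frame: "score" has one missing value, "name" has none.
df = pd.DataFrame({"name": ["a", "b", "c"], "score": [1.5, np.nan, 3.0]})

null_count = df.isna().sum()
print(null_count.tolist())  # [0, 1]

# Columns the loop would convert (null count > 0).
to_convert = [col for col, n in null_count.items() if n > 0]
print(to_convert)  # ['score']

# Keep real gaps as None instead of letting str() turn NaN into 'nan'.
converted = [None if pd.isna(v) else str(v) for v in df["score"]]
print(converted)  # ['1.5', None, '3.0']
```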

1 Comment

this will take care of columns with null values
