15

I have an RDD (we can call it myrdd) where each record in the RDD is of the form:

[('column 1',value), ('column 2',value), ('column 3',value), ... , ('column 100',value)]

I would like to convert this into a DataFrame in pyspark - what is the easiest way to do this?

1
  • It's not exactly clear from your question where you're having trouble. Is it the fact that you have so many columns? Or just that records of your RDD are lists of tuples? Commented Apr 7, 2015 at 23:25

4 Answers

32

How about using the toDF method? You only need to add the field names.

df = rdd.toDF(['column', 'value'])
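
For the record layout in the question (each record a list of ('column N', value) tuples), a minimal sketch might first strip each record down to its values and then let toDF attach the names. This assumes a SparkSession/SQLContext already exists (toDF needs one); my_rdd and the generated name list are assumptions:

# Assumption: my_rdd holds records like
# [('column 1', v1), ('column 2', v2), ..., ('column 100', v100)]
column_names = ['column {}'.format(i) for i in range(1, 101)]

# Keep just the values, in order, then attach the field names with toDF
df = my_rdd.map(lambda record: [value for _, value in record]).toDF(column_names)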

2 Comments

This answer works, and the solution I posted below (based on it) converts an RDD as described above into a DataFrame.
What if you don't know the column names, or want to use the columns of some other DataFrame? Related question from me: stackoverflow.com/questions/70882076/…
15

The answer by @dapangmao got me to this solution:

from pyspark.sql import Row

my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF()
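
A self-contained sketch of the same idea, with a toy two-column RDD standing in for the 100-column one. sc is assumed to be an existing SparkContext with a SQLContext/SparkSession set up, and the simplified column names are illustrative:

from pyspark.sql import Row

# Toy records in the same shape as the question's (names simplified)
sample = [
    [('column1', 1), ('column2', 'a')],
    [('column1', 2), ('column2', 'b')],
]
my_rdd = sc.parallelize(sample)

# dict(l) turns each list of (name, value) tuples into {name: value};
# Row(**...) makes a Row with named fields that toDF can infer a schema from
my_df = my_rdd.map(lambda l: Row(**dict(l))).toDF()
my_df.show()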


4

Take a look at the DataFrame documentation to adapt this example to your case, but the following should work. I'm assuming your RDD is called my_rdd.

from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# You have a ton of columns and each one should be an argument to Row
# Use a dictionary comprehension to make this easier
def record_to_row(record):
    # record is a list of ('column N', value) tuples, indexed 0..99;
    # keep the value from each tuple and key it by a generated column name
    schema = {'column{i:d}'.format(i = col_idx + 1): record[col_idx][1] for col_idx in range(100)}
    return Row(**schema)


row_rdd = my_rdd.map(record_to_row)

# Now infer the schema and you have a DataFrame
schema_my_rdd = sqlContext.inferSchema(row_rdd)

# Now you have a DataFrame you can register as a table
schema_my_rdd.registerTempTable("my_table")

I haven't worked much with DataFrames in Spark but this should do the trick
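
If you are on a newer Spark release where SQLContext.inferSchema is no longer available, the same step can be done with createDataFrame; a sketch under that assumption:

# On Spark 1.3+ inferSchema is deprecated (and later removed);
# createDataFrame does the same schema inference from an RDD of Rows
schema_my_rdd = sqlContext.createDataFrame(row_rdd)

# registerTempTable is likewise deprecated there; createOrReplaceTempView is the replacement
schema_my_rdd.createOrReplaceTempView("my_table")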

3 Comments

You might need to add a line after the sqlContext is created to load the implicits library: "import sqlContext.implicits._". See spark.apache.org/docs/1.3.0/sql-programming-guide.html
Isn't that a Scala-only thing? My answer is written in Python.
I got: AttributeError: 'SQLContext' object has no attribute 'inferSchema'
1

In PySpark, let's say you have a DataFrame named userDF.

>>> type(userDF)
<class 'pyspark.sql.dataframe.DataFrame'>

Let's convert it to an RDD:

userRDD = userDF.rdd
>>> type(userRDD)
<class 'pyspark.rdd.RDD'>

Now you can do some manipulation and call, for example, the map function:

newRDD = userRDD.map(lambda x:{"food":x['favorite_food'], "name":x['name']})

Finally, let's create a DataFrame from the resilient distributed dataset (RDD).

newDF = sqlContext.createDataFrame(newRDD, ["food", "name"])

>>> type(newDF)
<class 'pyspark.sql.dataframe.DataFrame'>

That's all.

I was hitting this warning message before, when I tried to call:

newDF = sc.parallelize(newRDD, ["food", "name"])

.../spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py:336: UserWarning: Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead
  warnings.warn("Using RDD of dict to inferSchema is deprecated. ")

So there's no need to do that anymore...
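
As the warning itself suggests, mapping to pyspark.sql.Row instead of a plain dict avoids the deprecated code path; a minimal sketch reusing the names from the answer above:

from pyspark.sql import Row

# Build Rows instead of dicts so createDataFrame does not fall back to
# the deprecated dict-based schema inference
newRDD = userRDD.map(lambda x: Row(food=x['favorite_food'], name=x['name']))
newDF = sqlContext.createDataFrame(newRDD)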

1 Comment

What should you do if each row has plenty of columns, and each row potentially differs in definition?
