
I'm trying to convert an RDD to a DataFrame in Spark. The RDD was made by parallelizing a list of integers, and I get stuck when converting it to a DataFrame. It returns "TypeError: StructType can not accept object 60651 in type <class 'int'>".

Here is the relevant code:

from pyspark.sql.types import StructType, StructField, IntegerType

# Create a schema for the dataframe
schema = StructType([StructField('zipcd', IntegerType(), True)])

# Convert list to RDD
rdd = sc.parallelize(zip_cd) # attempted fix: wrap the list in [], but then the error 'length does not match: 29275 against 1' arises
#rdd = rdd.map(lambda x: int(x))

# Create data frame
zip_cd1 = spark.createDataFrame(rdd, schema)
#print(zip_cd1.schema)
zip_cd1.show()

It returns the following traceback:

   Py4JJavaError                             Traceback (most recent call last)
<ipython-input-59-13ef33f842e4> in <module>
      9 zip_cd1 = spark.createDataFrame(rdd,schema)
     10 #print(zip_cd1.schema)
---> 11 zip_cd1.show()

~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    482         """
    483         if isinstance(truncate, bool) and truncate:
--> 484             print(self._jdf.showString(n, 20, vertical))
    485         else:
    486             print(self._jdf.showString(n, int(truncate), vertical))

~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
   1307 
   1308         answer = self.gateway_client.send_command(command)
-> 1309         return_value = get_return_value(
   1310             answer, self.gateway_client, self.target_id, self.name)
   1311 

~\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
    109     def deco(*a, **kw):
    110         try:
--> 111             return f(*a, **kw)
    112         except py4j.protocol.Py4JJavaError as e:
    113             converted = convert_exception(e.java_exception)

~\Anaconda3\envs\pyspark_env\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o900.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 1240) (MTYCURB-HOLAP.ACS-JRZ.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\session.py", line 682, in prepare
    verify_func(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1409, in verify
    verify_value(obj)
  File "C:\Users\52269198\Anaconda3\envs\pyspark_env\lib\site-packages\pyspark\sql\types.py", line 1396, in verify_struct
    raise TypeError(new_msg("StructType can not accept object %r in type %s"
TypeError: StructType can not accept object 60651 in type <class 'int'>

zip_cd is just a list of integers; I don't know why it causes so much trouble:

zip_cd 

[60651,
 60623,
 60077,
 60626,
 60077,
 0,
 60651,
 60644,

2 Answers


Your schema expects the input to be a collection with shape (n, 1), not (1, n): each element of the RDD has to be a row (a list or tuple), not a bare integer. Wrap each element in a list before creating the DataFrame:

from pyspark.sql.types import StructType, StructField, IntegerType

zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]

schema = StructType([StructField('zipcd', IntegerType(), True)])
rdd = sc.parallelize(zip_cd)
rdd = rdd.map(lambda x: [x]) # wrap each integer in a list so every element is a one-column row

zip_cd1 = spark.createDataFrame(rdd, schema)
# zip_cd1 = spark.createDataFrame([[x] for x in zip_cd], schema) # or build the DataFrame from the list directly
zip_cd1.show()

Result:

+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
|60651|
|60644|
+-----+
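
As a side note, if you don't need the column name up front, createDataFrame also accepts an atomic DataType as the schema, in which case each bare integer is treated as a one-column row. This is a minimal sketch assuming the same spark session as above; the default column name value is PySpark's convention:

from pyspark.sql.types import IntegerType

zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]

# With an atomic type as the schema, each element becomes a single-column row;
# PySpark names the column 'value' by default, so rename it afterwards.
zip_cd1 = spark.createDataFrame(zip_cd, IntegerType()).withColumnRenamed('value', 'zipcd')
zip_cd1.show()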

2 Comments

Thank you, it worked. So the solution was to wrap each element of the list in its own list? I had tried wrapping the whole list in one outer list with "rdd = sc.parallelize([zip_cd])", but that returned an error about indexes not matching. With your rdd = rdd.map(lambda x: [x]) the collection is transformed into shape (n, 1), instead of what I was doing.
@ArnoldoOliva Consider marking AdibP's answer as accepted. For your second question, please create a new question.

The createDataFrame function expects a list of lists, where each sublist represents a row:

zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
reformatted_ = map(lambda x: [x], zip_cd) # wrap each integer so it becomes a one-column row
zip_cd1 = spark.createDataFrame(reformatted_, schema='zipcd int')

This yields the desired Spark DataFrame:

+-----+
|zipcd|
+-----+
|60651|
|60623|
|60077|
|60626|
|60077|
|    0|
|60651|
|60644|
+-----+
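
For reference, the DDL-style schema string 'zipcd int' used above is shorthand for the explicit StructType from the question. A sketch assuming the same SparkSession (DDL schema strings produce nullable fields by default, matching the True in the StructField):

from pyspark.sql.types import StructType, StructField, IntegerType

# 'zipcd int' as a schema string is equivalent to this explicit StructType
schema = StructType([StructField('zipcd', IntegerType(), True)])

zip_cd = [60651, 60623, 60077, 60626, 60077, 0, 60651, 60644]
zip_cd1 = spark.createDataFrame([[x] for x in zip_cd], schema)
zip_cd1.show()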

