2

I have the following PySpark Input Dataframe:

+-------+------------+
| index | valuelist  |
+-------+------------+
| 1.0   | [10,20,30] |
| 2.0   | [11,21,31] |
| 0.0   | [14,12,15] |
+-------+------------+

Where:

  • Index: type Double
  • Valuelist: type Vector. (it's NOT Array)

From the above Input Dataframe, I want to get the following Output Dataframe in PySpark

+-------+-------+
| index | value |
+-------+-------+
| 1.0   | 20    |
| 2.0   | 31    |
| 0.0   | 14    |
+-------+-------+

Logic:

for each row:
  value = valuelist[index] 
0

2 Answers 2

1

Spark version 1.5 and higher

You can use pyspark.sql.functions.expr to pass a column value as an input to a function:

df.select("index", f.expr("valuelist[CAST(index AS integer)]").alias("value")).show()
#+-----+-----+
#|index|value|
#+-----+-----+
#|  1.0|   20|
#|  2.0|   31|
#|  0.0|   14|
#+-----+-----+

Spark version 2.1 and higher

If you have spark version 2.1 or higher, here's an alternative using pyspark.sql.functions.posexplode:

import pyspark.sql.functions as f

df.select("index", f.posexplode("valuelist").alias("pos", "value"))\
    .where(f.col("index").cast("int") == f.col("pos"))\
    .select("index", "value")\
    .show()
#+-----+-----+
#|index|value|
#+-----+-----+
#|  1.0|   20|
#|  2.0|   31|
#|  0.0|   14|
#+-----+-----+
Sign up to request clarification or add additional context in comments.

Comments

0

You can create a new column and pass these two columns as an input.

from pyspark.sql import functions as F
columns = ['index', 'valuelist']
vals = [
     (0, [1,2]),
     (1, [1,2])
]

df = sqlContext.createDataFrame(vals, columns)
df = df.withColumn(
"value", udf(lambda index_and_list: index_and_list[0][index_and_list[1]], IntegerType())(
    F.struct(F.col("valuelist"), F.col("index")))
    )

Got the following output:

> +-----+---------+-----+
|index|valuelist|value|
+-----+---------+-----+
|    0|   [1, 2]|    1|
|    1|   [1, 2]|    2|
+-----+---------+-----+

3 Comments

This will cause an error in the OP's code because their index column is not an integer, however that's easy to fix by adding a cast. Regardless, I think your udf is overly complicated/hard to understand- if you have to use one here, I'd go with something like df = df.withColumn("value", udf(lambda valuelist, index: valuelist[index], IntegerType())(F.col("valuelist"), F.col("index").cast("int"))) instead
giving error name 'udf' is not defined. Any idea?
from pyspark.sql.functions import udf

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.