
I am trying to normalize columns in a Spark DataFrame using Python.

My dataset:

------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 12560   | 45
2      | B    | 2312890 | 90
.      | .    | .       | .
------------------------------------

In this dataset, I need to normalize the Revenue and No.of.Days columns; userID and Name should stay unchanged.

The output should look like this:

------------------------------------
userID | Name | Revenue | No.of.Days
------------------------------------
1      | A    | 0.5     | 0.5
2      | B    | 0.9     | 1
.      | .    | 1       | 0.4
.      | .    | 0.6     | .
------------------------------------

The formula used to normalize the values in each column is

val = (e_i - min) / (max - min)

where
e_i = column value at the i-th position
min = minimum value in that column
max = maximum value in that column

How can I do this in easy steps using PySpark?
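
For context, the formula above maps directly onto DataFrame aggregations and column arithmetic. A minimal sketch without MLlib, assuming a DataFrame df whose columns use underscores instead of dots (as in the answer below):

from pyspark.sql import functions as F

# Min-max normalize each target column: (e_i - min) / (max - min)
for c in ["Revenue", "No_of_Days"]:
    stats = df.agg(F.min(c).alias("mn"), F.max(c).alias("mx")).first()
    df = df.withColumn(c, (F.col(c) - stats["mn"]) / (stats["mx"] - stats["mn"]))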

2 Comments
  • Please add a simple example of input and expected output. Commented Oct 31, 2016 at 7:00
  • @JackDaniel Did you find a solution? I am facing a similar problem. Commented Jun 5, 2017 at 23:02

1 Answer


I hope the following code meets your requirement.

Code :

# Sample data modeled on the question's dataset
df = spark.createDataFrame([ (1, 'A', 12560,  45),
                             (2, 'B', 42560,  90),
                             (3, 'C', 31285, 120),
                             (4, 'D', 10345, 150)
                           ], ["userID", "Name", "Revenue", "No_of_Days"])

print("Before Scaling :")
df.show(5)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF to convert the single-element vector column back to a double,
# rounded to 3 decimal places
unlist = udf(lambda x: round(float(list(x)[0]), 3), DoubleType())

# Iterating over columns to be scaled
for i in ["Revenue","No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

print("After Scaling :")
df.show(5)

Output:

Before Scaling :
+------+----+-------+----------+
|userID|Name|Revenue|No_of_Days|
+------+----+-------+----------+
|     1|   A|  12560|        45|
|     2|   B|  42560|        90|
|     3|   C|  31285|       120|
|     4|   D|  10345|       150|
+------+----+-------+----------+

After Scaling :
+------+----+-------+----------+--------------+-----------------+
|userID|Name|Revenue|No_of_Days|Revenue_Scaled|No_of_Days_Scaled|
+------+----+-------+----------+--------------+-----------------+
|     1|   A|  12560|        45|         0.069|              0.0|
|     2|   B|  42560|        90|           1.0|            0.429|
|     3|   C|  31285|       120|          0.65|            0.714|
|     4|   D|  10345|       150|           0.0|              1.0|
+------+----+-------+----------+--------------+-----------------+
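
As a side note, on Spark 3.0+ the vector-to-double conversion no longer needs a Python UDF: pyspark.ml.functions.vector_to_array does it natively. A sketch of the last line of the loop rewritten with it (same variable names as above):

from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array  # Spark 3.0+

# Take the first (only) element of the scaled vector and round it,
# replacing the unlist UDF
df = pipeline.fit(df).transform(df) \
    .withColumn(i + "_Scaled", F.round(vector_to_array(F.col(i + "_Scaled"))[0], 3)) \
    .drop(i + "_Vect")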


2 Comments

What about normalizing all the columns of a DataFrame, maybe thousands of columns? It seems that iterating over the columns to be scaled is still slow.
Awesome answer. But for anyone using KMeans() after this scaling: for some odd reason, it would throw an error if I didn't leave the data types as vectors. Using StandardScaler() + VectorAssembler() + KMeans() needed vector types. Even though VectorAssembler converts to a vector, I kept getting a prompt that I had NA/null values in my feature vector when I did float -> vector instead of vector -> vector.
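
On the two comments above: for wide DataFrames, a faster pattern is to assemble all columns into one vector and fit a single scaler instead of looping, and the resulting vector column can feed straight into KMeans with no vector-to-double conversion. A minimal sketch under those assumptions (column list and output names are hypothetical):

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

cols = ["Revenue", "No_of_Days"]  # in practice, possibly thousands of columns

# One assembler + one scaler, fitted in a single pass over the data
assembler = VectorAssembler(inputCols=cols, outputCol="features")
scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")
scaled = Pipeline(stages=[assembler, scaler]).fit(df).transform(df)

# The vector column is usable directly by estimators such as KMeans
kmeans = KMeans(featuresCol="features_scaled", k=2)
model = kmeans.fit(scaled)

If individual double columns are still needed afterwards, vector_to_array (shown above) can split features_scaled back out.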
