
I am trying to run a Random Forest classifier and evaluate the model using cross-validation. I work with PySpark. The input CSV file is loaded as a Spark DataFrame. But I face an issue while constructing the model.

Below is the code.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
sc = SparkContext()
sqlContext = SQLContext(sc)
trainingData = (sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load("/PATH/CSVFile"))
numFolds = 10 
rf = RandomForestClassifier(numTrees=100, maxDepth=5, maxBins=5, labelCol="V5409", featuresCol="features", seed=42)
evaluator = MulticlassClassificationEvaluator().setLabelCol("V5409").setPredictionCol("prediction").setMetricName("accuracy")
paramGrid = ParamGridBuilder().build()

pipeline = Pipeline(stages=[rf])
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)
model = crossval.fit(trainingData)
print accuracy

I am getting the error below:

Traceback (most recent call last):
  File "SparkDF.py", line 41, in <module>
    model = crossval.fit(trainingData)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/tuning.py", line 236, in _fit
    model = est.fit(train, epm[j])
  File "/usr/local/spark-2.1.1/python/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/pipeline.py", line 108, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/base.py", line 64, in fit
    return self._fit(dataset)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/wrapper.py", line 236, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark-2.1.1/python/pyspark/ml/wrapper.py", line 233, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/home/hadoopuser/anaconda2/lib/python2.7/site-packages/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/spark-2.1.1/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'Field "features" does not exist.'

Please help me solve this issue in PySpark. Thank you.

I am showing the details of the dataset here. No, I don't have a features column specifically. Below is the output of trainingData.take(5), which displays the first 5 rows of the dataset.

[Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)]

where V4366 to V524 are features and V5409 is the class label.


2 Answers


Spark DataFrames are not used like that in Spark ML; all your features need to be vectors in a single column, usually named features. Here is how you can do it, using the 5 rows you have provided as an example:

spark.version
# u'2.2.0'

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# your sample data:
temp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])

trainingData = temp_df.rdd.map(lambda x: (Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
trainingData.show()
# +--------------------+-----+ 
# |            features|label|
# +--------------------+-----+
# |[-0.104,0.005,-0....|    0| 
# |[-0.137,0.001,-0....|    0|
# |[-0.155,-0.006,-0...|    0|
# |[-0.108,0.005,-0....|    0|
# |[-0.139,0.003,-0....|    0|
# +--------------------+-----+

After that, your pipeline should run fine (I am assuming that you do indeed have multi-class classification, since your sample contains only 0's as labels), provided you change the label column in your rf and evaluator as follows:

rf = RandomForestClassifier(numTrees=100, maxDepth=5, maxBins=5, labelCol="label", featuresCol="features", seed=42)
evaluator = MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("accuracy")

Finally, print accuracy will not work - you'll need model.avgMetrics instead.
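
For example, a minimal sketch of inspecting the fitted result, assuming the model variable returned by crossval.fit in your code (with an empty param grid, avgMetrics holds a single entry):

print(model.avgMetrics)      # average cross-validated accuracy, one entry per param-grid combination
bestModel = model.bestModel  # the best pipeline, refit on the whole training set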


Comments

When I give spark.createDataFrame() as mentioned above, it shows NameError: name 'spark' is not defined. How do I solve this? Thank you for this answer, it was quite useful.
@LokeswariVenkataramana Probably you are using an older version of Spark (1.x). You don't need that command: simply read your initial CSV file as you do in your code, but into a DataFrame named temp_df, and then proceed to define trainingData as I show.
Adding the lines below solved the NameError: name 'spark' is not defined: from pyspark.context import SparkContext; from pyspark.sql.session import SparkSession; sc = SparkContext('local'); spark = SparkSession(sc)
I am using Spark 2.2.0. I am getting an error in the line model = crossval.fit(data). The error is as follows: raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Column label must be of type NumericType but was actually of type StringType.' How do I change string type to numeric in a Spark DataFrame?
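
A minimal sketch of one way to do that, assuming the label column is named V5409 and the DataFrame is temp_df as above; either cast the column to a numeric type, or index it:

from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer

# option 1: cast the string label column to a numeric type in place
temp_df = temp_df.withColumn("V5409", col("V5409").cast("double"))

# option 2: StringIndexer maps string labels to numeric indices
# (it can also be inserted as a stage at the front of the pipeline)
indexer = StringIndexer(inputCol="V5409", outputCol="label")
temp_df = indexer.fit(temp_df).transform(temp_df)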

I would like to add my 5 cents to desertnaut's answer: as of now (Spark 2.2.0) there is a quite handy VectorAssembler class which handles the transformation of multiple columns into one vector column. The code then looks like this:

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# your sample data:
temp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])

assembler = VectorAssembler(
    inputCols=['V4366', 'V4460', 'V4916', 'V1495', 'V1639', 'V1967', 'V3049', 'V3746', 'V3869', 'V524'],
    outputCol='features')

trainingData = assembler.transform(temp_df)
trainingData.show()
# +------+------+------+------+------+------+-----+------+------+-----+-----+--------------------+
# | V1495| V1639| V1967| V3049| V3746| V3869|V4366| V4460| V4916| V524|V5409|            features|
# +------+------+------+------+------+------+-----+------+------+-----+-----+--------------------+
# |-0.104| 0.005|-0.008| 0.177|-0.675|-3.451|  0.0| 0.232|-0.017|0.004|    0|[0.0,0.232,-0.017...|
# |-0.137| 0.001| -0.01|  0.01|-0.867|-2.759|  0.0| 0.111|-0.003|  0.0|    0|[0.0,0.111,-0.003...|
# |-0.155|-0.006|-0.019|-0.706| 0.166| 0.189|  0.0|-0.391|-0.003|0.001|    0|[0.0,-0.391,-0.00...|
# |-0.108| 0.005|-0.002| 0.033|-0.787|-0.926|  0.0| 0.098|-0.012|0.002|    0|[0.0,0.098,-0.012...|
# |-0.139| 0.003|-0.006|-0.045|-0.208|-0.782|  0.0| 0.026|-0.004|0.001|    0|[0.0,0.026,-0.004...|
# +------+------+------+------+------+------+-----+------+------+-----+-----+--------------------+

This way it can easily be integrated as a processing step in the pipeline. Another important difference here is that the new features column is appended to the data frame.
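
For instance, a sketch of wiring it into the cross-validation from the question, reusing rf, evaluator, paramGrid and numFolds as defined there (since the label column keeps its original name here, the question's labelCol="V5409" setting already matches):

from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator

# the assembler becomes the first stage, so the raw data frame with its
# separate feature columns can be passed to the cross-validator directly
pipeline = Pipeline(stages=[assembler, rf])
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)
model = crossval.fit(temp_df)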

