
So I am doing one-hot encoding in a pipeline and calling the fit method on it.

I have a DataFrame with both categorical and numerical columns, so I one-hot encoded the categorical columns after indexing them with string indexers.

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler


categoricalColumns = ['IncomeDetails','B2C','Gender','Occupation','POA_Status']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
    
    
label_stringIdx = StringIndexer(inputCol = 'target', outputCol = 'label')
stages += [label_stringIdx]


#new_col_array.remove("client_id")

numericCols = new_col_array
numericCols.append('age')


assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(new_df1)
new_df1 = pipelineModel.transform(new_df1)
selectedCols = ['label', 'features'] + cols

I am getting this error:

Py4JJavaError: An error occurred while calling o2053.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(client_id#*****, 200)
+- *(4) HashAggregate(keys=[client_id#*****], functions=[], output=[client_id#*****])
   +- Exchange hashpartitioning(client_id#*****, 200)
      +- *(3) HashAggregate(keys=[client_id#*****], functions=[], output=[client_id#*****])
         +- *(3) HashAggregate(keys=[client_id#*****, event_name#27993], functions=[], output=[client_id#27980])
            +- Exchange hashpartitioning(client_id#*****, event_name#27993, 200)
               +- *(2) HashAggregate(keys=[client_id#*****, event_name#27993], functions=[], output=[client_id#*****, event_name#27993])
                  +- *(2) Project [client_id#*****, event_name#27993]
                     +- *(2) BroadcastHashJoin [client_id#*****], [Party_Code#*****], LeftSemi, BuildRight, false
                        :- *(2) Project [client_id#*****, event_name#27993]
                        :  +- *(2) Filter isnotnull(client_id#*****)
                        :     +- *(2) FileScan orc dbo.dp_clickstream_[client_id#*****,event_name#27993,dt#28010] Batched: true, Format: ORC, Location: **PrunedInMemoryFileIndex**[s3n://processed/db-dbo-..., PartitionCount: 6, PartitionFilters: [isnotnull(dt#28010), (cast(dt#28010 as timestamp) >= 1610409600000000), (cast(dt#28010 as timest..., PushedFilters: [IsNotNull(client_id)], ReadSchema: struct<client_id:string,event_name:string>
                        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false)


at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)


Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:

My Spark version is 2.4.3

Comments:

  • See this: stackoverflow.com/questions/46930591/… Commented Jan 22, 2021 at 9:50
  • @blackbishop, thanks. Do you have good resources/references for PySpark ML? Commented Jan 24, 2021 at 5:45
