
I have one DF with a list of users:

+-------+---+
|   user| Id|
+-------+---+
|  Peter|  1|
|    Max|  2|
|  Steve|  3|
|  Chris|  4|
+-------+---+

and another DF with information about which user visited which page:

+-----------+-------+
|       page|   user|
+-----------+-------+
|      Page1|  Peter|
|      Page1|    Max|
|      Page2|    Max|
|      Page3|  Steve|
|      Page3|  Chris|
|      Page4|  Chris|
+-----------+-------+

I did an inner join on the user field:

+-----------+-------+---+
|       page|   user| Id|
+-----------+-------+---+
|      Page1|  Peter|  1|
|      Page1|    Max|  2|
|      Page2|    Max|  2|
|      Page3|  Steve|  3|
|      Page3|  Chris|  4|
|      Page4|  Chris|  4|
+-----------+-------+---+

Now I want to create a SparseVector for each page from this DF, which would look like this:

+-------+----------------------------------------------------+
|   page|                                            features|
+-------+----------------------------------------------------+
|  Page1|SparseVector({Peter: 1.0, Max: 1.0, Steve: 0.0,...})|
|  Page2|SparseVector({Peter: 0.0, Max: 1.0, Steve: 0.0,...})|
|  Page3|SparseVector({Peter: 0.0, Max: 0.0, Steve: 1.0,...})|
|  Page4|SparseVector({Peter: 0.0, Max: 0.0, Steve: 0.0,...})|
+-------+----------------------------------------------------+

where "1.0" is set if the user visited the page and "0.0" if the user did not. Here is my code:

from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")
spark = SparkSession.builder.getOrCreate()  # a SparkSession is required for rdd.toDF()

# the joined page/user data from above
df = sc.parallelize([Row(PAGE="Page1", USER="Peter", USER_ID=1),
                     Row(PAGE="Page1", USER="Max", USER_ID=2),
                     Row(PAGE="Page2", USER="Max", USER_ID=2),
                     Row(PAGE="Page3", USER="Steve", USER_ID=3),
                     Row(PAGE="Page3", USER="Chris", USER_ID=4),
                     Row(PAGE="Page4", USER="Chris", USER_ID=4)]).toDF()


# pivot: one column per user, holding the visit count (1) for that page, nulls filled with 0
dfpivot = df.groupBy("PAGE").pivot("USER").count().na.fill(0)
dfpivot.show()

# every user column except the PAGE key goes into the feature vector
input_cols = [x for x in dfpivot.columns if x != "PAGE"]

dfassembler1 = (VectorAssembler(inputCols=input_cols, outputCol="features")
                .transform(dfpivot)
                .select("PAGE", "features"))
dfassembler1.show()

# MinHashLSH treats non-zero entries as set membership, so the 0/1 user columns fit
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(dfassembler1)

model.transform(dfassembler1).show(3, False)

but I'm getting an exception, "cannot resolve column name ...", and an error while calling transform.

Maybe I'm doing something wrong. I'd appreciate any help.

  • Your code works fine when I run it. Can you include all imports and other parts as well? Commented Oct 4, 2018 at 13:30
  • Included imports. Commented Oct 4, 2018 at 14:28
  • Can you include the full error trace? Commented Oct 4, 2018 at 14:31
  • `pyspark.sql.utils.AnalysisException: 'The pivot column author has more than 10000 distinct values, this could indicate an error. If this was intended, set spark.sql.pivotMaxValues to at least the number of distinct values of the pivot column.;'` Commented Oct 4, 2018 at 15:10

2 Answers


The answer is in the error message: set spark.sql.pivotMaxValues in the configuration to at least the number of distinct values you expect in the pivot column.

spark.conf.set('spark.sql.pivotMaxValues', '999999')
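
For example, a minimal sketch of setting it when the session is created, assuming a SparkSession.builder setup along the lines of the question (the limit has to be in place before the .pivot() is executed):

from pyspark.sql import SparkSession

# raise the pivot limit at session-build time, before groupBy(...).pivot(...) runs
spark = (SparkSession.builder
         .master("local")
         .appName("Simple App")
         .config("spark.sql.pivotMaxValues", "999999")
         .getOrCreate())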



This is the code that worked for me:

spark.conf.set("spark.sql.pivotMaxValues", 9000000)
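
If it helps, a short sketch of where the call fits relative to the pipeline in the question (assuming spark and df exist as above; the setting must be applied before the pivot is executed):

spark.conf.set("spark.sql.pivotMaxValues", 9000000)
print(spark.conf.get("spark.sql.pivotMaxValues"))  # confirm the new limit

# the wide pivot (one column per distinct user) may now exceed the default 10000 columns
dfpivot = df.groupBy("PAGE").pivot("USER").count().na.fill(0)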

