I have an issue similar to many other posted questions regarding PySpark, but none of those solutions seem applicable to my problem, so I'm posting a new question.

I'm following this tutorial https://github.com/emiljdd/Tweepy-SparkTwitterI, but I can't seem to get step 7 in Phase II to work.

Running this code:

count = 0
while count < 10:
    time.sleep(3)
    top_10_tweets = sqlContext.sql('Select tag, count from tweets')
    top_10_df = top_10_tweets.toPandas()  # convert to a pandas DataFrame
    display.clear_output(wait=True)  # clears the output, if a plot exists
    sns.plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="tag", data=top_10_df)
    sns.plt.show()
    count = count + 1

I get the following error:

Py4JJavaError: An error occurred while calling o24.sql.
: org.apache.spark.sql.AnalysisException: Table or view not found: tweets; line 1 pos 23
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:665)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:617)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:647)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:640)...

I can of course post more code if it would help, but I really am just following the tutorial without any changes.

The streaming setup from Phase I seems fine, as I can see the tweets being sent.

Any suggestions?

Thanks!

4 Answers

I guess the DataFrame was never registered as a temp view. You must register the DataFrame under the temp view name "tweets". Sample code:

dataFrame.createOrReplaceTempView("tweets")

Please check that your code contains this call.

Example snippet:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Dataframe example').getOrCreate()

dataframe = spark.read.csv("/path/to/file.csv", inferSchema=True, header=True)
dataframe.show()
dataframe.createOrReplaceTempView("tempview")  # register the view so SQL can find it

spark.sql('select id, name from tempview').show()
spark.sql('select avg(age) from tempview').show()

3 Comments

Can you please add a snippet?
Thanks for replying. I tried that, but it did not work, since my output was a DStream RDD rather than a normal RDD. Do you have any idea how we can create a temp view from a DStream RDD?
You can use Spark Structured Streaming. Please check that: spark.apache.org/docs/latest/…
Answered question on the github issues page for this https://github.com/emiljdd/Tweepy-SparkTwitterI/issues/5

The problem is step 4 of that tutorial, in this block of code:

( lines.flatMap( lambda text: text.split( " " ) )
       .filter( lambda word: word.lower().startswith("#") )
       .map( lambda word: ( word.lower(), 1 ) )
       .reduceByKey( lambda a, b: a + b )
       .map( lambda rec: Tweet( rec[0], rec[1] ) )
       ## .foreachRDD( lambda rdd: rdd.toDF().sort( desc("count") ...  # the desc here doesn't work
       .foreachRDD( lambda rdd: rdd.toDF().sort("count")
                                   .limit(10).registerTempTable("tweets") ) )

Remove the desc in the sort and it should work. The broken lambda inside foreachRDD doesn't throw a visible exception (though it should), so the "tweets" table silently never gets registered.
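One caveat worth noting about that fix, illustrated in plain Python (no Spark needed): sorting by count ascending and then taking the first rows gives the *least* frequent tags, so a true "top 10" needs a descending sort; dropping desc only avoids the error:

```python
# Ascending vs descending sort for a "top N" over (tag, count) pairs.
counts = [("#spark", 42), ("#python", 7), ("#data", 19)]

ascending = sorted(counts, key=lambda rec: rec[1])[:2]                 # least frequent first
descending = sorted(counts, key=lambda rec: rec[1], reverse=True)[:2]  # true top 2

print(ascending)   # [('#python', 7), ('#data', 19)]
print(descending)  # [('#spark', 42), ('#data', 19)]
```

In PySpark the equivalent descending sort is `sort("count", ascending=False)`, or `sort(desc("count"))` after importing `desc` from `pyspark.sql.functions`.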

0

The issue could be:

1. Your Spark default is YARN; change it to local mode.

2. There is only one executor working, which is receiving messages, but no executor is processing them.

Solution: open your Jupyter notebook with the command below (ensure that pyspark2 opens in Jupyter by default):

[cloudera@quickstart ~]$ pyspark2 --master local[2]

This should solve your problem.

The issue could be that your Spark version is not compatible with your SQLContext code.

I have made some changes to the code and got it working.

Step 1: Import SparkSession from pyspark.sql:

from pyspark.sql import SparkSession,SQLContext

Step 2: Create a Spark SQL session:

spark = SparkSession.builder.appName('Sparksql').getOrCreate()

Step 3: Use createOrReplaceTempView instead of registerTempTable: .limit(10).createOrReplaceTempView("tweets")

Step 4: Check that the SQL works, as below:

top_10_tweets = spark.sql( 'Select tag, count from tweets' )
top_10_tweets.show()

Please let us know how you get on with it.
