I need help in PySpark. I am streaming JSON data from Kafka and need to convert it into a DataFrame in PySpark. To consume the stream I have used the code below.
from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark

def convert_Json2DF(time, rdd):
    nf = gspark.read.json(rdd)
    nf.toDF().show()
    # Convert RDD[String] to RDD[Row] to DataFrame
    #rowRdd = rdd.map(lambda w: Row(word=w))
    #wordsDataFrame = gspark.createDataFrame(rowRdd)
    #pdf = wordsDataFrame.toDF()
    #pdf.show()

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    SQLContext = SQLContext(sc)
    ssc = StreamingContext(sc, 15)

    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda (key, value): json.loads(value))
    lines.pprint()
    lines.foreachRDD(convert_Json2DF)
    ssc.start()
    ssc.awaitTermination()
With the above code I am unable to convert the JSON data into a DataFrame. Can anyone point out where I need to make changes, in the convert_Json2DF function or in the main function?
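For reference, this is roughly the per-batch conversion I am aiming for. It is only a minimal, untested sketch assuming each Kafka message value is a single JSON string; the name json_rdd_to_df and the idea of handing the raw string RDD straight to spark.read.json (instead of calling json.loads first) are my own assumptions:

def json_rdd_to_df(time, rdd):
    # skip empty micro-batches
    if not rdd.isEmpty():
        # spark.read.json can take an RDD of JSON strings and infer the schema
        df = gspark.read.json(rdd)
        df.show()

# keep the raw JSON strings rather than the dicts produced by json.loads
raw_lines = kvs.map(lambda kv: kv[1])
raw_lines.foreachRDD(json_rdd_to_df)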
Thanks,
Bala