0

I need help in pyspark. I am streaming json data from kafka i need convert the as Dataframe in pyspark. To stream I have used the below code.

from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time,rdd):
    nf = gspark.read.json(rdd)
    nf.toDF().show()
    # Convert RDD[String] to RDD[Row] to DataFrame
    #rowRdd = rdd.map(lambda w: Row(word=w))
    #wordsDataFrame = gspark.createDataFrame(rowRdd)
    #pdf = wordsDataFrame.toDF()
    #pdf.show()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    SQLContext= SQLContext(sc)
    ssc = StreamingContext(sc, 15)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda (key,value): json.loads(value))
    lines.pprint()
    lines.foreachRDD(Json2DF)
ssc.start()
ssc.awaitTermination()

For the above code i unable to convert the json data into Dataframe. can anyone correct me where i need do the changes,does in Json2DF function or main function.

thanks bala

1 Answer 1

1

Fist of all, ensuring all json datas have the same schema.

def check_json(js, col):
    try:
        data = json.loads(js)
        return [data.get(i) for i in col]
    except:
        return []


def convert_json2df(rdd, col):
    ss = SparkSession(rdd.context)
    if rdd.isEmpty():
        return
    df = ss.createDataFrame(rdd, schema=StructType("based on 'col'"))
    df.show()


cols = ['idx', 'name']

sc = SparkContext()
ssc = StreamingContext(sc, 5)

lines = ssc.socketTextStream('localhost', 9999) \
    .map(lambda x: check_json(x, cols)) \
    .filter(lambda x: x) \
    .foreachRDD(lambda x: convert_json2df(x, cols))

ssc.start()
ssc.awaitTermination()
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.