
I am using the program below, running it in Anaconda (Spyder), to create a data pipeline from Kafka to Spark Streaming in Python:

import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os


# Step 1: initialize the SparkContext
spark_context = SparkContext(appName="Transformation Application")

# Step 2: initialize the StreamingContext with a 5-second batch interval
ssc = StreamingContext(spark_context, 5)

def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')

message = KafkaUtils.createDirectStream(
    ssc,
    topics=['testtopic'],
    kafkaParams={
        "metadata.broker.list": "localhost:9092",
        "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
        "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
    },
    fromOffsets=None,
    messageHandler=None,
    keyDecoder=utf8_decoder,
    valueDecoder=utf8_decoder,
)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
wordcount.pprint()

When I print message, words, and wordcount, I don't get proper results; I get hexadecimal values instead:

message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>

wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>

In my topic (testtopic) I produced the string "Hi Hi Hi how are you doing", so wordcount should give a count for each word, but it is giving some encoded hexadecimal values instead.
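For comparison, the counts expected for that sentence can be computed in plain Python; this is what wordcount.pprint() should show for the batch containing that message (illustration only, not Spark code):

```python
# Word count for the produced sentence, computed locally with
# collections.Counter to show the expected per-batch result.
from collections import Counter

sentence = "Hi Hi Hi how are you doing"
wordcount = Counter(sentence.split(" "))
print(wordcount)  # Counter({'Hi': 3, 'how': 1, 'are': 1, 'you': 1, 'doing': 1})
```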

  • What did you expect the line with just message to do? You're printing the Python object, not consuming the stream... Also, Spark has its own JSON functions; you shouldn't need to (try to) import Spring serializers Commented Nov 20, 2020 at 15:37
  • @OneCricketeer Sorry, I am new to both Spark and Kafka. So the message output is fine, but printing wordcount and words also gives TransformedDStream objects, while wordcount should give counts of all words, like 3 for "Hi" and 1 for each of the rest, for the string "Hi Hi Hi how are you doing" produced to my topic (testtopic). Also, if I have to create a data pipeline from Kafka > Spark Streaming > MySQL DB, how can I make sure the data in the Kafka topic is available in Spark, and where can I see that data in Spark? Please help guide me on this. Commented Nov 21, 2020 at 12:35
  • My point is that you're printing an object, which is a Python "problem", unrelated to spark or Kafka... wordcount.pprint() is correct if you want to actually see the data Commented Nov 21, 2020 at 16:31
  • @OneCricketeer Thanks! Just one last question: can we convert message (the Kafka direct stream) into a Spark DataFrame? I have to store my streaming records in MySQL, and since a Spark DataFrame can be written to a MySQL DB, that's why I'm asking. Commented Nov 22, 2020 at 20:57
  • You should use Structured Streaming if you want to do that. Alternatively, Kafka Connect ships with Kafka, and you can use it to write to MySQL as well Commented Nov 23, 2020 at 1:29
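The Structured Streaming approach suggested in the last comment could look roughly like the sketch below. It reuses the question's assumptions (broker at localhost:9092, topic testtopic) and would additionally need the spark-sql-kafka connector package on the Spark classpath; the run block is guarded so it only executes in a real Spark environment.

```python
# Sketch: Structured Streaming word count over the 'testtopic' Kafka topic.
# Assumes Kafka at localhost:9092 and the spark-sql-kafka-0-10 connector
# available (e.g. via spark-submit --packages). Not tested against a live broker.
def build_wordcount_query(spark):
    """Build a streaming word count that prints each micro-batch to the console."""
    from pyspark.sql.functions import explode, split

    lines = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "testtopic")
        .load()
        .selectExpr("CAST(value AS STRING) AS line")  # Kafka values arrive as bytes
    )
    words = lines.select(explode(split(lines.line, " ")).alias("word"))
    counts = words.groupBy("word").count()
    # For the Kafka > Spark > MySQL pipeline, replace the console sink with
    # foreachBatch and a JDBC write of each micro-batch DataFrame.
    return counts.writeStream.outputMode("complete").format("console")

RUN_STREAM = False  # flip to True in an environment with Spark and a Kafka broker
if RUN_STREAM:
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("KafkaWordCount").getOrCreate()
    build_wordcount_query(spark).start().awaitTermination()
```

Because each micro-batch is an ordinary DataFrame inside foreachBatch, this also answers the DataFrame question: that is where a JDBC write to MySQL would go.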
