I am using the program below, running it in Anaconda (Spyder), to create a data pipeline from Kafka to Spark Streaming in Python.
import sys
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from uuid import uuid1
import os
## Step 1: Initialize the SparkContext
spark_context = SparkContext(appName="Transformation Application")
## Step 2: Initialize the StreamingContext
ssc = StreamingContext(spark_context, 5)
def utf8_decoder(s):
    """Decode bytes as UTF-8."""
    if s is None:
        return None
    return s.decode('utf-8')
message = KafkaUtils.createDirectStream(
    ssc,
    topics=['testtopic'],
    kafkaParams={
        "metadata.broker.list": "localhost:9092",
        "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer",
        "value.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
    },
    fromOffsets=None,
    messageHandler=None,
    keyDecoder=utf8_decoder,
    valueDecoder=utf8_decoder)
message
words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
wordcount.pprint()
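For reference, the map/flatMap/reduceByKey chain above is a standard word count. The same logic in plain Python (a sketch using a dict instead of Spark, with a made-up sample sentence) looks like this:

```python
# Plain-Python equivalent of the DStream word count above
# (illustration only; the real job applies these steps to each RDD batch).
sentence = "Hi Hi Hi how are you doing"  # sample message value

words = sentence.split(" ")              # flatMap step: message -> words
counts = {}
for w in words:                          # map + reduceByKey: (word, 1) summed per key
    counts[w] = counts.get(w, 0) + 1

print(counts)
```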
When I print message, words, or wordcount I get no proper results, only hexadecimal values.
message
Out[16]: <pyspark.streaming.kafka.KafkaDStream at 0x23f8b1f8248>
wordcount
Out[18]: <pyspark.streaming.dstream.TransformedDStream at 0x23f8b2324c8>
In my topic (testtopic) I produced the string "Hi Hi Hi how are you doing", so wordcount should give a count for each word, but it is giving some encoded hexadecimal values.
What are you expecting message to do? You're printing the Python object, not consuming the stream: a DStream's repr is just its type and memory address, which is where those "hexadecimal values" come from, and nothing is processed until you call ssc.start(). Also, Spark has its own JSON functions, so you shouldn't need to (try to) import Spring serializers. wordcount.pprint() is correct if you want to actually see the data.
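Concretely, the script in the question builds the DStream graph but never starts it. A minimal sketch of a corrected tail (assuming the same ssc, utf8_decoder, and KafkaUtils from the question, a broker on localhost:9092, and dropping the Spring deserializers, since the Python decoders already handle UTF-8) might look like:

```python
# Corrected tail of the script: plain Kafka params, then start the stream.
message = KafkaUtils.createDirectStream(
    ssc,
    topics=['testtopic'],
    kafkaParams={"metadata.broker.list": "localhost:9092"},
    keyDecoder=utf8_decoder,
    valueDecoder=utf8_decoder)

words = message.map(lambda x: x[1]).flatMap(lambda x: x.split(" "))
wordcount = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)
wordcount.pprint()      # prints each batch's counts once the stream runs

ssc.start()             # nothing executes until start() is called
ssc.awaitTermination()  # block while the stream runs
```

This fragment only runs against a live Kafka broker, so it is shown as a sketch rather than a standalone script.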