
I am parsing logs sent from Kafka into Spark Streaming. I have a function that turns a single log into a dictionary so that I can upload it to MongoDB. However, it keeps prompting me with an error like this:

ERROR ShutdownHookManager: Exception while deleting Spark temp dir: C:\Users\siyang\AppData\Local\Temp\spark-660e59cc-6331-4ed1-b932-ca64f9a1b8bd
java.io.IOException: Failed to delete: C:\Users\siyang\AppData\Local\Temp\spark-660e59cc-6331-4ed1-b932-ca64f9a1b8bd
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at ...

Below is the code. Does anyone know what's wrong?

from pymongo import MongoClient
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# MongoDB
# ----------------------------------------
client = MongoClient('localhost', 27017)
db = client['ws-database']
collectionNm = 'ws'
collection = db[collectionNm]


# Spark Streaming
# ----------------------------------------
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")  
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 1) # listen every 1 second

spark = SparkSession(sc)
kafkaStream = KafkaUtils.createStream(ssc=ssc, zkQuorum='192.168.0.xxx:2181', groupId='m1', topics={'name':1}) 


def parsing(log):
    dict = {}

    # split message by line break
    log = log[1].split('\n')
    for line in log:
        if line.find('=') > 0:
            words = line.split("=")
            hash = words[0]
            value = words[1].strip().replace('"',"")
            hash = hash.strip()
            dict[hash] = value

    # upload to mongodb 
    collection.insert_one(dict)


parsed = kafkaStream.map(parsing)
parsed.pprint()


ssc.start()
ssc.awaitTermination()

Anyone have any idea what is going wrong? Thanks!

1 Answer

map is a transformation on your DStream, not an output operation, so the work inside it (including the insert) is never actually triggered. You want foreachRDD, which is the generic output operation applied to each RDD the stream generates, and foreachPartition inside it so that one MongoDB connection is opened per partition. The code would look something like this:

def sendRecord(partition):
    # create the MongoDB connection on the worker, once per partition
    client = MongoClient('localhost', 27017)
    db = client['ws-database']
    collectionNm = 'ws'
    collection = db[collectionNm]

    for record in partition:
        # parsing step: each record is a (key, message) pair, so build
        # the dictionary from the message part, as in your parsing()
        doc = {}
        for line in record[1].split('\n'):
            if line.find('=') > 0:
                words = line.split('=')
                doc[words[0].strip()] = words[1].strip().replace('"', '')
        collection.insert_one(doc)

    client.close()


kafkaStream.foreachRDD(lambda rdd: rdd.foreachPartition(sendRecord))
kafkaStream.pprint()
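
A note on the design: the MongoClient is created inside sendRecord so the connection is opened on the worker that processes the partition, instead of being pickled from the driver, and it is reused for every record in that partition. If you also want to keep the parsed.pprint() output from your original code, one option (a minimal sketch reusing the names from your question, assuming the same stream setup) is to have the parsing function return the dictionary instead of inserting it, and keep a separate map for inspection:

def parsing(log):
    # build and return the dictionary; the insert happens in sendRecord instead
    record = {}
    for line in log[1].split('\n'):
        if line.find('=') > 0:
            words = line.split('=')
            record[words[0].strip()] = words[1].strip().replace('"', '')
    return record


parsed = kafkaStream.map(parsing)
parsed.pprint()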