We are getting a ClassCastException when passing offsets from PySpark code to a Kafka direct stream (DStream):
py4j.protocol.Py4JJavaError: An error occurred while calling o2938.createDirectStreamWithMessageHandler.
: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
We upgraded from Python 2.7 to 3.6. As part of the upgrade we had to change long() to int() in the Offset function, and since that change we get the ClassCastException shown above.
kafka_stream = KafkaUtils.createDirectStream(ssc=spark_streaming_ctx,
                                             topics="Test",
                                             kafkaParams={
                                                 "BROKER": "BROKER_LIST",
                                                 "auto.offset.reset": "smallest"},
                                             fromOffsets=self.Offset(spark, "TOPIC"),
                                             messageHandler="messageHandler")
def Offset(in_spark, in_topic_list):
    offsets_map = {}
    # Was long(offset[1]) under Python 2; Python 3 has no long type, so we use int
    offsets_map[Topic] = int(offset[1])
    return offsets_map
We also tried using the future library (https://pypi.org/project/past/), but had no luck.
Versions: Python 3.6, PySpark 2.4, Kafka 2.1.0
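Our working guess, which we have not confirmed against the py4j source, is that py4j picks the Java type from the size of the Python 3 int: values that fit in 32 bits go over as java.lang.Integer and only larger values as java.lang.Long. The sketch below just illustrates that assumed rule in plain Python. java_type_for and the two constants are hypothetical names for illustration, not part of py4j or PySpark.

```python
# Assumption: models how py4j seems to choose a Java type for a Python 3 int.
# java_type_for is a hypothetical helper, not a py4j or PySpark API.
JAVA_MAX_INT = 2**31 - 1
JAVA_MIN_INT = -2**31


def java_type_for(value):
    """Return the Java boxed type a Python 3 int would presumably be sent as."""
    if not isinstance(value, int) or isinstance(value, bool):
        raise TypeError("expected a plain int")
    # Anything that fits in a signed 32-bit range is assumed to become an Integer.
    if JAVA_MIN_INT <= value <= JAVA_MAX_INT:
        return "java.lang.Integer"
    # Only values outside that range are assumed to become a Long.
    return "java.lang.Long"


if __name__ == "__main__":
    for offset in (0, 12345, 2**31 - 1, 2**31):
        print(offset, "->", java_type_for(offset))
```

If this guess is right, any realistically small offset produced by int(offset[1]) would reach the JVM as an Integer, which would match the exception above.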
Have you tried float(offset[1])?