
A Spark DataFrame can be written to a MongoDB collection. Refer to https://docs.mongodb.com/spark-connector/master/python/write-to-mongodb/

But when I tried to write a Spark Structured Streaming DataFrame to a MongoDB collection the same way, it did not work.

Can you suggest a better option to achieve this than using PyMongo code in a UDF?

  • How do you know "it is not working"? Can you include the code? Commented Jun 2, 2020 at 18:53

2 Answers


This was resolved using a foreachBatch sink. Working sample code below.

def write_mongo_row(df, epoch_id):
    # Write each micro-batch to MongoDB via the batch DataFrame writer
    mongoURL = "mongodb://XX.XX.XX.XX:27017/test.collection"
    df.write.format("mongo").mode("append").option("uri", mongoURL).save()

query = csvDF.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()

The idea came from How to use foreach or foreachBatch in PySpark to write to database?


1 Comment

Is there another alternative, other than initializing the mongo connection each time the function is called?
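
One way to avoid reconnecting on every call is to cache one client per Python process. This is a sketch under the assumption that you manage the connection yourself with PyMongo instead of the connector; the `pymongo` client, the URI, and the `test.collection` names are placeholders, not from the original post.

```python
# Cache one client per process so every foreachBatch call reuses it
# instead of opening a new connection per micro-batch.
_client = None

def get_client(factory):
    """Create the client on first call via `factory`, then return the cached one."""
    global _client
    if _client is None:
        _client = factory()
    return _client

def write_mongo_row(df, epoch_id):
    # Hypothetical PyMongo-based writer; pymongo and the connection
    # details are assumptions for illustration.
    import pymongo
    client = get_client(lambda: pymongo.MongoClient("mongodb://XX.XX.XX.XX:27017"))
    docs = [row.asDict() for row in df.collect()]  # foreachBatch runs on the driver
    if docs:
        client["test"]["collection"].insert_many(docs)
```

Note that PyMongo's `MongoClient` is itself a connection pool, so creating it once and reusing it is the intended usage.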

Sharing an alternative solution where the configuration is handled up front rather than inside the save method (to separate configuration from logic).

def save(message: DataFrame):
    # Append each micro-batch to the configured MongoDB collection
    message.write \
        .format("mongo") \
        .mode("append") \
        .option("database", "db_name") \
        .option("collection", "collection_name") \
        .save()

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.streaming import StreamingQuery

spark: SparkSession = SparkSession \
    .builder \
    .appName("MyApp") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .master("local") \
    .getOrCreate()

df: DataFrame = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

query: StreamingQuery = df\
    .writeStream \
    .foreachBatch(save) \
    .start()

query.awaitTermination()

1 Comment

Support for Structured Streaming was added in version 10 of the MongoDB Spark Connector. See mongodb.com/blog/post/new-mongodb-spark-connector and mongodb.com/docs/spark-connector/current/structured-streaming
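
With connector 10.x, the stream can be written directly without a foreachBatch sink. A sketch under the assumption that csvDF is the streaming DataFrame from the question; the URI, database, collection, and checkpoint path are placeholders:

```python
# Connector 10.x registers the "mongodb" format and supports streaming writes.
# A checkpoint location is required for any streaming sink.
query = (
    csvDF.writeStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb://localhost:27017")
    .option("spark.mongodb.database", "test")
    .option("spark.mongodb.collection", "collection")
    .option("checkpointLocation", "/tmp/checkpoint")
    .outputMode("append")
    .start()
)
query.awaitTermination()
```

This is a configuration sketch and needs a running Spark session with the 10.x connector package and a reachable MongoDB instance.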
