I'm using the following code to write a stream to Elasticsearch from a Python (PySpark) application:

#Streaming code
query = df.writeStream \
.outputMode("append") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "logs/raw") \
.option("es.nodes", "localhost") \
.start()

query.awaitTermination()
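`awaitTermination()` with no argument blocks until the query stops and gives no feedback while it runs, which makes a silent sink hard to diagnose. A minimal sketch, assuming `query` is the object returned by `.start()` above, that polls instead and prints `query.status` plus a summary of `query.lastProgress` (both standard `StreamingQuery` properties). `monitor` and `summarize_progress` are hypothetical helper names.

```python
# Minimal sketch: poll a StreamingQuery instead of blocking silently in
# awaitTermination(). Assumes `query` is the object returned by .start().
# monitor/summarize_progress are hypothetical helpers, not PySpark API.


def summarize_progress(progress):
    """Turn StreamingQuery.lastProgress (a dict, or None) into one line."""
    if not progress:
        return "no micro-batch has completed yet"
    return "batch {} read {} input rows".format(
        progress.get("batchId"), progress.get("numInputRows"))


def monitor(query, poll_seconds=10):
    """Print status and progress every poll_seconds until the query stops.

    awaitTermination(timeout) returns False while the query is still running,
    True once it has stopped, and re-raises the error if the query failed.
    """
    try:
        while not query.awaitTermination(poll_seconds):
            print("status:", query.status)
            print("progress:", summarize_progress(query.lastProgress))
    except Exception as err:  # a StreamingQueryException in PySpark
        print("streaming query failed:", err)
        raise


# Usage, in place of query.awaitTermination():
# monitor(query)
```

If `numInputRows` stays at 0, the source is probably not delivering rows; if batches complete with rows but nothing shows up in ES, the sink side (connection, index name) is the more likely suspect.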

If I write the results to the console, it works fine. Writing to ES in batch (non-streaming) mode also works. This is the code I used for the batch write:

#Not streaming
df.write.format("org.elasticsearch.spark.sql") \
.mode('append') \
.option("es.resource", "log/raw") \
.option("es.nodes", "localhost").save("log/raw")
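Note that the streaming snippet sets `es.resource` to `logs/raw` while the batch snippet uses `log/raw`, so the two writes do not target the same index, and it may be worth checking `logs` as well as `log` when looking for streamed documents. A minimal sketch, assuming both writes are meant to go to one resource, that builds the elasticsearch-hadoop options in one place. `es_options`, `write_batch` and `start_stream` are hypothetical helper names, and `es.port` is shown at its default of 9200.

```python
# Minimal sketch: keep the elasticsearch-hadoop settings in one place so the
# batch and streaming writes cannot drift apart. Helper names are hypothetical.


def es_options(resource, nodes="localhost", port="9200"):
    """Options understood by the org.elasticsearch.spark.sql data source."""
    return {"es.resource": resource, "es.nodes": nodes, "es.port": port}


def write_batch(df, resource):
    """Batch (non-streaming) append, like the working snippet above."""
    (df.write.format("org.elasticsearch.spark.sql")
       .mode("append")
       .options(**es_options(resource))
       .save())


def start_stream(df, resource, checkpoint_dir):
    """Streaming append to the same resource; returns the StreamingQuery."""
    return (df.writeStream.format("org.elasticsearch.spark.sql")
              .outputMode("append")
              .option("checkpointLocation", checkpoint_dir)
              .options(**es_options(resource))
              .start())
```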

The problem is that I can't debug it: the code runs, but nothing is written to ES in streaming mode.
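When the code runs without errors but writes nothing, a cheap first check is whether the node named in `es.nodes` answers HTTP at all from the machine running the driver. A minimal stdlib-only sketch, assuming plain HTTP on the default port 9200 and no authentication; `es_reachable` is a hypothetical helper, not part of elasticsearch-hadoop. A `True` result does not prove the executors can reach the node, since they write from their own hosts.

```python
import urllib.error
import urllib.request


def es_reachable(host="localhost", port=9200, timeout=5.0):
    """Return True only if GET on the Elasticsearch root returns HTTP 200.

    Connection refused, timeout, DNS failure, or an error status
    (e.g. 401 on a secured cluster) all return False.
    Assumes plain HTTP; hypothetical helper for illustration.
    """
    url = "http://{}:{}/".format(host, port)
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # URLError covers HTTPError and connection errors; OSError covers
        # socket timeouts raised while reading the response.
        return False


# Usage, before starting the stream:
# if not es_reachable("localhost"):
#     print("Elasticsearch is not reachable from the driver")
```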

Thanks,

Comment (Mar 5, 2018 at 18:19): What's the question, then? What does not work?

2 Answers


This eventually worked for me; the problem was technical (I needed a VPN).

query = df.writeStream \
.outputMode("append") \
.queryName("writing_to_es") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "index/type") \
.option("es.nodes", "localhost") \
.start()

query.awaitTermination()
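Apart from the placeholder `index/type` resource, the code above differs from the question's only in `queryName("writing_to_es")`. Naming the query makes it easy to pick out later, for example among `spark.streams.active`, the list of running `StreamingQuery` objects, each of which has a `.name`. A minimal sketch, assuming `spark` is the active `SparkSession`; `find_query` is a hypothetical helper.

```python
# Minimal sketch: look up a running query by the name given via queryName().
# Assumes `spark` is the active SparkSession; find_query is a hypothetical helper.


def find_query(active_queries, name):
    """Return the first StreamingQuery in active_queries with that name, else None."""
    for q in active_queries:
        if q.name == name:
            return q
    return None


# Usage (e.g. from a notebook cell or another thread):
# q = find_query(spark.streams.active, "writing_to_es")
# if q is not None:
#     print(q.status, q.lastProgress)
#     q.stop()
```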


Code:

val stream = df
      .writeStream
      .option("checkpointLocation", checkPointDir)
      .format("es")
      .start("realtime/data")

SBT Dependency:

libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "6.2.4"
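The SBT line above is for a Scala build. For a PySpark application, one common way to pull in the same connector is the `spark.jars.packages` setting with the Maven coordinate, in which the Scala binary version that `%%` adds implicitly must be spelled out. The sketch below assumes Scala 2.11 (common for Spark 2.x builds), which should be checked against your Spark build; `es_spark_package` and `build_session` are hypothetical helpers.

```python
# Minimal sketch: the PySpark counterpart of the SBT dependency above.
# Assumes Scala 2.11; change scala_binary to match your Spark build.


def es_spark_package(es_version="6.2.4", scala_binary="2.11"):
    """Maven coordinate for the Spark 2.x connector (sbt's %% adds the suffix)."""
    return "org.elasticsearch:elasticsearch-spark-20_{}:{}".format(
        scala_binary, es_version)


def build_session(app_name="es-streaming"):
    # Imported here so es_spark_package() works without pyspark installed.
    from pyspark.sql import SparkSession

    # spark.jars.packages only takes effect if no SparkContext exists yet.
    return (SparkSession.builder
            .appName(app_name)
            .config("spark.jars.packages", es_spark_package())
            .getOrCreate())
```

The same coordinate can instead be passed on the command line, e.g. `spark-submit --packages org.elasticsearch:elasticsearch-spark-20_2.11:6.2.4 app.py`.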
