Spark Structured Streaming solves this exact problem.
Add the three dependencies below to your pom.xml:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.4</version>
</dependency>
Below is sample code that replicates the scenario:
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
/**
 * Demonstrates that two Structured Streaming queries on a single SparkSession
 * run independently: a long-running (blocking) map in one query does not stall
 * the other. Reads from two Kafka topics ({@code userDetails}, {@code userName})
 * and writes results to two output topics ({@code temp}, {@code temp2}).
 *
 * <p>Requires a Kafka broker on localhost:9092 with the input topics created.
 */
public class StructuredSparkStreamingExample {
    /** Spark config key controlling where streaming checkpoint data is persisted. */
    private static final String SPARK_SQL_STREAMING_CHECKPOINT_LOCATION_CONFIG = "spark.sql.streaming.checkpointLocation";
    /** Base checkpoint directory; Spark creates a per-query subdirectory under it. */
    private static final String SPARK_SQL_STREAMING_CHECKPOINT_LOCATION = "/tmp/checkpoints";

    public static void main(String[] args) throws InterruptedException {
        try {
            new StructuredSparkStreamingExample().initSparkSession();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers/JVM shutdown hooks can
            // still observe the interruption — swallowing it loses the signal.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (StreamingQueryException | IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds a local SparkSession and starts two independent Kafka-to-Kafka
     * streaming queries, then blocks on the first query's termination.
     *
     * <p>The first query ({@code userDetails} → {@code temp}) sleeps 10 seconds
     * per micro-batch to mimic a slow remote call; the second query
     * ({@code userName} → {@code temp2}) keeps flowing regardless, which is the
     * point of the demonstration.
     *
     * @throws IOException             if checkpoint I/O fails
     * @throws InterruptedException    if the awaiting thread is interrupted
     * @throws StreamingQueryException if a streaming query terminates with an error
     */
    public void initSparkSession() throws IOException, InterruptedException, StreamingQueryException {
        SparkSession spark = SparkSession.builder().master("local[*]").appName("StructuredSparkStreamingExample")
                .config(SPARK_SQL_STREAMING_CHECKPOINT_LOCATION_CONFIG, SPARK_SQL_STREAMING_CHECKPOINT_LOCATION)
                .getOrCreate();
        // Silence Spark's chatty INFO/WARN output so the println() progress is visible.
        Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
        root.setLevel(Level.ERROR);

        Dataset<Row> datasetsUserDetails = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "userDetails")
                // Cap each micro-batch at 2 records so the 10s sleep per batch is observable.
                .option("maxOffsetsPerTrigger", 2)
                // NOTE(review): "maxTriggerDelay" is a Spark 3.4+ option (availableNow
                // triggers); Spark 2.4's Kafka source ignores it — confirm or remove.
                .option("maxTriggerDelay", "5s")
                .option("startingOffsets", "latest").load();
        Dataset<Row> datasetUserName = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "userName")
                .option("maxOffsetsPerTrigger", 2)
                // NOTE(review): see the comment on the same option above.
                .option("maxTriggerDelay", "5s")
                .option("startingOffsets", "latest").load();

        // Query 1: deliberately slow — sleeps 10s per batch to mimic a remote call.
        // (The original cast the value column twice; a single CAST is sufficient,
        // since CAST preserves the source column name "value".)
        StreamingQuery sqUserDetails = datasetsUserDetails.selectExpr("CAST(value AS STRING)")
                .filter(Objects::nonNull)
                .map((MapFunction<Row, String>) row -> {
                    System.out.println("sleeping");
                    Thread.sleep(10000);
                    System.out.println("waking up");
                    return "100";
                }, Encoders.STRING())
                .writeStream()
                .outputMode(OutputMode.Update())
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("kafka.max.request.size", "200000000")
                .option("topic", "temp")
                // .trigger(Trigger.Continuous(1000)) //only works when deployed on spark cluster. checkout https://stackoverflow.com/a/69469773
                .start();

        // Query 2: a plain pass-through; runs on its own scheduler thread and is
        // therefore unaffected by query 1's sleeping map function.
        datasetUserName.selectExpr("CAST(value AS STRING)")
                .filter(Objects::nonNull)
                .writeStream()
                .outputMode(OutputMode.Update())
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("kafka.max.request.size", "200000000")
                .option("topic", "temp2")
                // .trigger(Trigger.Continuous(1000)) //only works when deployed on spark cluster. checkout https://stackoverflow.com/a/69469773
                .start();

        // Block the main thread on the slow query; query 2 keeps running meanwhile.
        sqUserDetails.awaitTermination();
    }
}
Even though sqUserDetails is blocked for 10 seconds (mimicking a remote call), that does not block datasetUserName from proceeding.