I have looked at many examples of reading JSON data from a Kafka topic. I can do this successfully when each Kafka message contains a single JSON record, for example:
{
  "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
  "product": "Super widget",
  "price": 10,
  "bought_date": "2019-01-01"
}
The code below works for the above use case:
package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {
    public static void main(String[] args) throws StreamingQueryException {
        // Schema for a single JSON record
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        dataset.printSchema();

        // Parse the JSON string into a struct, then flatten it into columns
        Column col = new Column("json");
        Dataset<Row> customers = dataset
                .select(functions.from_json(col, schema).as("data"))
                .select("data.*");
        customers.printSchema();

        customers.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}
But the above seems inefficient to me, i.e. each Kafka message carries only a single record. Passing a JSON array of the form below would, to my mind, be more efficient, since one message can then carry many "records":
[
  {
    "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
    "product": "Super widget",
    "price": 10,
    "bought_date": "2019-01-01"
  },
  {
    "customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
    "product": "Food widget",
    "price": 4,
    "bought_date": "2019-01-01"
  }
]
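For context, here is a sketch of how I imagine such a batched message being produced (the serializer choice and class name are illustrative assumptions, not my actual producer code); the whole array travels as the value of one Kafka message:

```java
package io.example;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The entire JSON array is the value of a single message on the topic.
        String batch = "[{\"customer_id\":\"8d267162-1478-11ea-8d71-362b9e155667\","
                + "\"product\":\"Super widget\",\"price\":10,\"bought_date\":\"2019-01-01\"},"
                + "{\"customer_id\":\"498443a2-1479-11ea-8d71-362b9e155667\","
                + "\"product\":\"Food widget\",\"price\":4,\"bought_date\":\"2019-01-01\"}]";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("utilization", batch));
        }
    }
}
```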
The problem is that I am unable to unpack the JSON array and process its elements. The code below fails:
package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Stackoverflow {
    public static void main(String[] args) throws StreamingQueryException {
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a Dataset representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        dataset.printSchema();

        Column col = new Column("json");
        Dataset<Row> customers = dataset.select(functions.from_json(col, schema).as("data"));
        // This is the part that fails:
        Dataset<Row> data = customers.select(
                functions.explode_outer(functions.explode_outer(new Column("data"))));
        data.printSchema();

        data.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}
The exception thrown is:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;
Questions:
1) How do I properly write code that efficiently unpacks the JSON array? I doubt that the approach I took in the failing code above is the best one, but I tried to follow the many examples I found involving functions.explode() etc.
2) If the failing code is by some miracle a correct approach: how do I convert the struct to an array or a map?
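For what it's worth, my current best guess (untested) is that the schema passed to from_json needs to describe the whole message, i.e. an array of structs rather than a single struct, so that one explode can then flatten it. Reusing the schema and dataset variables from the code above, something along these lines:

```java
// Sketch only: wrap the per-record schema in an ArrayType so that
// from_json parses the whole message value as an array of structs.
org.apache.spark.sql.types.ArrayType arraySchema =
        DataTypes.createArrayType(schema);

Dataset<Row> parsed = dataset
        .select(functions.from_json(new Column("json"), arraySchema).as("data"));

// A single explode turns the array column into one row per element,
// and selecting "record.*" flattens each struct into top-level columns.
Dataset<Row> customers = parsed
        .select(functions.explode(new Column("data")).as("record"))
        .select("record.*");
```

Is this the right direction, or is there a more idiomatic way?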