2

I have looked at many examples of reading JSON data from a Kafka topic. I was able to do this successfully if I read a single record from the topic per connection, for example:

{"customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
 "product": "Super widget",
 "price": 10,
 "bought_date": "2019-01-01"
}

The code below works for the above use case:

package io.examle;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Streams single JSON purchase records from the Kafka topic {@code utilization},
 * parses each record into typed columns and prints the result to the console.
 */
public class Stackoverflow {

    /** Describes the four fields of one JSON purchase record. */
    private static StructType purchaseSchema() {
        StructField[] fields = {
            new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("product", DataTypes.StringType, false, Metadata.empty()),
            new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
        };
        return new StructType(fields);
    }

    /**
     * Starts the streaming query and blocks until it terminates.
     *
     * @param args unused
     * @throws StreamingQueryException if the running query fails
     */
    public static void main(String[] args) throws StreamingQueryException {
        StructType recordSchema = purchaseSchema();

        SparkSession session = SparkSession.builder().appName("SimpleExample").getOrCreate();

        // The Kafka source delivers the payload as binary; expose it as a string column "json".
        Dataset<Row> rawJson = session.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");
        rawJson.printSchema();

        // Parse the JSON text into a struct named "data", then flatten its fields.
        Column parsedRecord = functions.from_json(new Column("json"), recordSchema).as("data");
        Dataset<Row> customers = rawJson.select(parsedRecord).select("data.*");
        customers.printSchema();

        customers.writeStream().format("console").start().awaitTermination();
    }
}

But the above seems inefficient to me, i.e. making a connection to Kafka to get a single record per connection. Passing a JSON array of the form below would, to my mind, be more efficient, as one can stock it with many "records" per JSON array.

[{
        "customer_id": "8d267162-1478-11ea-8d71-362b9e155667",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "498443a2-1479-11ea-8d71-362b9e155667",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-01-01"
    } 
]

The problem is that I am unable to unpack the JSON Array and process it. The code below fails:

package io.example;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Attempt at unpacking a JSON array read from the Kafka topic "utilization".
 * The Kafka value is cast to a string column "json", parsed with from_json
 * using a struct schema, and the parsed column is then passed to explode_outer.
 */
public class Stackoverflow {

    public static void main(String[] args) throws StreamingQueryException {

        // Schema of one purchase record. Note that this is a struct, not an array of structs.
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
            });

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                        .readStream()
                        .format("kafka")                
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", "utilization")
                        .load()
                        .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        Column col = new Column("json");

        // from_json is given the struct schema, so the resulting "data" column is a struct.
        Dataset<Row> customers = dataset.select(functions.from_json(col,schema).as("data"));            


        // explode_outer is applied (twice, nested) to the struct column "data".
        // The reported AnalysisException states that explode needs an array or map input,
        // not a struct, so the failure originates from this statement.
        Dataset<Row> data = customers.select(functions.explode_outer(functions.explode_outer(new Column("data"))));
        data.printSchema();

         // Print each micro-batch to the console and block until the query ends.
         data.writeStream()      
        .format("console")
        .start()
        .awaitTermination();
    }

}




Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`data`)' due to data type mismatch: input to function explode should be array or map type, not struct<customer_id:string,product:string,price:int,bought_date:string>;;

Questions:

1) How to properly write code that will efficiently unpack the JSON array? I doubt that the approach I took above for the code that fails is the best, but I tried to follow the many examples I saw regarding functions.explode() etc.

2) If the code that fails is, by some miracle, a correct approach, how do I convert the struct to an array or a map?

2 Answers 2

2

Spark doesn't pull one record per connection. The Kafka API will poll a batch of records at once.

As far as best practices in Kafka go, multiple events should be split into multiple objects, not stuffed into an array, unless they actually need to be correlated. For example, you would have a "cart" record with a list of "items" for an order.

For your code to work, your schema would have to be an ArrayType (not a struct or map).

// Schema of a single element of the JSON array.
StructType schema = new StructType(new StructField[]{
            new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
            new StructField("product", DataTypes.StringType, false, Metadata.empty()),
            new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
            new StructField("bought_date", DataTypes.StringType, false, Metadata.empty()),
        });

// Wrap the element schema in an array type (containsNull = false) so that from_json
// yields an array column that explode() accepts. DataTypes.createArrayType is the
// factory Spark documents for creating ArrayType instances from Java.
ArrayType arrSchema = DataTypes.createArrayType(schema, false);

Then use the array schema when using from_json.

Sign up to request clarification or add additional context in comments.

Comments

1

For the sake of completeness the code below achieves the desired result using the advice above:

package io.example;


import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Structured Streaming job that reads JSON arrays of purchase records from the
 * Kafka topic {@code utilization} and prints one row per array element to the console.
 */
public class Stackoverflow {

    /**
     * Builds the streaming pipeline, starts it and blocks until the query terminates.
     *
     * @param args unused
     * @throws StreamingQueryException if the running query fails
     */
    public static void main(String[] args) throws StreamingQueryException {

        // Schema of a single element of the incoming JSON array.
        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
            });

        // Each Kafka value holds a JSON array, so from_json must be given an array type
        // (containsNull = false). Created through the DataTypes factory, the documented
        // way to obtain an ArrayType from Java.
        ArrayType arrayType = DataTypes.createArrayType(schema, false);

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "utilization")
                .load()
                .selectExpr("CAST(value AS STRING) as json");

        dataset.printSchema();

        // Parse the JSON text into an array of structs, then emit one row per element.
        // The exploded column is aliased explicitly instead of relying on the implicit
        // default output name of explode().
        Column parsed = functions.from_json(new Column("json"), arrayType);
        Column customer = functions.explode(parsed).as("customer");

        // Flatten the struct's fields into top-level columns.
        Dataset<Row> customers = dataset.select(customer).select("customer.*");

        // Print the resulting schema (the previous bare schema() call discarded its result).
        customers.printSchema();

        customers.writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }

}


Batch: 77
-------------------------------------------
+-----------+-------------+-----+-----------+
|customer_id|      product|price|bought_date|
+-----------+-------------+-----+-----------+
|   d6315a00| Super widget|   10|2019-01-01 |
|   d6315cd0|  Food widget|    4| 2019-01-01|
|   d6315e2e|  Bike widget|   10| 2019-01-01|
|   d631614e|Garage widget|    4| 2019-01-01|
+-----------+-------------+-----+-----------+

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.