I process messages from Kafka with the following JSON structure:

{"unix_time": 1557678233, "category_id": 1000, "ip": "172.10.34.17", "type": "view"}

I want to print out what I'm receiving. Here is a code snippet of what I've done already:

JavaSparkContext sc = createJavaSparkContext();
JavaStreamingContext streamingContext =
                new JavaStreamingContext(sc, Durations.seconds(BATCH_DURATION_IN_SECONDS));

SparkSession sparkSession = SparkSession
        .builder()
        .config(new SparkConf())
        .getOrCreate();

Dataset<Row> df = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", CommonUtils.KAFKA_HOST_PORT)
        .option("subscribe", KAFKA_TOPIC)
        .load();

StreamingQuery query = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(new Column("value"), getSchema())).as("data")
        .select("data.category_id")
        .writeStream()
        .foreach(new ForeachWriter<Row>() {
            @Override
            public boolean open(long partitionId, long version) {
                return true;
            }

            @Override
            public void process(Row value) {
                System.out.println(value);
            }

            @Override
            public void close(Throwable errorOrNull) {
            }
        })
        .start();

query.awaitTermination();

Schema method:

private static StructType getSchema() {
    return new StructType(new StructField[]{
            new StructField(UNIX_TIME, DataTypes.TimestampType, false, Metadata.empty()),
            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),
            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),
            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty()),
    });
}

The problem is that I constantly receive the following error while writing from Spark:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'data.category_id' given input columns: [jsontostruct(value)];;
'Project ['data.category_id]
+- SubqueryAlias data
   +- Project [jsontostruct(StructField(unix_time,TimestampType,false), StructField(category_id,IntegerType,false), StructField(ip,StringType,false), StructField(type,StringType,false), value#15) AS jsontostruct(value)#18]

How to overcome this issue? Any suggestions on that?

1 Answer

This part of the exception tells you exactly where to look for answers:

cannot resolve 'data.category_id' given input columns: [jsontostruct(value)]

In other words, there is no column data.category_id among the available columns, which consist of just the single jsontostruct(value).

That means the only select in the streaming query does not work. The reason is fairly simple (I could qualify it as a typo) -- there is one closing bracket too many before as("data"), a method that is available on both the Column and Dataset types. As written, .as("data") is applied to the whole Dataset, so the parsed struct column is never named data and data.category_id cannot resolve.

In summary, replace the following part of the query:

.select(from_json(new Column("value"), getSchema())).as("data")

to the following:

.select(from_json(new Column("value"), getSchema()).as("data"))

Note that I shifted one closing bracket to the end.
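For reference, the corrected query chain then looks like this (a sketch, reusing getSchema() and the ForeachWriter from the question; "writer" stands in for that anonymous ForeachWriter instance):

```java
// Sketch of the corrected query: .as("data") is now applied to the
// from_json Column, so the struct column is named "data" and
// "data.category_id" resolves.
StreamingQuery query = df
        .selectExpr("CAST(value AS STRING)")
        .select(from_json(new Column("value"), getSchema()).as("data"))
        .select("data.category_id")
        .writeStream()
        .foreach(writer)   // the ForeachWriter<Row> from the question
        .start();

query.awaitTermination();
```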


2 Comments

Thanks for the answer! It worked, but now I can see only [null] in my console instead of the row itself... Any ideas why?
It is possible that the schema does not match, so the parsing (deserializing from JSON to a row) fails. When from_json cannot parse a value against the schema, it returns null. Make sure the values are in proper JSON format and match the schema.
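One plausible cause of such nulls in this particular example: unix_time arrives as an epoch number (1557678233), which the JSON parser may fail to deserialize directly into TimestampType, nulling the whole row. A sketch of a workaround, under that assumption: read the field as a plain long so parsing succeeds, then cast to a timestamp afterwards (an integral-to-timestamp cast in Spark interprets the value as seconds since the epoch):

```java
// Read unix_time as LongType so the raw epoch number parses cleanly.
private static StructType getSchema() {
    return new StructType(new StructField[]{
            new StructField(UNIX_TIME, DataTypes.LongType, false, Metadata.empty()),
            new StructField(CATEGORY_ID, DataTypes.IntegerType, false, Metadata.empty()),
            new StructField(IP, DataTypes.StringType, false, Metadata.empty()),
            new StructField(TYPE, DataTypes.StringType, false, Metadata.empty())
    });
}

// Then convert the epoch seconds to a timestamp after parsing:
// df.select(from_json(new Column("value"), getSchema()).as("data"))
//   .select(col("data.category_id"),
//           col("data.unix_time").cast(DataTypes.TimestampType).alias("event_time"))
```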
