
I have data coming into Kafka in the following format:

{"WHS":[{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}]}

I want to convert it to this:

{"Character Set":"UTF-8","action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":0,"Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-3":"","Addr-4":"","City":"","State":""}

I tried to flatten it using KSQL, but KSQL doesn't support arrays yet.

I also tried to flatten it with Kafka Streams (KStream) using the code below:

builder.stream(inputTopic).flatMapValues(Object -> Arrays.asList()).to(outputTopic);

But it doesn't produce any output. Any help with this would be greatly appreciated.

    "ksql doesn't support arrays yet" - that's not true. KSQL does support arrays. Commented Apr 27, 2020 at 8:35

2 Answers


KSQL / ksqlDB does support arrays. Here's how to do what you're asking with it:

-- Declare the stream
CREATE STREAM TEST1 
    (WHS ARRAY<STRUCT<"action"           VARCHAR
                    , "Update-Date-Time" VARCHAR
                    , "Number"           VARCHAR
                    , "Abbr"             VARCHAR
                    , "Name"             VARCHAR
                    , "Name2"            VARCHAR
                    , "Country-Code"     VARCHAR
                    , "Addr-1"           VARCHAR
                    , "Addr-2"           VARCHAR
                    , "Addr-4"           VARCHAR
                    , "City"             VARCHAR
                    , "State"            VARCHAR>>) 
    WITH (KAFKA_TOPIC ='test1'
         ,VALUE_FORMAT='JSON');

-- Set querying from beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Query the array         
ksql> SELECT WHS FROM TEST1 EMIT CHANGES LIMIT 1;
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|WHS                                                                                                                                                   |
+------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}] |
Limit Reached
Query terminated
ksql>         

-- Flatten the array
ksql> SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES LIMIT 1;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                                                                                                                                                                                                               |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ACTION=finished, Update-Date-Time=2020-04-11 09:00:02:25, NUMBER=0, ABBR=, NAME=, NAME2=, Country-Code=, Addr-1=, Addr-2=, Addr-4=, City=, STATE=}                                                                                                                                      |
Limit Reached
Query terminated
ksql>
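Conceptually, EXPLODE is a table function: each input row produces one output row per element of the array. The Java sketch below models that behaviour with plain Maps and Lists (no Kafka or ksqlDB involved). The class name, the explode helper and the second array element are hypothetical and only illustrate the semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExplodeSketch {

    // Hypothetical helper (not a ksqlDB API): emits one output row per element of the
    // array stored under `field`, mirroring what SELECT EXPLODE(WHS) does to each row.
    @SuppressWarnings("unchecked")
    public static List<Map<String, Object>> explode(List<Map<String, Object>> rows, String field) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Object value = row.get(field);
            if (!(value instanceof List)) {
                continue; // missing or non-array field: nothing to emit
            }
            for (Object element : (List<Object>) value) {
                out.add((Map<String, Object>) element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("action", "finished");
        first.put("Number", 0);
        // Made-up second element, only to show that a 2-element array yields 2 rows.
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("action", "finished");
        second.put("Number", 1);

        Map<String, Object> row = new LinkedHashMap<>();
        row.put("WHS", List.of(first, second));

        List<Map<String, Object>> exploded = explode(List.of(row), "WHS");
        System.out.println(exploded); // [{action=finished, Number=0}, {action=finished, Number=1}]
    }
}
```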

You can write the exploded rows to another stream (backed by a new topic):

ksql> CREATE STREAM TEST1_EXPLODE WITH (KAFKA_TOPIC='NEW_TEST1') AS SELECT EXPLODE(WHS) FROM TEST1 EMIT CHANGES;

 Message
-------------------------------------------
 Created query with ID CSAS_TEST1_EXPLODE_155
-------------------------------------------
ksql> PRINT NEW_TEST1;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"KSQL_COL_0":{"ACTION":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","NUMBER":"0","ABBR":"","NAME":"","NAME2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","STATE":""}}

If you also want to flatten the resulting struct, so that each field becomes its own top-level column, you can do this too:

CREATE STREAM TEST1_FLATTENED AS SELECT  EXPLODE(WHS)->"action"           AS "action"           ,
        EXPLODE(WHS)->"Update-Date-Time" AS "Update-Date-Time" ,
        EXPLODE(WHS)->"Number"           AS "Number"           ,
        EXPLODE(WHS)->"Abbr"             AS "Abbr"             ,
        EXPLODE(WHS)->"Name"             AS "Name"             ,
        EXPLODE(WHS)->"Name2"            AS "Name2"            ,
        EXPLODE(WHS)->"Country-Code"     AS "Country-Code"     ,
        EXPLODE(WHS)->"Addr-1"           AS "Addr-1"           ,
        EXPLODE(WHS)->"Addr-2"           AS "Addr-2"           ,
        EXPLODE(WHS)->"Addr-4"           AS "Addr-4"           ,
        EXPLODE(WHS)->"City"             AS "City"             ,
        EXPLODE(WHS)->"State"            AS "State"
    FROM TEST1 EMIT CHANGES;
ksql> PRINT TEST1_FLATTENED;
…
Value format: JSON or KAFKA_STRING
rowtime: 4/27/20 8:28:46 AM UTC, key: <null>, value: {"action":"finished","Update-Date-Time":"2020-04-11 09:00:02:25","Number":"0","Abbr":"","Name":"","Name2":"","Country-Code":"","Addr-1":"","Addr-2":"","Addr-4":"","City":"","State":""}
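The two output topics differ only in the shape of each value: TEST1_EXPLODE wraps every array element in a single column (KSQL_COL_0), while TEST1_FLATTENED promotes each listed struct field to a top-level column. As both PRINT outputs show, "Character Set" and "Addr-3" are absent because the CREATE STREAM statement does not declare them in the STRUCT; they would have to be declared there to appear in the output. The Java sketch below models that projection with plain Maps; the flatten helper is hypothetical, not a ksqlDB API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlattenSketch {

    // Hypothetical helper: projects the listed fields of the struct stored under `column`
    // to top-level columns, like SELECT EXPLODE(WHS)->"field" AS "field", ...
    // Fields that are not listed are dropped; listed fields missing from the struct become null.
    @SuppressWarnings("unchecked")
    public static Map<String, Object> flatten(Map<String, Object> row, String column, List<String> fields) {
        Map<String, Object> nested = (Map<String, Object>) row.get(column);
        Map<String, Object> out = new LinkedHashMap<>();
        for (String field : fields) {
            out.put(field, nested == null ? null : nested.get(field));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> struct = new LinkedHashMap<>();
        struct.put("action", "finished");
        struct.put("Update-Date-Time", "2020-04-11 09:00:02:25");

        // Shape of a TEST1_EXPLODE value: the whole element sits in one column.
        Map<String, Object> exploded = new LinkedHashMap<>();
        exploded.put("KSQL_COL_0", struct);
        System.out.println(exploded);
        // {KSQL_COL_0={action=finished, Update-Date-Time=2020-04-11 09:00:02:25}}

        // Shape of a TEST1_FLATTENED value: each struct field is its own column.
        System.out.println(flatten(exploded, "KSQL_COL_0", List.of("action", "Update-Date-Time")));
        // {action=finished, Update-Date-Time=2020-04-11 09:00:02:25}
    }
}
```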

1 Comment

The flatten query above worked fine in ksql 0.9.0, but after we upgraded to 0.11.0 it gives the following error:
io.confluent.ksql.execution.expression.tree.SearchedCaseExpression cannot be cast to io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp

Called with no arguments, Arrays.asList() creates an empty list, so flatMapValues emits no records at all.

Instead, extract the array from the input value and return it as something that implements Iterable (for example an ArrayList). Here is an approach using flatMapValues with Jackson:

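        // JSON serde for Jackson JsonNode (JsonSerializer/JsonDeserializer come from Kafka's connect-json artifact)
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
        builder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde))
            .flatMapValues((ValueMapper<JsonNode, Iterable<JsonNode>>) value -> {
                ArrayNode arrayNode = (ArrayNode) value.get("WHS");
                return arrayNode::iterator; // one output record per array element
            })
            .to(outputTopic, Produced.with(Serdes.String(), jsonSerde));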
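Why the original Arrays.asList() mapper produced nothing can be seen without Kafka at all: flatMapValues forwards one record for each element of the Iterable the mapper returns, so an empty Iterable forwards nothing. The sketch below imitates that contract with a hypothetical flatMapValues helper over an in-memory list; it is an illustration of the semantics, not the Kafka Streams implementation, and the "whs-…" strings are made-up stand-ins for array elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatMapValuesSketch {

    // Hypothetical stand-in for KStream#flatMapValues: every element of the Iterable
    // returned by the mapper becomes one downstream record.
    public static <V, VR> List<VR> flatMapValues(List<V> values, Function<V, Iterable<VR>> mapper) {
        List<VR> out = new ArrayList<>();
        for (V value : values) {
            for (VR result : mapper.apply(value)) {
                out.add(result);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Each input value stands in for the "WHS" array of one incoming record.
        List<List<String>> input = List.of(List.of("whs-1", "whs-2"), List.of("whs-3"));

        // Mapper from the question: ignores the value and returns an empty list.
        List<String> broken = flatMapValues(input, value -> Arrays.asList());
        // Mapper that returns the array itself.
        List<String> fixed = flatMapValues(input, value -> value);

        System.out.println(broken); // []
        System.out.println(fixed);  // [whs-1, whs-2, whs-3]
    }
}
```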
