KSQL streams - Get data from Array of Struct

Posted 2020-07-23 06:57

Question:

My JSON looks like:

{
  "Obj1": {
    "a": "abc",
    "b": "def",
    "c": "ghi"
  },
  "ArrayObj": [
    {
      "key1": "1",
      "Key2": "2",
      "Key3": "3",

    },
    {
      "key1": "4",
      "Key2": "5",
      "Key3": "6",

    },
    {
      "key1": "7",
      "Key2": "8",
      "Key3": "9",

    }
  ]

}

I have written KSQL streams to convert it to Avro and save it to a topic, so that I can push it to a JDBC Sink connector:

CREATE STREAM Example1 (
    ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,
    Obj1 STRUCT<a VARCHAR>
) WITH (kafka_topic='sample_topic', value_format='JSON');

CREATE STREAM Example_Avro WITH (VALUE_FORMAT='avro') AS
    SELECT e.ArrayObj[0] FROM Example1 e;

In Example_Avro, I can get only the first object in the array.

How can I get the data shown below when I run select * from Example_Avro in KSQL?

  a    b    key1  key2  key3

  abc  def  1     2     3
  abc  def  4     5     6
  abc  def  7     8     9

Answer 1:

Test data (I removed the invalid trailing commas after the Key3 values):

ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }

Query:

SELECT OBJ1->A AS A, 
       OBJ1->B AS B, 
       EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
       EXPLODE(ARRAYOBJ)->KEY2 AS KEY2, 
       EXPLODE(ARRAYOBJ)->KEY3 AS KEY3 
FROM   TEST4 
EMIT CHANGES;

Result:

+-------+-------+------+-------+-------+
|A      |B      |KEY1  |KEY2   |KEY3   |
+-------+-------+------+-------+-------+
|abc    |def    |1     |2      |3      |
|abc    |def    |4     |5      |6      |
|abc    |def    |7     |8      |9      |

Tested on ksqlDB 0.6, in which the EXPLODE function was added.
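
To get these rows into the Avro stream the question asks about, the same SELECT can probably be wrapped in a CREATE STREAM ... AS SELECT. This is a sketch only, not something I ran: it reuses the stream and topic names from the question (Example1, sample_topic, Example_Avro), and it assumes the source stream is redeclared so that Obj1 includes b and the array struct includes Key3. The declaration in the question leaves both out, so they could not be selected from it. If Example1 already exists, it would need to be dropped before being redeclared.

```sql
-- Assumption: source stream redeclared with every field the query needs.
-- Obj1.b, Obj1.c and ArrayObj[].Key3 are not in the question's original DDL.
CREATE STREAM Example1 (
    Obj1 STRUCT<a VARCHAR, b VARCHAR, c VARCHAR>,
    ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR, Key3 VARCHAR>>
) WITH (KAFKA_TOPIC='sample_topic', VALUE_FORMAT='JSON');

-- Persistent query: one output row per array element, serialized as Avro.
-- EXPLODE is called on the same array each time, so the three exploded
-- columns stay aligned row by row, as in the result shown above.
CREATE STREAM Example_Avro WITH (VALUE_FORMAT='AVRO') AS
    SELECT Obj1->a AS a,
           Obj1->b AS b,
           EXPLODE(ArrayObj)->key1 AS key1,
           EXPLODE(ArrayObj)->Key2 AS key2,
           EXPLODE(ArrayObj)->Key3 AS key3
    FROM Example1
    EMIT CHANGES;
```

With that in place, SELECT * FROM Example_Avro EMIT CHANGES; should show the flattened columns a, b, key1, key2 and key3, and the topic behind Example_Avro is the one to point the JDBC Sink connector at.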