ksql, select on table not show anything

2019-06-14 09:14发布

I created a source topic subscriber have input message like this:

{
  "ip_router": "",
  "ip_lan": "",
  "isdn": "2046573688",
  "end_datetime": "",
  "shop_code": "1000405100",
  "reg_type_id": "5131615",
  "contract_id": "",
  "update_datetime": "20170801171355",
  "project": "",
  "telecom_service_id": "2",
  "local_speed": "",
  "password": "",
  "price_plan": "",
  "vip": "",
  "local_price_plan": "",
  "sub_id": "1083168000",
  "sta_datetime": "20090511152847",
  "update_number_1": "1",
  "act_status": "000",
  "network_class": "",
  "limit_usage": "",
  "num_reset_zone": "",
  "deposit": "",
  "create_user": "TUDV_POPBGG",
  "num_of_computer": "",
  "cust_id": "10922428129",
  "status": "2",
  "active_datetime": "20090511152102",
  "ip_view": "",
  "channel_type_id": "",
  "ip_wan": "",
  "imsi": "452049760887694",
  "infrastructure_type": "",
  "product_code": "HPN03",
  "expire_datetime": "",
  "speed": "",
  "private_ip": "",
  "update_user": "MIGRATE",
  "ip_static": "",
  "vlan": "",
  "sub_type": "",
  "create_datetime": "20090511152102",
  "is_info_completed": "1",
  "pay_type": "2",
  "up_link": "",
  "promotion_code": "",
  "technology": "",
  "offer_id": "400001035",
  "dev_staff_id": "",
  "account_id": "",
  "deploy_accept_date": "",
  "serial": "8984049767000887694",
  "group_id": "",
  "ip_gateway": "",
  "first_connect": "",
  "org_product_code": "MIGRATE",
  "start_money": "100000",
  "keep_alive": "",
  "account": ""
}

And then I created a stream and a table on it:

CREATE STREAM str_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON');

CREATE TABLE tbl_subscriber_json (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) WITH (KAFKA_TOPIC='subscriber', VALUE_FORMAT='JSON', KEY = 'sub_id' );

I tried to use ksql for test:

SELECT * FROM str_subscriber_json; 

(print result when I put new json to subscriber topic)

SELECT * FROM tbl_subscriber_json; 

(didn’t show any thing when I put new json to subscriber topic)

So please clarify for me what wrong for this case?

Thanks you so much.

1条回答
smile是对你的礼貌
2楼-- · 2019-06-14 09:50

Summary

Your messages need to be keyed. If you don't have a message key then the semantics of a TABLE don't make any sense (since you can't show the value for a key, if there is no key).

I've reproduced your example, using kafkacat to produce messages with and without a key.

Test 1 - no key

Produce test message

$ echo '{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber

Stream output

Note the null in the second column - this is the key (the first column is the timestamp of the message; the remainder of the columns are the declared fields in the message)

ksql> select * from str_subscriber_json;
1528368689380 | null | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

ksql> SELECT * FROM tbl_subscriber_json;

(No output)

Test 2 - with a key set

The key is set arbitrarily here to 1, using kafkacat's -K flag to specify : as the key/value separator.

$ echo '1:{ "ip_router": "", "ip_lan": "", "isdn": "2046573688", "end_datetime": "", "shop_code": "1000405100", "reg_type_id": "5131615", "contract_id": "", "update_datetime": "20170801171355", "project": "", "telecom_service_id": "2", "local_speed": "", "password": "", "price_plan": "", "vip": "", "local_price_plan": "", "sub_id": "1083168000", "sta_datetime": "20090511152847", "update_number_1": "1", "act_status": "000", "network_class": "", "limit_usage": "", "num_reset_zone": "", "deposit": "", "create_user": "TUDV_POPBGG", "num_of_computer": "", "cust_id": "10922428129", "status": "2", "active_datetime": "20090511152102", "ip_view": "", "channel_type_id": "", "ip_wan": "", "imsi": "452049760887694", "infrastructure_type": "", "product_code": "HPN03", "expire_datetime": "", "speed": "", "private_ip": "", "update_user": "MIGRATE", "ip_static": "", "vlan": "", "sub_type": "", "create_datetime": "20090511152102", "is_info_completed": "1", "pay_type": "2", "up_link": "", "promotion_code": "", "technology": "", "offer_id": "400001035", "dev_staff_id": "", "account_id": "", "deploy_accept_date": "", "serial": "8984049767000887694", "group_id": "", "ip_gateway": "", "first_connect": "", "org_product_code": "MIGRATE", "start_money": "100000", "keep_alive": "", "account": "" }' \
| kafkacat -b localhost:9092 -P -t subscriber -K:

Stream output

Note the 1 in the second column - this is the key (the first column is the timestamp of the message; the remainder of the columns are the declared fields in the message)

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Table output

1528368781916 | 1 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null

Rekeying topics automagically with KSQL

You can use KSQL to repartition topics. For example, taking your source subscriber topic, here is how to repartition it using KSQL in order to set the key:

ksql> CREATE STREAM SUBSCRIBER_KEYED AS SELECT * FROM str_subscriber_json PARTITION BY sub_id;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

This populates a Kafka topic (SUBSCRIBER_KEYED) on which you can then define a table:

CREATE TABLE subscriber_table (sub_id VARCHAR,contract_id VARCHAR,cust_id VARCHAR,account_id VARCHAR,telecom_service_id VARCHAR,isdn VARCHAR,imsi VARCHAR,serial VARCHAR,status VARCHAR,product_code VARCHAR,offer_id VARCHAR,act_status VARCHAR,sta_datetime BIGINT,active_datetime BIGINT,sub_type VARCHAR,end_datetime BIGINT,expire_datetime BIGINT,shop_code VARCHAR,dev_staff_id VARCHAR,promotion_code VARCHAR,vip VARCHAR,account VARCHAR,create_datetime BIGINT,create_user VARCHAR,update_datetime BIGINT,update_user VARCHAR,deposit VARCHAR,limit_usage VARCHAR,password VARCHAR,org_product_code VARCHAR,num_reset_zone VARCHAR,start_money VARCHAR,is_info_completed VARCHAR,channel_type_id VARCHAR,first_connect VARCHAR,speed VARCHAR,keep_alive VARCHAR,price_plan VARCHAR,local_price_plan VARCHAR,project VARCHAR,local_speed VARCHAR,technology VARCHAR,network_class VARCHAR,infrastructure_type VARCHAR,deploy_accept_date BIGINT,group_id VARCHAR,ip_static VARCHAR,ip_view VARCHAR,ip_lan VARCHAR,ip_wan VARCHAR,ip_gateway VARCHAR,ip_router VARCHAR,num_of_computer VARCHAR,vlan VARCHAR,up_link VARCHAR,private_ip VARCHAR,pay_type VARCHAR,update_number_1 VARCHAR,reg_type_id VARCHAR,type VARCHAR,update_number VARCHAR) \
WITH (KAFKA_TOPIC='SUBSCRIBER_KEYED', VALUE_FORMAT='JSON', KEY = 'sub_id' );

Now when you send a message to subscriber, even if it's not keyed, the table will work:

ksql> select * from subscriber_table;
1528369407576 | 1083168000 | 1083168000 |  | 10922428129 |  | 2 | 2046573688 | 452049760887694 | 8984049767000887694 | 2 | HPN03 | 400001035 | 000 | 20090511152847 | 20090511152102 |  | 0 | 0 | 1000405100 |  |  |  |  | 20090511152102 | TUDV_POPBGG | 20170801171355 | MIGRATE |  |  |  | MIGRATE |  | 100000 | 1 |  |  |  |  |  |  |  |  |  |  |  | 0 |  |  |  |  |  |  |  |  |  |  |  | 2 | 1 | 5131615 | null | null
查看更多
登录 后发表回答