null ROWKEY in a stream causes NullPointerException

Posted 2019-08-25 02:29

I have created a stream from a Kafka topic. The stream has the following structure:

ksql> describe trans_live2;

 Field       | Type                      
-----------------------------------------
 ROWTIME     | BIGINT           (system) 
 ROWKEY      | VARCHAR(STRING)  (system) 
 ID          | INTEGER                   
 DESCRIPTION | VARCHAR(STRING)           
 AMOUNT      | DOUBLE                    
 CURRENCYID  | INTEGER                   
-----------------------------------------

When a new row is added to a MySQL table, the source connector sends that row to Apache Kafka, and it is in turn streamed into trans_live2.

For example, running this in MySQL:

insert into transactions values(15, 'test15', 10.05, 1);

KSQL then shows:

select * from trans_live2;
1518606166292 | null | 15 | test15 | 10.05 | 1

but I have no idea why the ROWKEY is null.

I am also trying to join this stream with a table named latest:

ksql> describe latest;

 Field        | Type                      
------------------------------------------
 ROWTIME      | BIGINT           (system) 
 ROWKEY       | VARCHAR(STRING)  (system) 
 CURRENCYID   | INTEGER          (key)    
 MIDPRICE     | DOUBLE           (key)    
 MAXTIMESTAMP | BIGINT                    
------------------------------------------

using this statement:

CREATE STREAM live_transactions_stream3 AS
  SELECT t1.id, t1.description, t1.amount, t1.currencyid, t2.midprice, t2.maxtimestamp
  FROM trans_live2 t1
  LEFT JOIN latest t2 ON t1.currencyid = t2.currencyid;

but I get the following error:

Exception in thread "ksql_query_CSAS_LIVE_TRANSACTIONS_STREAM3-0d676326-237b-4320-9dab-542b42a960d9-StreamThread-161" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_3, processor=KSTREAM-SOURCE-0000000012, topic=ksql_query_CSAS_LIVE_TRANSACTIONS_STREAM3-KSTREAM-MAP-0000000009-repartition, partition=3, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:238)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Caused by: org.apache.kafka.common.errors.SerializationException: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of double in field MIDPRICE of ksql.avro_schema

which I guess is caused by the null ROWKEY.

Are these exceptions indeed related to the null ROWKEY of my stream and, if so, how can I solve this issue?

1 Answer
一纸荒年 Trace
#2 · 2019-08-25 03:30

This is a known bug, fixed in the following PR: https://github.com/confluentinc/ksql/pull/679

The problem was that the Avro schema generated for the query results did not allow null values in fields. With that PR applied, result fields can hold null values.
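
To illustrate the explanation: the stack trace in the question names the field MIDPRICE, not ROWKEY. A LEFT JOIN keeps a stream row even when the table has no matching row, so the table-side columns come out null, and a schema that declares the field as a plain double then rejects it. The sketch below is a toy model in plain Python, not the Avro library and not KSQL's actual code. The helper names (`left_join`, `check`) and the two schema dictionaries are hypothetical and only mimic the behaviour described above.

```python
# Toy model of the failure: plain Python, NOT the Avro library or KSQL code.
# Field names come from the question; all helper names are hypothetical.

# Schema before the fix: MIDPRICE must be a number, null is not allowed.
STRICT = {"MIDPRICE": ["float"], "MAXTIMESTAMP": ["int"]}
# Schema after the fix: each field also accepts null (like an Avro union).
NULLABLE = {"MIDPRICE": ["null", "float"], "MAXTIMESTAMP": ["null", "int"]}


def left_join(stream_row, table_by_currency):
    """LEFT JOIN: keep the stream row even if the table has no match."""
    match = table_by_currency.get(stream_row["CURRENCYID"])
    return {
        "ID": stream_row["ID"],
        "CURRENCYID": stream_row["CURRENCYID"],
        # With no match, the table-side columns are null (None).
        "MIDPRICE": match["MIDPRICE"] if match else None,
        "MAXTIMESTAMP": match["MAXTIMESTAMP"] if match else None,
    }


def check(row, schema):
    """Raise ValueError if a value's type is not allowed for its field."""
    for field, allowed in schema.items():
        value = row[field]
        kind = "null" if value is None else type(value).__name__
        if kind not in allowed:
            raise ValueError(f"{kind} not allowed in field {field}")
    return True


# The row inserted in the question, joined against an empty table.
joined = left_join({"ID": 15, "CURRENCYID": 1}, {})

check(joined, NULLABLE)      # passes: null is an allowed value
try:
    check(joined, STRICT)    # fails: MIDPRICE is null but must be a float
except ValueError as err:
    print(err)
```

In this model the same joined row passes or fails depending only on whether the schema admits null, which matches the answer's point that the fix lies in the generated schema rather than in the stream's ROWKEY.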
