I have created a stream from a Kafka topic. It has the following structure:
ksql> describe trans_live2;
Field | Type
-----------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ID | INTEGER
DESCRIPTION | VARCHAR(STRING)
AMOUNT | DOUBLE
CURRENCYID | INTEGER
-----------------------------------------
When a new row is added to a MySQL table, the source connector sends that row to Apache Kafka, which in turn is streamed into trans_live2.
For example, running in MySQL:
insert into transactions values(15, 'test15', 10.05, 1);
After that, querying the stream in KSQL returns:
select * from trans_live2;
1518606166292 | null | 15 | test15 | 10.05 | 1
However, I have no idea why ROWKEY is null.
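For context, ROWKEY in KSQL reflects the key of the underlying Kafka message, so a null ROWKEY would be consistent with the connector writing records without a key. A minimal sketch of re-keying the stream by a column, assuming this KSQL version supports PARTITION BY in CREATE STREAM ... AS SELECT (trans_live2_keyed is a hypothetical name, and currencyid is chosen only because it is the join column used later):

```sql
-- Sketch only: trans_live2_keyed is a hypothetical stream name.
-- Assumes PARTITION BY is available in this KSQL version.
CREATE STREAM trans_live2_keyed AS
  SELECT *
  FROM trans_live2
  PARTITION BY currencyid;
```

Running describe and a select on the new stream should then show whether ROWKEY is populated.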
I am also trying to join this stream with a table named latest:
ksql> describe latest;
Field | Type
------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
CURRENCYID | INTEGER (key)
MIDPRICE | DOUBLE (key)
MAXTIMESTAMP | BIGINT
------------------------------------------
using this statement:
CREATE STREAM live_transactions_stream3 AS
  SELECT t1.id, t1.description, t1.amount, t1.currencyid, t2.midprice, t2.maxtimestamp
  FROM trans_live2 t1
  LEFT JOIN latest t2 ON t1.currencyid = t2.currencyid;
but I get the following error:
Exception in thread "ksql_query_CSAS_LIVE_TRANSACTIONS_STREAM3-0d676326-237b-4320-9dab-542b42a960d9-StreamThread-161" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_3, processor=KSTREAM-SOURCE-0000000012, topic=ksql_query_CSAS_LIVE_TRANSACTIONS_STREAM3-KSTREAM-MAP-0000000009-repartition, partition=3, offset=0
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:238)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
Caused by: org.apache.kafka.common.errors.SerializationException: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of double in field MIDPRICE of ksql.avro_schema
I guess this is caused by the null ROWKEY.
Are these exceptions indeed related to the null ROWKEY of my stream and, if so, how can I solve this issue?
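The innermost cause in the trace names the MIDPRICE field rather than ROWKEY, and with a LEFT JOIN the right-hand columns are null whenever no table row matches. A minimal check, assuming currency ID 1 from the example insert above, is to see whether latest currently holds a row for that ID:

```sql
-- Sketch only: 1 is the currencyid used in the example insert.
SELECT currencyid, midprice, maxtimestamp
FROM latest
WHERE currencyid = 1;
```

If nothing comes back, the join would produce a null MIDPRICE for that transaction, which may be what the Avro serializer rejects.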