I have a Kafka topic containing Avro data produced with KafkaAvroSerializer.
I am using Confluent 4.0.0 to run Kafka Connect.
My standalone worker properties are below.
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<schema_registry_hostname>:8081
value.converter.schema.registry.url=<schema_registry_hostname>:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
When I run the HDFS sink connector in standalone mode, I get this error message:
[2018-06-27 17:47:41,746] ERROR WorkerSinkTask{id=camus-email-service-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null
at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1640)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1527)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1410)
at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1290)
at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1014)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-06-27 17:47:41,748] ERROR WorkerSinkTask{id=camus-email-service-0} Task is being killed and will not recover until manually restarted ( org.apache.kafka.connect.runtime.WorkerTask)
[2018-06-27 17:52:19,554] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect).
When I use kafka-avro-console-consumer and pass it the schema registry URL, the Kafka messages are deserialized correctly, i.e.:
/usr/bin/kafka-avro-console-consumer --bootstrap-server <kafka-host>:9092 --topic <KafkaTopicName> --property schema.registry.url=<schema_registry_hostname>:8081
I think your Kafka key is null, which is not Avro.
Or it is some other type but malformed, and not converted to a RECORD datatype. See the AvroData source code.
UPDATE: According to your comment, you can see this is true.
In any case, HDFS Connect does not natively store the key, so try not deserializing the key at all rather than using Avro.
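As a rough sketch of what "not deserializing the key" could look like in the worker properties: this assumes the sink does not need to interpret the key, and that ByteArrayConverter (which ships with Apache Kafka itself, not with the Avro converter) is available in your Connect version. The value settings are copied from the question.

```properties
# Assumption: the key is never interpreted by the sink, so pass it through as raw bytes.
# ByteArrayConverter does not contact the Schema Registry.
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# The value is still Avro, as in the original configuration.
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=<schema_registry_hostname>:8081
```

With this in place the key.converter.schema.registry.url line is no longer needed. If the task still fails with the same exception, the problem is in the value schema rather than the key.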
Also, your console consumer is not printing the key, so your test isn't adequate. You need to add
--property print.key=true
Changing the datatype of the "subscription" column to a union type fixed the issue. After that, the AvroConverter was able to deserialize the messages.
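For anyone hitting the same error, here is a minimal sketch of what the fixed field might look like. The real schema is not shown above, so everything except the field name "subscription" is hypothetical (the record names EmailEvent and Subscription, and the nested id field). The exception text suggests the field was declared as a plain record with "default": null, and a null default is only valid when the field is a union whose first branch is "null".

```json
{
  "type": "record",
  "name": "EmailEvent",
  "fields": [
    {
      "name": "subscription",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Subscription",
          "fields": [
            {"name": "id", "type": "string"}
          ]
        }
      ],
      "default": null
    }
  ]
}
```

The order of the union matters: Avro matches a field's default against the first branch, so "null" has to come first for "default": null to be accepted.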