Send Custom Java Objects to Kafka Topic

2020-02-10 04:46发布

I have my custom Java Object and wish to leverage JVM's in built serialization to send it to a Kafka topic, but serialization fails with below error

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

Payload.java

public class Payload implements Serializable {

    private static final long serialVersionUID = 123L;

    private String name="vinod";

    private int anInt = 5;

    private Double aDouble = new Double("5.0");

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAnInt() {
        return anInt;
    }

    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }

    public Double getaDouble() {
        return aDouble;
    }

    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }

}

During my creation of producer, I have the following properties set

<entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />

My send invoke is as below

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

What is correct way to send a custom java object through a producer to a kafka topic ?

2条回答
走好不送
2楼-- · 2020-02-10 05:14

We have 2 Options as listed below

1) If we intend to send custom java objects to producer, We need to create a serializer which implements org.apache.kafka.common.serialization.Serializer and pass that Serializer class during creation of your producer

Code Reference below

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) {

    }

    public byte[] serialize(String s, Object o) {

       try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            return new byte[0];
        }
    }

    public void close() {

    }
}

And set the value serializer accordingly

<entry key="value.serializer"
                       value="com.spring.kafka.PayloadSerializer" />

2) No need to create custom serializer class. Use the existing ByteArraySerializer, but during send follow the process

Java Object -> String (Preferrably JSON represenation instead of toString)->byteArray

查看更多
smile是对你的礼貌
3楼-- · 2020-02-10 05:15

Since you are using ByteArraySerializer,you need to instantiate a byte[] producer.

Producer<byte[],byte[]> producer = new KafkaProducer<>(props);

and then while producing pass the byte[] after serializing or some other method,for instance,

producer.send(new ProducerRecord<byte[],byte[]>("test", new Payload().toString().getBytes()));

If you are passing just a Payload Object to the producer then it will be better to have key serializer and value serializer as whatever you intend to pass and while reading you need to read from that data.

It is good practice to use Serializable and ByteArraySerializer/ByteArrayDeserializer.

查看更多
登录 后发表回答