Question:
I am trying to build a POC with Kafka 0.8.1. My Kafka message is my own Java class, which has a bunch of String fields. I cannot use the default serializer class or the String serializer class that comes with the Kafka library, so I guess I need to write my own serializer and pass it in the producer properties. If you know of an example custom serializer for Kafka (in Java), please share it. Thanks very much.
Answer 1:
The things required for writing a custom serializer are:
- Implement Encoder with an object specified for the generic
  - Supplying a VerifiableProperties constructor is required
- Override the toBytes(...) method, making sure a byte array is returned
- Inject the serializer class into ProducerConfig
Declaring a custom serializer for a producer
As you noted in your question, Kafka supplies a means to declare a specific serializer for a producer. The serializer class is set in a ProducerConfig instance, and that instance is used to construct the desired Producer class.
If you follow Kafka's Producer Example you will construct ProducerConfig via a Properties object. When building your properties, be sure to include:
    props.put("serializer.class", "path.to.your.CustomSerializer");
The value is the fully qualified name of the class you want Kafka to use to serialize messages before appending them to the log.
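To make the configuration step concrete, here is a minimal sketch of building those properties. The helper class ProducerProps and its parameters are made up for illustration; the property keys are the Kafka 0.8 producer settings as I understand them. One assumption worth calling out: if key.serializer.class is not set, Kafka 0.8 should fall back to serializer.class for keys too, so String keys need their own encoder.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the producer settings in one place.
final class ProducerProps {
    static Properties build(String brokerList, String serializerClass) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata, e.g. "localhost:9092".
        props.put("metadata.broker.list", brokerList);
        // Fully qualified name of the custom Encoder for message values.
        props.put("serializer.class", serializerClass);
        // Assumption: keys are Strings, so they get Kafka's StringEncoder
        // instead of inheriting the custom value encoder.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Wait for the partition leader to acknowledge each message.
        props.put("request.required.acks", "1");
        return props;
    }
}
```

The resulting Properties object would then be passed to new ProducerConfig(props), and that config to the Producer constructor, as in Kafka's Producer Example.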
Creating a custom serializer that Kafka understands
Writing a custom serializer that Kafka can properly interpret requires implementing the Encoder[T] Scala trait that Kafka provides. Implementing Scala traits in Java is a bit awkward, but the following approach worked for serializing JSON in my project:
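    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;
    import org.apache.log4j.Logger; // log4j 1.x assumed

    public class JsonEncoder implements Encoder<Object> {
        private static final Logger logger = Logger.getLogger(JsonEncoder.class);
        // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
        private static final ObjectMapper objectMapper = new ObjectMapper();

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* Kafka creates the encoder through this constructor, so it must be present. */
        }

        @Override
        public byte[] toBytes(Object object) {
            try {
                // Use an explicit charset so the bytes do not depend on the platform default.
                return objectMapper.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
            } catch (JsonProcessingException e) {
                logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            // Fall back to an empty payload when serialization fails.
            return new byte[0];
        }
    }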
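On why the VerifiableProperties constructor matters: as far as I can tell, Kafka 0.8 creates the encoder reflectively from the class name in serializer.class, passing the producer's properties to the constructor. A class without that constructor still compiles but cannot be loaded at runtime. The sketch below imitates this with stand-in types only; Settings, ByteEncoder, GoodEncoder, NoArgEncoder, and EncoderLoader are all hypothetical names, not Kafka classes.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

// Hypothetical stand-in for kafka.utils.VerifiableProperties.
final class Settings {
    final Properties props;
    Settings(Properties props) { this.props = props; }
}

// Hypothetical stand-in for kafka.serializer.Encoder<T>.
interface ByteEncoder<T> {
    byte[] toBytes(T value);
}

// Has the one-argument constructor the loader looks for.
final class GoodEncoder implements ByteEncoder<String> {
    GoodEncoder(Settings settings) { }

    @Override
    public byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

// Compiles fine, but only offers a no-argument constructor.
final class NoArgEncoder implements ByteEncoder<String> {
    NoArgEncoder() { }

    @Override
    public byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

final class EncoderLoader {
    // Looks up the (Settings) constructor by reflection and invokes it.
    @SuppressWarnings("unchecked")
    static <T> ByteEncoder<T> load(String className, Settings settings) {
        try {
            Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(Settings.class);
            ctor.setAccessible(true);
            return (ByteEncoder<T>) ctor.newInstance(settings);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(className + " needs a constructor taking Settings", e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create " + className, e);
        }
    }
}
```

Loading GoodEncoder by name succeeds, while loading NoArgEncoder fails with the IllegalArgumentException, which mirrors the kind of runtime failure you would see from an encoder that lacks the required constructor.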
Your question makes it sound like you are using one object (let's call it CustomMessage) for all messages appended to your log. If that's the case, your serializer could look more like this:
    package com.project.serializer;

    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;

    public class CustomMessageEncoder implements Encoder<CustomMessage> {
        public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
            /* Kafka creates the encoder through this constructor, so it must be present. */
        }

        @Override
        public byte[] toBytes(CustomMessage customMessage) {
            // toBytes() is a method you write yourself on CustomMessage.
            return customMessage.toBytes();
        }
    }
Which would leave your property config looking like this:
    props.put("serializer.class", "com.project.serializer.CustomMessageEncoder");
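The CustomMessageEncoder above delegates to customMessage.toBytes(), which you have to write yourself. Here is one possible sketch for a class made up of String fields, using length-prefixed UTF-8 strings. The field names id and body, the fromBytes method, and the wire layout are all assumptions for illustration, and fields are assumed to be non-null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical message class with two String fields.
final class CustomMessage {
    private final String id;
    private final String body;

    CustomMessage(String id, String body) {
        this.id = id;
        this.body = body;
    }

    String getId() { return id; }
    String getBody() { return body; }

    // Writes each field as a 4-byte length followed by its UTF-8 bytes.
    byte[] toBytes() {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            writeString(out, id);
            writeString(out, body);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            // Writing to an in-memory buffer is not expected to fail.
            throw new IllegalStateException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static CustomMessage fromBytes(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            String id = readString(in);
            String body = readString(in);
            return new CustomMessage(id, body);
        } catch (IOException e) {
            throw new IllegalArgumentException("Malformed CustomMessage payload", e);
        }
    }

    private static void writeString(DataOutputStream out, String value) throws IOException {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

A matching Decoder on the consumer side could simply call CustomMessage.fromBytes(bytes). If the class is likely to gain fields over time, a self-describing format such as JSON may be easier to evolve than a fixed layout like this one.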
Answer 2:
You need to implement both an encoder and a decoder. The encoder code:
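    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import kafka.serializer.Encoder;
    import kafka.utils.VerifiableProperties;
    import org.apache.log4j.Logger; // log4j 1.x assumed

    public class JsonEncoder implements Encoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
        // ObjectMapper is thread-safe once configured, so one shared instance is enough.
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* Kafka creates the encoder through this constructor, so it must be present. */
        }

        @Override
        public byte[] toBytes(Object object) {
            try {
                // Use an explicit charset so the bytes do not depend on the platform default.
                return OBJECT_MAPPER.writeValueAsString(object).getBytes(StandardCharsets.UTF_8);
            } catch (JsonProcessingException e) {
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            // Fall back to an empty payload when serialization fails.
            return new byte[0];
        }
    }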
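One thing to watch when turning JSON text into bytes: String.getBytes() with no argument uses the platform default charset, so a producer and a consumer running on differently configured machines may disagree about non-ASCII characters. A small sketch of pinning the charset on both sides follows; the class name Utf8Payload is made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encode and decode with the same explicit charset.
final class Utf8Payload {
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

With this, a string such as {"name":"é"} survives the round trip regardless of the JVM's file.encoding setting, because neither side relies on the default.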
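The decoder code:
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import kafka.serializer.Decoder;
    import kafka.utils.VerifiableProperties;
    import org.apache.log4j.Logger; // log4j 1.x assumed

    public class JsonDecoder implements Decoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonDecoder.class);
        // ObjectMapper is thread-safe once configured, so one shared instance is enough.
        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        public JsonDecoder(VerifiableProperties verifiableProperties) {
            /* Kafka creates the decoder through this constructor, so it must be present. */
        }

        @Override
        public Object fromBytes(byte[] bytes) {
            try {
                return OBJECT_MAPPER.readValue(bytes, Map.class);
            } catch (IOException e) {
                // Log the payload as text; bytes.toString() would only print the array's identity.
                LOGGER.error(String.format("Json processing failed for payload: %s", new String(bytes, StandardCharsets.UTF_8)), e);
            }
            // Callers must handle null when the payload is not valid JSON.
            return null;
        }
    }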
The pom entry:
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.4.1.3</version>
    </dependency>
Set the default encoder in the Kafka properties:
    properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
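The writer and reader code is as follows:
    // Writer: encode the map yourself; DefaultEncoder passes the byte[] through unchanged.
    JsonEncoder encoder = new JsonEncoder(null);
    byte[] bytes = encoder.toBytes(map);
    KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(this.topic, bytes);

    // Reader: "it" is assumed to be the consumer stream's iterator over byte[] messages.
    JsonDecoder decoder = new JsonDecoder(null);
    Map map = (Map) decoder.fromBytes(it.next().message());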