I have a concrete class which I am serializing to a byte array to send to a Kafka topic. For serialization I am using ReflectDatumWriter. Before sending the byte[] I prepend a magic byte plus the schema ID in the next 4 bytes, after checking some online tutorial.
I am able to send the messages, but when consuming them with the Avro console consumer I get this response:
./bin/kafka-avro-console-consumer --bootstrap-server 0:9092 --property schema.registry.url=http://0:8081 --property print.key=true --topic Test
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
"1" "\u0000"
MParams ddb = new MParams();
ddb.setKey("ss");
for (int i = 0; i < 10; i++) {
    // schema ID is hard-coded to 1 here
    ProducerRecord<String, byte[]> record =
            new ProducerRecord<>("Test", "1", build(1, Producer.serialize(ddb)));
    Future<RecordMetadata> resp = kafkaFullAckProducer.send(record);
    System.out.println("Success " + resp.get());
}
public static <T> byte[] serialize(T data) {
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        Schema schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        writer.write(data, encoder);
        encoder.flush();
        return out.toByteArray();
    } catch (IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}
public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0); // magic byte
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array()); // 4-byte schema ID
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder, msg: " + e.getMessage(), e);
    }
}
@Data
public class MParams extends MetricParams {
    // POJO version
    @Nullable
    private String key;
}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value = {MParams.class})
public abstract class MetricParams {
}
Working serializer snippet:
public byte[] serialize(String topic, T record) {
    Schema schema;
    int id;
    try {
        schema = ReflectData.get().getSchema(record.getClass());
        id = client.register(topic + "-value", schema);
    } catch (IOException | RestClientException e) {
        throw new RuntimeException(e);
    }
    return serializeImpl(id, schema, record);
}

protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
    if (object == null) {
        return null;
    }
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0);
        out.write(ByteBuffer.allocate(4).putInt(id).array());
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error serializing Avro message", e);
    }
}
Deserializer:
protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
    // Even if the caller requests schema & version, if the payload is null
    // we cannot include it. The caller must handle this case.
    if (payload == null) {
        return null;
    }
    int id = -1;
    try {
        ByteBuffer buffer = getByteBuffer(payload);
        id = buffer.getInt();
        int length = buffer.limit() - 1 - 4; // total minus magic byte and 4-byte ID
        int start = buffer.position() + buffer.arrayOffset();
        DatumReader<T> reader = new ReflectDatumReader<T>(schema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(buffer.array(), start, length, null));
    } catch (IOException | RuntimeException e) {
        throw new SerializationException("Error deserializing Avro message for id " + id, e);
    }
}
private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
    }
    return buffer;
}
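For reference, a hypothetical call site for this deserializer. It assumes the method lives in a class where T is MParams, and that the consumer already knows the writer's class, since the schema is passed in as a parameter rather than fetched from the registry by the embedded ID (readValue and its arguments are illustrative names, not part of the original code):

import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper in the same class as deserialize(), with T = MParams
public MParams readValue(ConsumerRecord<String, byte[]> record) {
    Schema readerSchema = ReflectData.get().getSchema(MParams.class);
    return deserialize(readerSchema, record.value());
}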
Not clear why you are trying to bypass the
KafkaAvroSerializer
class's default behavior. (In your case, remove the Schema.Parser from that example and use your Reflect record type rather than GenericRecord.)
You can put your concrete class as the second type parameter of the producer, and as long as it maps onto the base Avro types it should be serialized correctly (meaning the ID is computed by the registry, not some number you create, and converted to bytes), registered to the registry, then sent to Kafka.
Most importantly, the schema ID is not necessarily 1 in the registry, and by hard-coding that value the console consumer might try to deserialize your messages with the wrong schema, resulting in the wrong output. You can ask the registry which ID was actually assigned, as in the sketch below.
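A minimal sketch using Confluent's CachedSchemaRegistryClient to look up the assigned ID; the subject name Test-value is an assumption based on your topic and the default subject naming strategy, and the registry URL is assumed:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CheckRegisteredId {
    public static void main(String[] args) throws Exception {
        // second argument is the client's identity-map (cache) capacity
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);
        // subject assumed to be "<topic>-value" under the default TopicNameStrategy
        SchemaMetadata latest = client.getLatestSchemaMetadata("Test-value");
        System.out.println("id=" + latest.getId() + " version=" + latest.getVersion());
        System.out.println(latest.getSchema());
    }
}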
In other words, try something like the following.
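This is a minimal sketch, not the verbatim Confluent example: the hostnames and topic name are assumptions to adapt to your setup, and reflect-record support in KafkaAvroSerializer depends on your Confluent version (newer versions expose a schema.reflection.enable property; check your version's docs before relying on it):

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReflectAvroProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // KafkaAvroSerializer registers the schema and writes the
        // magic byte + registry-assigned ID for you
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // assumption: enable reflect-based schemas if your version supports it
        // props.put("schema.reflection.enable", "true");

        try (Producer<String, MParams> producer = new KafkaProducer<>(props)) {
            MParams ddb = new MParams();
            ddb.setKey("ss");
            for (int i = 0; i < 10; i++) {
                Future<RecordMetadata> resp =
                        producer.send(new ProducerRecord<>("Test", "1", ddb));
                System.out.println("Success " + resp.get());
            }
        }
    }
}

The key difference from your code: the value type of the producer is your concrete class, and the serializer owns the wire format, so the ID embedded in each message always matches what the registry assigned.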