Kafka 0.8
I want to publish and consume byte[] objects, Java bean objects, serializable objects, and more.
What is the best way to define a publisher and consumer for this type of scenario?
When I consume a message from the consumer iterator, I do not know what type the message is.
Can anybody point me to a guide on how to design for such scenarios?
I enforce a single schema or object type per Kafka Topic. That way when you receive messages you know exactly what you are getting.
At a minimum, you should decide whether a given topic is going to hold binary or string data, and depending on that, how it will be further encoded.
For example, you could have a topic named Schema that contains JSON-encoded objects stored as strings.
If you use JSON and a loosely-typed language like JavaScript, it could be tempting to store different objects with different schemas in the same topic. With JavaScript, you can just call JSON.parse(...), take a peek at the resulting object, and figure out what you want to do with it.
But you can't do that in a strictly-typed language like Scala. The Scala JSON parsers generally want you to parse the JSON into an already defined Scala type, usually a case class. They do not work with the parse-first, inspect-later model.
One solution is to keep the one schema / one topic rule, but cheat a little: wrap an object in an object. A typical example would be an Action object where you have a header that describes the action, and a payload object with a schema dependent on the action type listed in the header. Imagine this pseudo-schema:
{name: "Action", fields: [
{name: "actionType", type: "string"},
{name: "actionObject", type: "string"}
]}
This way, even in a strongly-typed language, you can do something like the following (again, this is pseudo-code):
action = JSONParser[Action].parse(msg)
switch(action.actionType) {
case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}
One of the neat things about this approach is that if you have a consumer that's waiting for only a specific action.actionType, and is just going to ignore all the others, it's pretty lightweight for it to decode just the header and put off decoding action.actionObject until when and if it is needed.
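As a rough, runnable illustration of the wrapper idea above, here is a minimal Python sketch using only the standard json module. The names encode_action, handle and DECODERS are made up for this example, and the "decoders" just return a tagged tuple where real code would build a typed object.

```python
import json

def encode_action(action_type, payload):
    # The payload is serialized on its own and stored as a string field,
    # matching the pseudo-schema above (actionObject is a string).
    return json.dumps({"actionType": action_type,
                       "actionObject": json.dumps(payload)})

# Hypothetical per-type decoders; real code would build typed objects here.
DECODERS = {
    "foo": lambda raw: ("Foo", json.loads(raw)),
    "bar": lambda raw: ("Bar", json.loads(raw)),
}

def handle(msg, wanted):
    action = json.loads(msg)           # decodes only the envelope
    if action["actionType"] not in wanted:
        return None                    # the payload string is never parsed
    return DECODERS[action["actionType"]](action["actionObject"])

msg = encode_action("foo", {"id": 1})
print(handle(msg, {"foo"}))  # consumer interested in "foo" decodes the payload
print(handle(msg, {"bar"}))  # consumer interested in "bar" skips it
```

Because actionObject stays an opaque string until a decoder is chosen, a consumer that ignores most action types only pays for parsing the small envelope.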
So far this has all been about string-encoded data. If you want to work with binary data, of course you can wrap it in JSON as well, or any of a number of string-based encodings like XML. But there are a number of binary-encoding systems out there, too, like Thrift and Avro. In fact, the pseudo-schema above is based on Avro. You can even do cool things in Avro like schema evolution, which amongst other things provides a very slick way to handle the above Action use case -- instead of wrapping an object in an object, you can define a schema that is a subset of other schemas and decode just the fields you want, in this case just the action.actionType field. Here is a really excellent description of schema evolution.
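To make the "subset schema" idea concrete, here is a small Python sketch that only mimics it with plain JSON. This is not Avro: a real Avro reader resolves the writer's schema against the reader's schema and skips undeclared fields during decoding, whereas this sketch parses the whole message first. The project helper and the userId and amount fields are invented for illustration.

```python
import json

def project(record, reader_fields):
    # Keep only the fields the reader declared; ignore everything else.
    return {name: record[name] for name in reader_fields if name in record}

# The producer writes a flat record with no wrapper object.
writer_msg = json.dumps({"actionType": "foo", "userId": 42, "amount": 9.5})

# A consumer that only cares about the action type reads with a smaller "schema".
header = project(json.loads(writer_msg), ["actionType"])
print(header)
```

The point is that the producer does not need an envelope; each consumer declares the fields it needs and the rest is dropped.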
In a nutshell, what I recommend is:
- Settle on a schema-based encoding system (be it JSON, XML, Avro, whatever)
- Enforce a one schema per topic rule
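
One way to hold yourself to the two rules above is to keep a single table that maps each topic to exactly one decoder. The sketch below is a minimal Python version under assumptions: the topic names "actions" and "thumbnails" are hypothetical, and messages are assumed to arrive from the consumer as raw bytes.

```python
import json

# Hypothetical topic names, each bound to exactly one decoder.
TOPIC_DECODERS = {
    "actions": lambda raw: json.loads(raw.decode("utf-8")),  # JSON text
    "thumbnails": lambda raw: raw,                            # opaque bytes
}

def decode(topic, raw):
    # Fail loudly for topics with no registered schema instead of guessing.
    try:
        decoder = TOPIC_DECODERS[topic]
    except KeyError:
        raise ValueError(f"no schema registered for topic {topic!r}")
    return decoder(raw)

print(decode("actions", b'{"actionType": "foo"}'))
```

With this in place the consumer never has to guess a message's type; the topic it came from already answers that.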