Streaming messages from one Kafka cluster to another

Published 2019-02-18 07:53

Question:

I'm currently trying to find an easy way to stream messages from a topic on one Kafka cluster to another one (remote -> local cluster).
The idea is to use Kafka Streams right away, so that we don't need to replicate the actual messages to the local cluster but only get the "results" of the Kafka Streams processing into our Kafka topics.

So let's say the WordCount demo runs against a Kafka instance on another PC than my own. I also have a Kafka instance running on my local machine.
Now I want the WordCount demo to run on the ("remote") topic containing the sentences whose words should be counted.
The counts, however, should be written to a topic on my local system instead of a "remote" topic.

Is something like this doable with the Kafka Streams API?
E.g.

val builder: KStreamBuilder = new KStreamBuilder(remoteStreamConfig, localStreamConfig)

val textLines: KStream[String, String] =
  builder.stream("remote-input-topic", remoteStreamConfig)

val wordCounts: KTable[String, Long] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
  .groupBy((_, word) => word)
  .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", localStreamConfig)

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()

Thank you very much
- Tim

Answer 1:

Kafka Streams is built for a single cluster only.

A workaround is to use foreach() (or similar) and instantiate your own KafkaProducer that writes to the target cluster. Note that your own producer must use sync writes! Otherwise, you might lose data in case of failure. Thus, it's not a very performant solution.
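
For example, a rough sketch of this workaround could look as follows (using the Kafka Streams Scala DSL from the 2.x line; the broker addresses, topic names, and application id are placeholders):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

// Plain producer pointing at the *target* (local) cluster.
val targetProps = new Properties()
targetProps.put("bootstrap.servers", "localhost:9092")
val targetProducer =
  new KafkaProducer[String, java.lang.Long](targetProps, new StringSerializer, new LongSerializer)

// The Streams application itself only ever talks to the *source* (remote) cluster.
val sourceProps = new Properties()
sourceProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-bridge")
sourceProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "remote-broker:9092")

val builder = new StreamsBuilder()
val wordCounts = builder.stream[String, String]("remote-input-topic")
  .flatMapValues(_.toLowerCase.split("\\W+").toIterable)
  .groupBy((_, word) => word)
  .count()

// Instead of wordCounts.to(...), forward every count update to the other cluster by hand.
// send(...).get() blocks until the broker acknowledges the write (sync write); a
// fire-and-forget send could silently drop data if the producer fails.
wordCounts.toStream.foreach { (word, count) =>
  targetProducer.send(new ProducerRecord[String, java.lang.Long]("local-output-topic", word, count)).get()
}

val streams = new KafkaStreams(builder.build(), sourceProps)
streams.start()

The blocking get() on every send is what prevents silent data loss here, and it is also exactly what makes this approach slow.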

It's better to just write the result to the source cluster and replicate the data to the target cluster. Note that you can most likely use a much shorter retention period for the output topic in the source cluster, as the actual data is stored with a longer retention time in the target cluster anyway. This lets you limit the required storage on the source cluster.
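
For example, you could create the output topic on the source cluster with a short retention via the AdminClient, let the Streams application write to it with an ordinary to() call exactly as in a single-cluster setup, and replicate that topic to the target cluster with a tool like MirrorMaker. A sketch (broker address, topic name, partition/replication counts, and the retention value are placeholders):

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, NewTopic}
import org.apache.kafka.common.config.TopicConfig

// Output topic on the *source* cluster with a short retention; the replicated
// copy on the target cluster is the long-term store.
val adminProps = new Properties()
adminProps.put("bootstrap.servers", "remote-broker:9092")
val admin = AdminClient.create(adminProps)

val outputTopic = new NewTopic("wordcount-output", 3, 1.toShort)
  .configs(Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "3600000")) // 1 hour

admin.createTopics(Collections.singletonList(outputTopic)).all().get()
admin.close()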

Edit (reply to comment below from @quickinsights)

What if your Kafka Streams service is down for a longer period than the retention?

That seems to be an orthogonal concern that can be raised for any design. In general, retention time should be set based on your maximum expected downtime to avoid data loss. Note though that, because the application reads from and writes to the source cluster, and only the source cluster's output topic may be configured with a small retention time, nothing bad happens if the application goes down. The input topic simply won't be processed and no new output data is produced. The only case to worry about is the one in which your replication pipeline into the target cluster goes down -- you should set the retention time of the output topic in the source cluster accordingly to make sure you don't lose any data.
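
If you want to size that retention explicitly, you can raise retention.ms on the existing output topic so it covers your worst-case replication downtime, for example via AdminClient.incrementalAlterConfigs (needs a reasonably recent broker and client; the topic name, broker address, and the three-day value are just examples):

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put("bootstrap.servers", "remote-broker:9092")
val admin = AdminClient.create(props)

// Raise retention.ms on the source-cluster output topic so it covers the longest
// outage you expect from the replication pipeline into the target cluster.
val topic = new ConfigResource(ConfigResource.Type.TOPIC, "wordcount-output")
val raiseRetention = new AlterConfigOp(
  new ConfigEntry("retention.ms", "259200000"), // 3 days
  AlterConfigOp.OpType.SET)

val changes = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
changes.put(topic, Collections.singletonList(raiseRetention))

admin.incrementalAlterConfigs(changes).all().get()
admin.close()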

It also doubles your writes back to Kafka.

Yes. It also increases the storage footprint on disk. It's a tradeoff (as always) between application resilience and runtime performance vs. cluster load. Your choice. I would personally recommend going with the more resilient option, as pointed out above. It's easier to scale out your Kafka cluster than to handle all the resilience edge cases in your application code.

That seems super inefficient

That's a personal judgment call. It's a tradeoff and there is no objective right or wrong.