KafkaStreams在同一应用程序的多个流(KafkaStreams multiple stre

2019-10-29 08:08发布

我试图使基于惯例和合理性与KafkaStreams实用的设计决策。

比方说,我有,我想将它放在两个不同的事件KTable秒。 我有一个制片人将这些邮件发送到KStream在监听到该主题。

从我可以告诉我不能使用条件转发使用的消息KafkaStreams ,所以如果流是加入了许多主题(每个以上的邮件,例如),我只能叫stream.to在单水槽主题-否则,我会做一些像电话foreach在流并用邮件发送KProducer到水槽主题。

上述建议使用一个单一的数据流。 我想我可以建立多个数据流在同一个应用程序,每听一个话题,映射和转发到表片,但每次我试图创建的两个实例KafkaStreams ,只有第一个初始化订阅了它的主题-其他变从它的主题有没有订阅的客户发出警告。

我可以建立多个数据流在同一个应用程序? 如果是这样,是否有什么特殊要求?

class Stream(topic: String) {
  val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
  val streamsBuilder = new StreamsBuilder
  val topics = new util.ArrayList[String]
  topics.add(props.get("topic"))

  val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))

  def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
    builder.stream[String, String](
      topics,
      Consumed.`with`(String(), String())
    )
  }

  def init(): KafkaStreams = {
    val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)

    streams.start()

    streams
  }
}

class Streams() {

  val eventStream = new Stream("first_event") //looking good!
  val eventStream2 = new Stream("second_event") // no subscribers
  //if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
  val streams: KafkaStreams = eventStream.init()
  val streams2: KafkaStreams = eventStream2.init()

}

流配置

val streamConfig: Properties = {
    val properties = new Properties()
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
    properties
}

我也很想任何替代建议

Answer 1:

从我可以告诉我不能使用的消息条件转发

你知道KStream#branch() 它基本上是相同的条件转发。

我想我可以建立多个数据流在同一个应用程序,每听一个话题,映射和转发到表片,

这应该工作如下:

StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KStream stream2 = builder.stream("topic2");

stream1.to("table1-topic");
stream2.to("table2-topic");

但每次我试图创建KafkaStreams的两个实例,只有第一个初始化订阅了它的主题 - 另外从它的主题有没有订阅的客户得到一个警告。

不确定。 这应该工作。 也许你可以分享你的代码?



Answer 2:

当你创建你的KafkaStreams你需要传递特性与不同application.id,例如:

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP1");
StreamsBuilder builder = new SteamsBuilder();
KStream stream1 = builder.stream("topic1");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

然后你应该创建另一个流:

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"APP2");
StreamsBuilder builder = new SteamsBuilder();
KStream stream2 = builder.stream("topic2");
KafkaStreams streams2 = new KafkaStreams(builder, props);
streams2.start();


文章来源: KafkaStreams multiple streams in same application