我试图使基于惯例和合理性与KafkaStreams实用的设计决策。
比方说,我有,我想将它放在两个不同的事件KTable
秒。 我有一个制片人将这些邮件发送到KStream
在监听到该主题。
从我可以告诉我不能使用条件转发使用的消息KafkaStreams
,所以如果流是加入了许多主题(每个以上的邮件,例如),我只能叫stream.to
在单水槽主题-否则,我会做一些像电话foreach
在流并用邮件发送KProducer
到水槽主题。
上述建议使用一个单一的数据流。 我想我可以建立多个数据流在同一个应用程序,每听一个话题,映射和转发到表片,但每次我试图创建的两个实例KafkaStreams
,只有第一个初始化订阅了它的主题-其他变从它的主题有没有订阅的客户发出警告。
我可以建立多个数据流在同一个应用程序? 如果是这样,是否有什么特殊要求?
class Stream(topic: String) {
val props: Option[Map[String, String]] = Some(TopicProps.get(topic))
val streamsBuilder = new StreamsBuilder
val topics = new util.ArrayList[String]
topics.add(props.get("topic"))
val stream: KStream[String, String] = configureStream(streamsBuilder, topics, props.get("sink"))
def configureStream(builder: StreamsBuilder, topics: java.util.List[String], sink: String): KStream[String, String] = {
builder.stream[String, String](
topics,
Consumed.`with`(String(), String())
)
}
def init(): KafkaStreams = {
val streams = new KafkaStreams(streamsBuilder.build(), KafkaConfig.streamConfig)
streams.start()
streams
}
}
class Streams() {
val eventStream = new Stream("first_event") //looking good!
val eventStream2 = new Stream("second_event") // no subscribers
//if I switch the other of these, eventStream2 is subscribed to and eventStream is dead in the water
val streams: KafkaStreams = eventStream.init()
val streams2: KafkaStreams = eventStream2.init()
}
流配置
val streamConfig: Properties = {
val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-application")
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerHost)
properties
}
我也很想任何替代建议