Apache Flink add new stream dynamically

Published 2019-08-22 12:04

Is it possible in Apache Flink, to add a new datastream dynamically during runtime without restarting the Job?

As far as I understand it, a typical Flink program looks like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream(hostname, port, '\n')
val windowCounts = text
  .flatMap(_.split("\\s"))
  .map(word => (word, 1))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .sum(1)

env.execute("Socket Window WordCount")

In my case it is possible that, e.g., a new device is started, so another stream must be processed. But how can I add this new stream on the fly?

1 Answer

SAY GOODBYE · 2019-08-22 12:16

It is not possible to add new streams at runtime to a Flink program.

The way to solve this problem is to have a single stream that contains all incoming events (e.g. a Kafka topic into which you ingest all of the individual streams). Each event should carry a key identifying which stream it came from. This key can then be used to keyBy the stream and apply per-key processing logic.
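A minimal sketch of this pattern (the `DeviceEvent` case class and its field names are illustrative, not from the original answer; a real job would replace `fromElements` with a Kafka source):

```scala
import org.apache.flink.streaming.api.scala._

// Illustrative event type: each record carries the id of the logical
// stream (e.g. the device) it originates from.
case class DeviceEvent(deviceId: String, value: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Stand-in for a Kafka source receiving all devices' events.
val events: DataStream[DeviceEvent] = env.fromElements(
  DeviceEvent("device-1", 1.0),
  DeviceEvent("device-2", 2.0)
)

// keyBy splits the single physical stream into logical per-device
// streams; a new device simply shows up as a new key, no restart needed.
events
  .keyBy(_.deviceId)
  .sum("value")
  .print()

env.execute("per-device processing")
```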

If you want to read from multiple sockets, you could write your own SourceFunction that reads, from some control input (e.g. a fixed socket), the ports for which to open new sockets. Internally it would keep all of these sockets open and read from them in round-robin fashion.
