Apache Kafka order windowed messages based on thei

Posted 2019-04-28 11:17

I'm trying to find a way to re-order messages within a topic partition and send ordered messages to a new topic.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

for example:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

We also attach a key to each message so that it is routed to the corresponding partition.

What I want to do is reorder events by the {system_timestamp} part of the message within a 1-minute window, because our publishers don't guarantee that messages are sent in {system_timestamp} order.

For example, a message with a larger {system_timestamp} value can be delivered to the topic first.
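To make the ordering criterion concrete, here is a minimal plain-Java sketch (no Kafka dependency) that extracts the timestamp prefix and sorts by it. The class and method names (EventTimestamps, timestampOf, sortByTimestamp) are hypothetical and not part of the original post; the sketch assumes every message starts with an epoch-millisecond number followed by '-'.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper for illustration only; not part of any Kafka API.
final class EventTimestamps {
    private EventTimestamps() {}

    // Returns the numeric prefix of "{system_timestamp}-{event_name}?{parameters}".
    static long timestampOf(String message) {
        int dash = message.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("No timestamp prefix in: " + message);
        }
        return Long.parseLong(message.substring(0, dash));
    }

    // Returns a new list sorted by ascending timestamp; the input list is not modified.
    static List<String> sortByTimestamp(List<String> messages) {
        List<String> sorted = new ArrayList<>(messages);
        sorted.sort(Comparator.comparingLong(EventTimestamps::timestampOf));
        return sorted;
    }
}
```

Applied to the two sample messages above, sortByTimestamp would put the chat.started event before the client.message event, because its timestamp is smaller.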

I've investigated the Kafka Streams API and found some examples of message windowing and aggregation:

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
// Group events by key, i.e. within a partition.
KGroupedStream<String, String> groupedStream = stream.groupByKey();

/* Commented out since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
        () -> "",  // initial value
        (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
        TimeWindows.of(1000), // intervals in milliseconds
        Serdes.String(), // serde for aggregated value
        "test-store"
);*/

But what should I do next with this grouped stream? I don't see any sort() method that takes a comparator such as (e1, e2) -> e1.compareTo(e2). Windows can only be applied to methods like aggregate(), reduce(), and count(), but I don't think I need to manipulate the message data at all.

How can I re-order messages within the 1-minute window and send them to another topic?

2 answers
爷的心禁止访问
#2 · 2019-04-28 11:41

Here is how I ordered streams in my project.

  1. Created a topology with a source, a processor, and a sink.
  2. In the Processor:
    1. process(key, value): add each record to a List held as an instance variable.
    2. init(): call schedule() with WINDOW_BUFFER_TIME as the interval and WALL_CLOCK_TIME as the punctuation type. Each punctuate(timestamp) call then sorts the records buffered in the List during that interval, iterates over them and forwards each one, and finally clears the List.

This logic is working fine for me.
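
The buffering logic described above can be sketched without any Kafka dependency, roughly as follows. SortingBuffer and its methods are hypothetical names chosen for illustration: add() stands in for process(key, value), and flush() stands in for the scheduled punctuate() callback, whose result would be passed record by record to context.forward(). The sketch assumes the record value starts with the timestamp, as in the question.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, Kafka-free model of the buffer-then-sort processor; illustration only.
final class SortingBuffer {
    private final List<String> buffer = new ArrayList<>();

    // Plays the role of process(key, value): just remember the record.
    void add(String value) {
        buffer.add(value);
    }

    // Plays the role of punctuate(timestamp): sort what was buffered, hand it over, clear.
    List<String> flush() {
        List<String> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.comparingLong(SortingBuffer::timestampOf));
        buffer.clear();
        return sorted;
    }

    // Assumes the "{system_timestamp}-..." value format from the question.
    private static long timestampOf(String value) {
        return Long.parseLong(value.substring(0, value.indexOf('-')));
    }
}
```

Two properties of this design are worth keeping in mind: the List lives only in memory, so buffered records are not recovered from it after a crash, and two records that land on opposite sides of a flush can still come out in the wrong relative order.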

等我变得足够好
#3 · 2019-04-28 11:55

Here's an outline:

Create a Processor implementation that:

  • in the process() method, for each message:

    • reads the timestamp from the message value
    • inserts into a KeyValueStore using (timestamp, message-key) pair as the key and the message-value as the value. NB this also provides de-duplication. You'll need to provide a custom Serde to serialize the key so that the timestamp comes first, byte-wise, so that ranged queries are ordered by timestamp first.
  • in the punctuate() method:

    • reads the store using a ranged fetch from 0 to timestamp - 60,000 ms (= 1 minute)
    • sends the fetched messages in order using context.forward() and deletes them from the store
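
As a rough illustration of the outline above, the following Kafka-free sketch uses a TreeMap with a byte-wise comparator as a stand-in for the KeyValueStore. ReorderStore, encodeKey, put and drainOlderThan are hypothetical names, and the key layout (8-byte big-endian timestamp followed by the UTF-8 message key) is one possible assumption for the custom Serde, valid for non-negative timestamps.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a byte-ordered key-value store; illustration only.
final class ReorderStore {
    // Unsigned lexicographic comparison of byte arrays.
    static final Comparator<byte[]> BYTEWISE = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    };

    private final TreeMap<byte[], String> store = new TreeMap<>(BYTEWISE);

    // Timestamp first (big-endian), then the message key, so byte order follows timestamp order.
    static byte[] encodeKey(long timestamp, String messageKey) {
        byte[] keyBytes = messageKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .putLong(timestamp)
                .put(keyBytes)
                .array();
    }

    // process(): an identical (timestamp, key) pair overwrites the old entry (de-duplication).
    void put(long timestamp, String messageKey, String value) {
        store.put(encodeKey(timestamp, messageKey), value);
    }

    // punctuate(): remove and return, in order, all values with timestamp <= now - delayMs.
    List<String> drainOlderThan(long now, long delayMs) {
        long cutoff = now - delayMs;
        if (cutoff < 0) {
            return new ArrayList<>(); // nothing can be old enough yet
        }
        // Exclusive upper bound: the smallest possible key with timestamp cutoff + 1.
        byte[] upper = encodeKey(cutoff + 1, "");
        Map<byte[], String> ready = store.headMap(upper, false);
        List<String> out = new ArrayList<>(ready.values());
        ready.clear(); // headMap is a view, so this deletes the entries from the store
        return out;
    }

    int size() {
        return store.size();
    }
}
```

In a real processor, put() would be called from process() and drainOlderThan(timestamp, 60000) from punctuate(), with each returned value passed to context.forward().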

The problem with this approach is that punctuate() is not triggered if no new messages arrive to advance the "stream time". If this is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each(!) partition of your topic. Your processor should simply ignore them, but they will cause punctuate() to trigger in the absence of "real" messages. KIP-138 will address this limitation by adding explicit support for system-time punctuation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
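
The limitation can be illustrated with a deliberately simplified model of stream-time punctuation. This is not Kafka's actual implementation; StreamTimePunctuator and its methods are hypothetical, and the only assumption taken from the answer is that stream time advances solely when a record (real or "tick") arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of stream-time punctuation; not Kafka's actual code.
final class StreamTimePunctuator {
    private final long intervalMs;
    private long streamTime = -1;      // advances only when a record arrives
    private long nextPunctuation = -1; // scheduled once the first record is seen
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Called for every record, including "tick" records; its timestamp drives stream time.
    void onRecord(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        if (nextPunctuation < 0) {
            nextPunctuation = streamTime + intervalMs;
        }
        while (streamTime >= nextPunctuation) {
            fired.add(nextPunctuation); // punctuate(nextPunctuation) would run here
            nextPunctuation += intervalMs;
        }
    }

    // Stream-time values at which punctuation has fired so far.
    List<Long> firedAt() {
        return fired;
    }
}
```

In this model nothing fires between calls to onRecord(), however much wall-clock time passes, which is why a periodic tick record per partition is enough to keep buffered messages flowing.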
