Kafka Streams table transformations

Posted 2019-06-07 09:04

Question:

I've got a table in SQL Server that I'd like to stream to Kafka topic, the structure is as follows:

(UserID, ReportID)

This table will change continuously (records added/inserted, no updates).

I'd like to transform this into this kind of structure and put into Elasticsearch:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

The examples I've seen so far deal with logs or click-stream data and do not work in my case.

Is this kind of use case possible at all? I could always just look at UserID changes and query the database, but that seems naive and not the best approach.

Update

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    // read (UserID, ReportID) pairs with explicit Long serdes instead of relying on defaults
    KStream<Long, Long> reportPermission = builder.stream(longSerde, longSerde, TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey(longSerde, longSerde)
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                // start each user's aggregate with an empty list (returning null would NPE in the aggregator)
                return new ArrayList<>();
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                // TODO: this is where I'm stuck - I don't have a real ArrayList<Long> serializer yet
                return null;
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                // TODO: same problem here - no ArrayList<Long> deserializer yet
                return null;
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

I'm actually getting stuck at the aggregate stage because I cannot write a proper SerDe for ArrayList<Long> (not enough skills yet), and lambdas don't seem to work for the aggregator - the compiler cannot infer the type of agg:

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );

Answer 1:

You can use Kafka's Connect API to get the data from SQL Server into Kafka. I am not aware of a specific SQL Server connector, but you can use any generic JDBC-based connector: https://www.confluent.io/product/connectors/
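
For concreteness, a source connector configuration might look roughly like the following (a sketch in standalone .properties form, assuming the Confluent JDBC source connector and an incrementing ID column on the table; the connector name, connection URL, table, and column names are placeholders):

name=report-permission-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver://sqlserver:1433;databaseName=Reports
table.whitelist=ReportPermission
mode=incrementing
incrementing.column.name=Id
# the resulting topic name is topic.prefix plus the table name
topic.prefix=sql-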

To process the data you can use Kafka's Streams API. You can simply aggregate() all reports per user. Something like this:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");
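
To make this concrete, here is a minimal sketch of that aggregation with a hand-rolled Serde for ArrayList<Long>, using the same older KStreamBuilder API as the question; the topic name, store name, and the fixed-width binary encoding are my own assumptions, and a JSON-based Serde would work just as well:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class ReportAggregationSketch {

  // Serde that encodes an ArrayList<Long> as a 4-byte count followed by fixed-width longs.
  public static class LongListSerde implements Serde<ArrayList<Long>> {
    @Override public void configure(Map<String, ?> configs, boolean isKey) {}
    @Override public void close() {}

    @Override
    public Serializer<ArrayList<Long>> serializer() {
      return new Serializer<ArrayList<Long>>() {
        @Override public void configure(Map<String, ?> configs, boolean isKey) {}
        @Override public void close() {}

        @Override
        public byte[] serialize(String topic, ArrayList<Long> data) {
          if (data == null) return null;
          ByteBuffer buf = ByteBuffer.allocate(4 + 8 * data.size());
          buf.putInt(data.size());
          for (Long v : data) buf.putLong(v);
          return buf.array();
        }
      };
    }

    @Override
    public Deserializer<ArrayList<Long>> deserializer() {
      return new Deserializer<ArrayList<Long>>() {
        @Override public void configure(Map<String, ?> configs, boolean isKey) {}
        @Override public void close() {}

        @Override
        public ArrayList<Long> deserialize(String topic, byte[] bytes) {
          if (bytes == null) return null;
          ByteBuffer buf = ByteBuffer.wrap(bytes);
          int n = buf.getInt();
          ArrayList<Long> list = new ArrayList<>(n);
          for (int i = 0; i < n; i++) list.add(buf.getLong());
          return list;
        }
      };
    }
  }

  public static KTable<Long, ArrayList<Long>> build(KStreamBuilder builder) {
    return builder.stream(Serdes.Long(), Serdes.Long(), "report-permission")
        .groupByKey(Serdes.Long(), Serdes.Long())
        .aggregate(
            ArrayList::new,                                                 // empty list per UserID
            (userId, reportId, agg) -> { agg.add(reportId); return agg; },  // return the list itself
            new LongListSerde(),
            "report-aggregation-store");
  }
}

This also shows why the lambda attempt in the question does not compile: ArrayList.add() returns a boolean, so the aggregator has to return agg explicitly, and the Serde passed to aggregate() must describe the list type rather than being longSerde.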

Check out the docs for more details about the Streams API: https://docs.confluent.io/current/streams/index.html

Note that you need to make sure the list of reports does not grow unbounded. Kafka has a (configurable) maximum message size, and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out the configs on this page: http://kafka.apache.org/documentation/#brokerconfigs
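
For example, with an older ZooKeeper-based cluster the topic-level limit could be raised with the kafka-configs tool (a sketch; the topic name and the 2 MB value are only placeholders, and the producer's max.request.size may need a matching bump):

bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter \
  --entity-type topics --entity-name result-topic \
  --add-config max.message.bytes=2097152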

Finally, you use the Connect API to push the data into Elasticsearch. There are multiple connectors available (I would of course recommend the Confluent one). More details about the Connect API: https://docs.confluent.io/current/connect/userguide.html
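
As a rough sketch, a sink connector configuration (again in standalone .properties form, assuming the Confluent Elasticsearch connector) could look like this; the connection URL, topic, and type name are placeholders:

name=report-aggregated-es-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=http://elasticsearch:9200
topics=result-topic
type.name=kafka-connect
key.ignore=false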



Answer 2:

This kind of approach is not directly supported by SQL Server and Kafka Streams; however, the use case is possible and can be implemented as follows:

1) Write a custom application on top of SQL Server that uses the SolrJ API to update a Solr instance whenever a DML operation (insert, update, delete, etc.) is performed in SQL Server (see the sketch after this list). https://wiki.apache.org/solr/Solrj

2) Use the Solr DataImportHandler, which lets Solr pull changes from SQL Server so the index is kept up to date after DML operations (insert, update, delete, etc.). https://wiki.apache.org/solr/DataImportHandler
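
A minimal sketch of what the SolrJ route in 1) could look like, assuming a hypothetical local Solr instance with a core named reports; change detection against SQL Server and error handling are omitted:

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

public class SolrIndexerSketch {
  public static void main(String[] args) throws Exception {
    SolrClient solr = new HttpSolrClient.Builder("http://localhost:8983/solr/reports").build();

    // index one (UserID, ReportID) row whenever the application observes a change in SQL Server
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("UserID", 1L);
    doc.addField("ReportID", 42L);
    solr.add(doc);
    solr.commit();

    solr.close();
  }
}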