I've got a table in SQL Server that I'd like to stream to a Kafka topic; the structure is as follows:
(UserID, ReportID)
This table is going to be changed continuously (new records are inserted, existing ones are never updated).
I'd like to transform it into this kind of structure and put it into Elasticsearch:
{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}
The examples I've seen so far deal with logs or click-streams and do not work in my case.
Is this kind of use case possible at all? I could always just watch for UserID
changes and query the database, but that seems naive and not the best approach.
Update
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
public class MyDemo {

    public static void main(String... args) {
        System.out.println("Hello KTable!");

        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();

        // (UserID, ReportID) records; both key and value are Longs.
        KStream<Long, Long> reportPermission = builder.stream(longSerde, longSerde, TOPIC);

        KTable<Long, ArrayList<Long>> result = reportPermission
                .groupByKey()
                .aggregate(
                        new Initializer<ArrayList<Long>>() {
                            @Override
                            public ArrayList<Long> apply() {
                                // Start each user's aggregate with an empty list;
                                // returning null here would NPE on the first record.
                                return new ArrayList<>();
                            }
                        },
                        new Aggregator<Long, Long, ArrayList<Long>>() {
                            @Override
                            public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                                // Append the incoming ReportID to the user's list.
                                aggregate.add(value);
                                return aggregate;
                            }
                        },
                        new Serde<ArrayList<Long>>() {
                            // Placeholder SerDe: this is exactly where I'm stuck.
                            @Override
                            public void configure(Map<String, ?> configs, boolean isKey) {}

                            @Override
                            public void close() {}

                            @Override
                            public Serializer<ArrayList<Long>> serializer() {
                                return null; // TODO: a real Serializer<ArrayList<Long>>
                            }

                            @Override
                            public Deserializer<ArrayList<Long>> deserializer() {
                                return null; // TODO: a real Deserializer<ArrayList<Long>>
                            }
                        });

        result.to("report-aggregated-topic");

        KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static final String TOPIC = "report-permission";

    private static Properties createStreamProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}
I'm actually getting stuck at the aggregate stage because I cannot write a proper SerDe for ArrayList<Long>
(not enough skill yet), and lambdas don't seem to work on the aggregator, since it cannot figure out the type of agg:
KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
.groupByKey()
.aggregate(
() -> new ArrayList<Long>(),
(key, val, agg) -> agg.add(val),
longSerde
);
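Two separate things seem to trip the lambda version: ArrayList.add() returns boolean, so (key, val, agg) -> agg.add(val) is inferred as producing a Boolean rather than the list, and longSerde is a Serde<Long> where a Serde<ArrayList<Long>> is expected. For reference, a minimal hand-rolled SerDe for ArrayList<Long> could look like the sketch below; the class name LongListSerde and the fixed-width long encoding are illustrative choices, not anything prescribed by the Kafka API.

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;

// Encodes an ArrayList<Long> as a flat sequence of 8-byte longs.
public class LongListSerde implements Serde<ArrayList<Long>> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public void close() {}

    @Override
    public Serializer<ArrayList<Long>> serializer() {
        return new Serializer<ArrayList<Long>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public byte[] serialize(String topic, ArrayList<Long> data) {
                if (data == null) {
                    return null;
                }
                ByteBuffer buffer = ByteBuffer.allocate(8 * data.size());
                for (Long id : data) {
                    buffer.putLong(id);
                }
                return buffer.array();
            }

            @Override
            public void close() {}
        };
    }

    @Override
    public Deserializer<ArrayList<Long>> deserializer() {
        return new Deserializer<ArrayList<Long>>() {
            @Override
            public void configure(Map<String, ?> configs, boolean isKey) {}

            @Override
            public ArrayList<Long> deserialize(String topic, byte[] data) {
                if (data == null) {
                    return null;
                }
                ArrayList<Long> ids = new ArrayList<>();
                ByteBuffer buffer = ByteBuffer.wrap(data);
                while (buffer.remaining() >= 8) {
                    ids.add(buffer.getLong());
                }
                return ids;
            }

            @Override
            public void close() {}
        };
    }
}

With a concrete list SerDe in place, the lambda variant should also compile once the aggregator returns the list, e.g. (key, val, agg) -> { agg.add(val); return agg; }, and depending on the Streams version aggregate() may additionally require a state-store name argument.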
Directly, this kind of approach is not allowed in SQL and Kafka Streams. However, the use case is possible and can be implemented as follows:
1) Write a custom application over SQL Server using the SolrJ API, which hits the Solr instance whenever a DML operation (insert, update, delete, etc.) is performed in SQL Server; a small SolrJ sketch follows after this list. https://wiki.apache.org/solr/Solrj
2) Use the Solr DataImportHandler; by using it, SQL Server will automatically inform Solr whenever a DML operation (insert, update, delete, etc.) occurs. https://wiki.apache.org/solr/DataImportHandler
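For option 1, a minimal SolrJ sketch might look like the following; the Solr URL, collection, and field names are placeholders, and the part that detects the DML change in SQL Server is left out entirely.

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.common.SolrInputDocument;

import java.util.Arrays;

public class SolrPushExample {

    public static void main(String[] args) throws Exception {
        // Placeholder URL pointing at a hypothetical "reports" collection.
        SolrClient solr = new HttpSolrClient.Builder("http://localhost:8983/solr/reports").build();

        // Index one aggregated (UserID, Reports) document after a change is detected in SQL Server.
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("UserID", 1L);
        doc.addField("Reports", Arrays.asList(1L, 2L, 3L));

        solr.add(doc);
        solr.commit();
        solr.close();
    }
}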
You can use Kafka's Connect API to get the data from SQL Server into Kafka. I am not aware of any specific SQL Server connector, but you can use any generic JDBC-based connector: https://www.confluent.io/product/connectors/
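As a rough illustration, a configuration for Confluent's JDBC source connector could look like this; the connection string, table, incrementing column, and topic prefix are placeholders for your environment, and mode=incrementing assumes the table has a strictly increasing ID column (otherwise use one of the connector's other modes).

name=report-permission-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver://sqlserver:1433;databaseName=reports
table.whitelist=ReportPermission
mode=incrementing
incrementing.column.name=Id
topic.prefix=sqlserver-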
To process the data you can use Kafka's Streams API. You can simply aggregate() all reports per user; a sketch follows below. Check out the docs for more details about the Streams API: https://docs.confluent.io/current/streams/index.html
Finally, you use the Connect API to push the data into Elasticsearch. There are multiple different connectors available (I would of course recommend the Confluent one). More details about the Connect API: https://docs.confluent.io/current/connect/userguide.html
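For instance, a configuration for the Confluent Elasticsearch sink connector might look roughly like this; the connection URL and topic name are placeholders, and you will likely need converters that match however the aggregated values are serialized.

name=report-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=http://elasticsearch:9200
topics=report-aggregated-topic
type.name=report
key.ignore=false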