I have two Java applications (App1, App2) to test how to access a KTable from a different app in a single-instance environment in Docker.
The first app (App1) writes to a KTable with the following code:
public static void main(String[] args)
{
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);

    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, ServiceTransaction> source = builder.stream("gateway_request_processed");
    KStream<String, Long> countByApi = source.groupBy((key, value) -> value.getApiId().toString()).count("Counts").toStream();
    countByApi.to(Serdes.String(), Serdes.Long(), "countByApi");
    countByApi.print();

    final KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();
    System.out.println(streams.state());
    System.out.println(streams.allMetadata());
    System.out.println(streams.allMetadataForStore("countByApi"));

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println(streams.allMetadata());
            streams.close();
        }
    }));
}
When I run my producer, I get the following output from App1:
RUNNING
[]
[]
[KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19
This shows that the state is RUNNING. The metadata is empty, including the metadata for the store. But the request gets processed and stored in the KTable successfully (String, Long).
When I run kafka-topics.sh --list --zookeeper zookeeper:2181, I get the following topics:
bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
countByApi
gateway-Counts-changelog
gateway-Counts-repartition
gateway-service-Counts-changelog
gateway-service-Counts-repartition
gateway_request_processed
This shows me that the KTable is somehow persisted with new topics.
I then have a second command-line app (App2) with the following code, which tries to open this KTable as a state store (ReadOnlyKeyValueStore) and read from it:
public static void main(String[] args)
{
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");

    KStreamBuilder builder = new KStreamBuilder();
    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.cleanUp();
    streams.start();
    System.out.println("Hello World!");
    System.out.println(streams.state());

    ReadOnlyKeyValueStore<String, Long> keyValueStore =
        streams.store("countByApi", QueryableStoreTypes.keyValueStore());
    final KeyValueIterator<String, Long> range = keyValueStore.all();
    while (range.hasNext()) {
        KeyValue<String, Long> next = range.next();
        System.out.println(String.format("key: %s | value: %s", next.key, next.value));
    }

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println(streams.allMetadata());
            streams.close();
        }
    }));
}
When I run the second app, I get this error message:
RUNNING
Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
at com.comp.streamtable.App.main(App.java:37)
Unfortunately, I have only one instance, and I verified that the state is "RUNNING".
Note: I had to choose a different application.id for each app, since using the same one threw another exception. I just wanted to point this out since it might be of interest.
What am I missing here to access my KTable from another app?
You are using two different application.id values for the two applications. Thus, both applications are completely decoupled. Interactive Queries are designed for different instances of the same app, and do not work across applications.
This blog post might help: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
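To make the decoupling concrete, here is a minimal sketch in plain Java (no Kafka dependency) of how the internal topic names in your listing appear to be derived from the application.id and the store name. The class and method names are hypothetical, and the naming pattern is only inferred from the topics shown in the question. Note also that the store created by count("Counts") is named "Counts"; "countByApi" is the output topic, not a store.

```java
import java.util.Arrays;
import java.util.List;

public class InternalTopicNames {

    // Assumed pattern, inferred from the topic listing in the question:
    // <application.id>-<storeName>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Assumed pattern, inferred from the topic listing in the question:
    // <application.id>-<storeName>-repartition
    static String repartitionTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-repartition";
    }

    public static void main(String[] args) {
        // Store name passed to count("Counts") in App1.
        String storeName = "Counts";

        // The two application.id values used in the question.
        List<String> applicationIds =
            Arrays.asList("gateway-service", "gateway-service-table-client");

        for (String appId : applicationIds) {
            System.out.println(appId + " -> "
                + changelogTopic(appId, storeName) + ", "
                + repartitionTopic(appId, storeName));
        }
    }
}
```

For "gateway-service" this yields the two gateway-service-Counts-* topics from your listing. For "gateway-service-table-client" it yields names that do not exist there, which fits the point above: App2 has its own application.id (and an empty topology), so it has no state of its own to query.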