Cannot access a KTable from a different app as a state store

Posted 2019-07-11 06:43

Question:

I have two Java applications (App1 and App2) to test how to access a KTable from a different app in a single-instance environment in Docker.

The first app (App1) writes to a KTable with the following code:

    public static void main(String[] args)
    {
        final Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, ServiceTransaction> source = builder.stream("gateway_request_processed");

        KStream<String, Long> countByApi = source.groupBy((key, value) -> value.getApiId().toString()).count("Counts").toStream();

        countByApi.to(Serdes.String(), Serdes.Long(), "countByApi");

        countByApi.print();

        final KafkaStreams streams = new KafkaStreams(builder, props);

        streams.start();
        System.out.println(streams.state());

        System.out.println(streams.allMetadata());
        System.out.println(streams.allMetadataForStore("countByApi"));

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(streams.allMetadata());
                streams.close();
            }
        }));
    }

When I run my producer, I get the following output from the code in App1:

    RUNNING
    []
    []
    [KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19

This shows that the state is RUNNING, but the metadata is empty, including the metadata for the store. Still, the requests are processed and stored in the KTable (String, Long) successfully.

When I run kafka-topics.sh --list --zookeeper zookeeper:2181, I get the following topics:

    bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
    __consumer_offsets
    countByApi
    gateway-Counts-changelog
    gateway-Counts-repartition
    gateway-service-Counts-changelog
    gateway-service-Counts-repartition
    gateway_request_processed

This shows me that the KTable is somehow persisted in new topics.
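The internal topic names in this listing follow the Kafka Streams convention <application.id>-<name>-changelog (and <application.id>-<name>-repartition). Reading them back suggests that App1's state store is named "Counts", taken from count("Counts"), while "countByApi" is only the sink topic written by to(...). That would be consistent with allMetadataForStore("countByApi") printing []. The gateway-Counts-* topics presumably come from an earlier run with application.id "gateway". The class InternalTopicNames below is a hypothetical, pure-Java sketch that only mirrors the naming pattern; it is not a Kafka API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only (not part of Kafka Streams): mirrors the
// internal topic naming pattern "<application.id>-<name>-changelog" / "-repartition".
public class InternalTopicNames {

    public static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static String repartitionTopic(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    // Returns the store name encoded in a changelog topic of the given application,
    // or null if the topic lacks that application's changelog prefix and suffix.
    public static String storeNameFromChangelog(String applicationId, String topic) {
        String prefix = applicationId + "-";
        String suffix = "-changelog";
        if (topic.startsWith(prefix) && topic.endsWith(suffix)
                && topic.length() > prefix.length() + suffix.length()) {
            return topic.substring(prefix.length(), topic.length() - suffix.length());
        }
        return null;
    }

    public static void main(String[] args) {
        // Topic list copied from the kafka-topics.sh output in the question.
        List<String> topics = Arrays.asList(
                "__consumer_offsets", "countByApi",
                "gateway-Counts-changelog", "gateway-Counts-repartition",
                "gateway-service-Counts-changelog", "gateway-service-Counts-repartition",
                "gateway_request_processed");
        for (String topic : topics) {
            String store = storeNameFromChangelog("gateway-service", topic);
            if (store != null) {
                System.out.println(topic + " -> store \"" + store + "\"");
            }
        }
    }
}
```

Running main prints a single line, gateway-service-Counts-changelog -> store "Counts"; no topic in the list encodes a store called countByApi.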

I then have a second command-line app (App2) with the following code, which tries to access this KTable as a state store (ReadOnlyKeyValueStore) and read from it:

    public static void main(String[] args)
    {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");

        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams(builder, props);

        streams.cleanUp();
        streams.start();
        System.out.println("Hello World!");
        System.out.println(streams.state());

        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("countByApi", QueryableStoreTypes.keyValueStore());

        final KeyValueIterator<String, Long> range = keyValueStore.all();

        while (range.hasNext()) {
            KeyValue<String, Long> next = range.next();
            System.out.println(String.format("key: %s | value: %s", next.key, next.value));
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(streams.allMetadata());
                streams.close();
            }
        }));
    }

When I run App2, I get the following error:

    RUNNING
    Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
        at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
        at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
        at com.comp.streamtable.App.main(App.java:37)

Unfortunately, I only have one instance, and I verified that the state is RUNNING.
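A RUNNING state alone does not guarantee that a store can be queried: store(...) throws InvalidStateStoreException while the store is not available locally, for example during a rebalance. For a store that is defined in the same application's topology, a common pattern is to retry the lookup for a while. This only helps when the store actually exists in that topology, which is not the case for countByApi in App2. The helper below, StoreRetry.retryUntilAvailable, is a hypothetical pure-Java sketch (not part of Kafka Streams); it is written generically so it runs without a broker.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams: retries a lookup that throws while a
// resource is not ready yet (e.g. KafkaStreams#store throwing InvalidStateStoreException).
public final class StoreRetry {

    private StoreRetry() {
    }

    public static <T> T retryUntilAvailable(Supplier<T> lookup,
                                            Class<? extends RuntimeException> retryOn,
                                            long timeoutMs,
                                            long backoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Rethrow anything that is not the "not ready yet" exception, and give up
                // with the last exception once the timeout has passed.
                if (!retryOn.isInstance(e) || System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Simulated lookup that fails twice with IllegalStateException, then succeeds.
        int[] calls = {0};
        String result = retryUntilAvailable(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("store not ready");
            }
            return "ready";
        }, IllegalStateException.class, 1_000, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The simulated run prints "ready after 3 attempts". In App1, where a store named "Counts" exists, the Kafka-specific call would pass a lookup such as () -> streams.store("Counts", QueryableStoreTypes.<String, Long>keyValueStore()) together with InvalidStateStoreException.class as the exception to retry on.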

Note: I had to choose a different application.id for each app, because using the same one threw another exception. I mention this in case it is of interest.

What am I missing to access my KTable from another app?

Answer 1:

You are using a different application.id for each of the two applications, so they are completely decoupled.

Interactive Queries are designed for different instances of the same app, and do not work across applications.

This blog post might help: https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
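One way for App2 to see the counts without querying App1's store is to read App1's output topic countByApi into a table of its own and query that local store. As posted, App2 builds an empty topology (new KStreamBuilder() with nothing added), so it has no store of any name to query. The following is a minimal sketch under stated assumptions: a kafka-streams 2.5+ client (StreamsBuilder, Materialized, StoreQueryParameters) instead of the older KStreamBuilder API from the question, a broker reachable at the question's address, and the hypothetical names CountByApiReader and countByApiStore.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class CountByApiReader {

    // Hypothetical name for App2's own store; it only has to be unique within App2.
    static final String STORE_NAME = "countByApiStore";

    static Properties buildProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
        return props;
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Read App1's output topic (String key = apiId, Long value = count) as a table
        // and materialize it into a local, queryable key-value store owned by App2.
        builder.table("countByApi",
                Consumed.with(Serdes.String(), Serdes.Long()),
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
        return builder.build();
    }

    public static void main(String[] args) throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(buildTopology(), buildProps());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();

        // The store is only queryable once this instance has finished rebalancing,
        // so retry while Kafka Streams reports it as unavailable.
        ReadOnlyKeyValueStore<String, Long> store;
        while (true) {
            try {
                store = streams.store(StoreQueryParameters.fromNameAndType(
                        STORE_NAME, QueryableStoreTypes.<String, Long>keyValueStore()));
                break;
            } catch (InvalidStateStoreException notReadyYet) {
                Thread.sleep(100);
            }
        }

        // A freshly started instance may still be catching up on countByApi, so early
        // reads can be incomplete. The iterator is closed to release store resources.
        try (KeyValueIterator<String, Long> all = store.all()) {
            while (all.hasNext()) {
                KeyValue<String, Long> next = all.next();
                System.out.printf("key: %s | value: %s%n", next.key, next.value);
            }
        }
    }
}
```

With this approach each application keeps its own copy of the counts. The alternative that stays within Interactive Queries is to put the query code into App1 itself (same application.id) and query its store "Counts" there.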