How to properly read from Ignite Cache

2019-09-11 15:36发布

问题:

I have the following application (I'm quite new to this framework) and I'd like to see the cache size (increasing) as it reads messages from the queue but it stays 0 all the time.

    KafkaStreamer<String, String, String> kafkaStreamer = new KafkaStreamer<>();

    Ignition.setClientMode(true);

    Ignite ignite = Ignition.start();

    Properties settings = new Properties();
    // Set a few key parameters
    settings.put("bootstrap.servers", "localhost:9092");
    settings.put("group.id", "test");
    settings.put("zookeeper.connect", "localhost:2181");
    settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // Create an instance of StreamsConfig from the Properties instance
    kafka.consumer.ConsumerConfig config = new ConsumerConfig(settings);

    IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");

    IgniteDataStreamer<String, String> stmr = ignite.dataStreamer("myCache");
    // allow overwriting cache data
    stmr.allowOverwrite(true);

    kafkaStreamer.setIgnite(ignite);
    kafkaStreamer.setStreamer(stmr);

    // set the topic
    kafkaStreamer.setTopic("test");

    // set the number of threads to process Kafka streams
    kafkaStreamer.setThreads(1);

    // set Kafka consumer configurations
    kafkaStreamer.setConsumerConfig(config);

    // set decoders
    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);

    kafkaStreamer.setKeyDecoder(keyDecoder);
    kafkaStreamer.setValueDecoder(valueDecoder);

    kafkaStreamer.start();

    while (true) {

        System.out.println(cache.metrics().getSize());
        Thread.sleep(200);
    }

Can anyone tell what is missing / wrong?

Thanks!

回答1:

Probably you don't consume enough entries to fill up IgniteDataStreamer buffers. Try to set flush timeout:

stmr.autoFlushFrequency(1000);


回答2:

Metrics is disabled by default due to a performance reasons. You can enable metrics using CacheConfiguration.setStatisticsEnabled(true) or statisticsEnabled property in your configuration file.