Manage state with huge memory usage - querying fro

Published 2020-07-24 05:36

Question:

Apologies if this sounds dumb! We are working with Flink to make async IO calls. A lot of the time, the IO calls are repeated (same set of parameters), and about 80% of the APIs that we call return the same response for the same parameters, so we would like to avoid making the calls again. We thought we could use state to store previous responses and reuse them. The issue is that although our responses can be reused, the number of such responses is huge and therefore requires a lot of memory. Is there a way to persist this to disk and query it as and when required?

Answer 1:

Not a dumb question at all!

A few facts reveal why this isn't straightforward:

  1. Flink state is strictly local to a single operator. You can't access state in another operator.
  2. Flink offers one state backend that can spill to disk, which is RocksDB. Only keyed state is stored in RocksDB -- non-keyed state always lives on the heap.
  3. The async i/o operator cannot be used on a keyed stream -- it only works in a non-keyed context.
  4. Using Iterations (cyclic connections in the job graph) with the DataStream API is a very bad idea (because it breaks checkpointing).

Of course, it may not be necessary for the cache to be in Flink's managed state.

Some options:

  • Don't use keyed state for the cache. You could use something like a separate RocksDB instance for the cache, and implement the caching directly in the async i/o operator. If the cache would fit in memory, I'd suggest a Guava cache.
  • Don't use async i/o. Do the fetching and caching yourself in a ProcessFunction, as proposed by @YuvalItzchakov.
  • You could use Stateful Functions instead. This is a new library and API that sits on top of Flink and overcomes some of the limitations listed above.
  • You could build something like the diagram below. Here the cache is held in keyed state in a CoProcessFunction. If the cache misses, a downstream async i/o operator is used to fetch the missing data. This then has to be looped back to the cache using an external queue, such as Kafka, or Kinesis, or Pulsar.
                    +---------------------+                                       +------+
                    |                     +--results from cache+---------------^--> SINK |
+--requests+------> |  CoProcessFunction  |                                    |  +------+
                    |                     |                                    |
+--cache misses+--> |  cache in RocksDB   |                    +-----------+   |
                    |                     +--side output:      | fetch via +---+-> loop back
     SOURCES        +---------------------+  cache misses+---> | async i/o |       as 2nd input
                                                               +-----------+       to fill cache
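
Whichever of these options you pick, the core idea of the first two is the same: check a local cache before issuing the async call, and populate it on a miss. Here is a minimal, self-contained Python sketch of that idea (not Flink code): the `LRUCache`, `fetch_remote`, and the capacity are all hypothetical stand-ins for whatever cache (Guava, a local RocksDB) and IO call you actually use.

```python
import asyncio
from collections import OrderedDict


class LRUCache:
    """Tiny in-memory LRU cache; a stand-in for Guava or a local RocksDB."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self._data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used


async def fetch_remote(params):
    # Hypothetical stand-in for the real async IO call.
    await asyncio.sleep(0)
    return {"params": params, "result": "response"}


cache = LRUCache(capacity=10_000)
real_calls = 0  # counts how many calls actually reached the "API"


async def fetch_with_cache(params):
    """Consult the cache first; only go to the remote API on a miss."""
    global real_calls
    cached = cache.get(params)
    if cached is not None:
        return cached
    real_calls += 1
    response = await fetch_remote(params)
    cache.put(params, response)
    return response


async def main():
    # Repeated parameters hit the cache instead of the API.
    r1 = await fetch_with_cache(("user", 42))
    r2 = await fetch_with_cache(("user", 42))
    assert r1 == r2


asyncio.run(main())
```

In an actual Flink job this logic would live inside the `asyncInvoke` of your async i/o operator (or in a `ProcessFunction`, per the second option); the sketch only illustrates the check-then-fetch-then-fill pattern.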