Google Dataflow Pipeline with Instance Local Cache

2019-02-10 23:54发布

问题:

We want to build a Cloud Dataflow Streaming pipeline which ingests events from Pubsub and performs multiple ETL-like operations on each individual event. One of these operations is that each event has a device-id which need to be transformed to a different value (lets call it mapped-id), the mapping from the device-id->mapped-id being provided by an external service over a REST API. The same device-id might be repeated across multiple events - so these device-id->mapped-id mappings can be cached and re-used. Since we might be dealing with as many as 3M events per second at peak through the pipeline, the call to the REST API needs to be reduced as much as possible and also optimized when the calls are actually needed.

With this setup in mind, I have the following questions.

  • To optimize the REST API call, does Dataflow provide any in-built optimisations like connection pooling or if we decide to use our own such mechanisms, are there any restrictions/limitations we need to keep in mind?

  • We are looking at some of the in-memory cache options, to locally cache the mappings, some of which are backed by local files as well. So, are there any restrictions on how much memory (as a fraction of the total instance memory) can these cache use without affecting the regular Dataflow operations in the workers? if we use a file-backed cache, is there a path on each worker which is safe to use by the application itself to build these files on?

  • The number of unique device-id could be in the order of many millions - so not all those mappings can be stored in each instance. So to be able to utilize the local cache better, we need to get some affinity between the device-id and the workers where they are processed. I can do a group-by based on the device-id before the stage where this transform happens. If I do that, is there any guarantee that the same device-id will more or less be processed by the same worker? if there is some reasonable guarantee, then I would not have to hit the external REST API most of the time other than the first call which should be fine. Or is there a better way to ensure such affinity between the ids and the workers.

Thanks

回答1:

Here's a few things you can do:

  • Your DoFn's can have instance variables and you can put the cache there.
  • It's also ok to use regular Java static variables for a cache local to the VM, as long as you properly manage the multithreaded access to it. Guava CacheBuilder might be really helpful here.
  • Using regular Java APIs for temp files on a worker is safe (but again, be mindful of multithreaded / multiprocess access to your files, and make sure to clean them up - you may find the DoFn @Setup and @Teardown methods useful).
  • You can do a GroupByKey by the device id; then, most of the time, at least with the Cloud Dataflow runner, the same key will be processed by the same worker (though key assignments can change while the pipeline runs, but not too frequently usually). You'll probably want to set a windowing/triggering strategy with immediate triggering though.