Datastore queries in Dataflow DoFn slow down pipel

2019-04-13 03:47发布


I am trying to enhance data in a pipeline by querying Datastore in a DoFn step. A field from an object from the Class CustomClass is used to do a query against a Datastore table and the returned values are used to enhance the object.

The code looks like this:

public class EnhanceWithDataStore extends DoFn<CustomClass, CustomClass> {

private static Datastore datastore = DatastoreOptions.defaultInstance().service();
private static KeyFactory articleKeyFactory = datastore.newKeyFactory().kind("article");

public void processElement(ProcessContext c) throws Exception {

    CustomClass event = c.element();

    Entity article = datastore.get(articleKeyFactory.newKey(event.getArticleId()));

    String articleName = "";
        articleName = article.getString("articleName");         
    } catch(Exception e) {}

    CustomClass enhanced = new CustomClass(event);


When it is run locally, this is fast, but when it is run in the cloud, this step slows down the pipeline significantly. What's causing this? Is there any workaround or better way to do this?

A picture of the pipeline can be found here (the last step is the enhancing step): pipeline architecture


What you are doing here is a join between your input PCollection<CustomClass> and the enhancements in Datastore.

For each partition of your PCollection the calls to Datastore are going to be single-threaded, hence incur a lot of latency. I would expect this to be slow in the DirectPipelineRunner and InProcessPipelineRunner as well. With autoscaling and dynamic work rebalancing, you should see parallelism when running on the Dataflow service unless something about the structure of your causes us to optimize it poorly, so you can try increasing --maxNumWorkers. But you still won't benefit from bulk operations.

It is probably better to express this join within your pipeline, using DatastoreIO.readFrom(...) followed by a CoGroupByKey transform. In this way, Dataflow will do a bulk parallel read of all the enhancements and use the efficient GroupByKey machinery to line them up with the events.

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key them both by the common id
PCollection<KV<Long, CustomClass>> keyedEvents =
    events.apply(WithKeys.of(event -> event.getArticleId()))

PCollection<KV<Long, Entity>> =
    articles.apply(WithKeys.of(article -> article.getKey().getId())

// Set up the join by giving tags to each collection
TupleTag<CustomClass> eventTag = new TupleTag<CustomClass>() {};
TupleTag<Entity> articleTag = new TupleTag<Entity>() {};
KeyedPCollectionTuple<Long> coGbkInput =
        .of(eventTag, keyedEvents)
        .and(articleTag, keyedArticles);

PCollection<CustomClass> enhancedEvents = coGbkInput
    .apply(MapElements.via(CoGbkResult joinResult -> {
      for (CustomClass event : joinResult.getAll(eventTag)) {
        String articleName;
        try {
          articleName = joinResult.getOnly(articleTag).getString("articleName");
        } catch(Exception e) {
          articleName = "";
        CustomClass enhanced = new CustomClass(event);
        return enhanced;

Another possibility, if there are very few enough articles to store the lookup in memory, is to use DatastoreIO.readFrom(...) and then read them all as a map side input via View.asMap() and look them up in a local table.

// Here are the two collections you want to join
PCollection<CustomClass> events = ...;
PCollection<Entity> articles = DatastoreIO.readFrom(...);

// Key the articles and create a map view
PCollectionView<Map<Long, Entity>> = articleView
    .apply(WithKeys.of(article -> article.getKey().getId())

// Do a lookup join by side input to a ParDo
PCollection<CustomClass> enhanced = events
    .apply(ParDo.withSideInputs(articles).of(new DoFn<CustomClass, CustomClass>() {
      public void processElement(ProcessContext c) {
        Map<Long, Entity> articleLookup = c.sideInput(articleView);
        String articleName;
        try {
          articleName =
        } catch(Exception e) {
          articleName = "";
        CustomClass enhanced = new CustomClass(event);
        return enhanced;

Depending on your data, either of these may be a better choice.


After some checking I managed to pinpoint the problem: the project is located in the EU (and as such, the Datastore is located in the EU-zone; same as the AppEningine zone), while the Dataflow jobs themselves (and thus the workers) are hosted in the US by default (when not overwriting the zone-option).

The difference in performance is 25-30 fold: ~40 elements/s compared to ~1200 elements/s for 15 workers.