Writing to an appengine blob asynchronously and fi

Posted 2020-03-07 06:43

I have a difficult problem.

I am iterating through a set of URLs parameterized by date and fetching them. For example, here is one:

somewebservice.com?start=01-01-2012&end=01-10-2012

Sometimes the content returned from the URL gets truncated (random results are missing and a 'truncated error' message is attached) because I've defined too large a date range, so I have to split the query into two URLs:

somewebservice.com?start=01-01-2012&end=01-05-2012

somewebservice.com?start=01-06-2012&end=01-10-2012

I do this recursively until the results aren't truncated anymore, and then I write to a blob, which allows concurrent writes.
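A minimal sketch of the recursive splitting described above, in plain Java with `java.time`. It runs sequentially rather than as separate tasks, treats `start`/`end` as inclusive dates like the example URLs, and replaces the real fetch with a hypothetical `isTruncated` predicate (in `main`, any range longer than five days counts as truncated). It does not call any web service or App Engine API.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class DateRangeSplitter {
  // An inclusive date range, matching the start/end query parameters.
  static final class Range {
    final LocalDate start;
    final LocalDate end;

    Range(LocalDate start, LocalDate end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return start + ".." + end;
    }
  }

  // isTruncated is a hypothetical stand-in for "fetch the URL and look for the truncation message".
  static List<Range> split(LocalDate start, LocalDate end,
                           BiPredicate<LocalDate, LocalDate> isTruncated) {
    List<Range> out = new ArrayList<>();
    splitInto(start, end, isTruncated, out);
    return out;
  }

  private static void splitInto(LocalDate start, LocalDate end,
                                BiPredicate<LocalDate, LocalDate> isTruncated,
                                List<Range> out) {
    long days = ChronoUnit.DAYS.between(start, end);
    // A single day cannot be split further, so accept it even if it is still truncated.
    if (days == 0 || !isTruncated.test(start, end)) {
      out.add(new Range(start, end));
      return;
    }
    LocalDate mid = start.plusDays(days / 2);
    splitInto(start, mid, isTruncated, out);            // first half ends on mid
    splitInto(mid.plusDays(1), end, isTruncated, out);  // second half starts the day after
  }

  public static void main(String[] args) {
    // Hypothetical rule: the service truncates any range longer than 5 days.
    BiPredicate<LocalDate, LocalDate> tooBig =
        (s, e) -> ChronoUnit.DAYS.between(s, e) + 1 > 5;
    // Prints 2012-01-01..2012-01-05 and then 2012-01-06..2012-01-10
    for (Range r : split(LocalDate.of(2012, 1, 1), LocalDate.of(2012, 1, 10), tooBig)) {
      System.out.println(r);
    }
  }
}
```

With that rule, the 01-01-2012 to 01-10-2012 range splits into exactly the two ranges in the example URLs. The sketch assumes `start` is not after `end`; a single-day range is always accepted, so the recursion terminates.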

Each of these URL fetch calls/blob writes is handled in a separate task queue task.

The problem is that I can't for the life of me devise a scheme to know when all the tasks have completed. I've tried using sharded counters, but the recursion makes it difficult. Someone suggested I use the Pipeline API, so I watched the Slatkin talk three times. It doesn't appear to work with recursion (but I admit I still don't fully understand the library).

Is there any way to know when a set of task queue tasks (including the child tasks they spawn recursively) has completed, so I can finalize my blob and do whatever I need with it?

Thanks, John
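One way to make a counter survive the recursion is to increment it for each child before the parent marks itself finished, and decrement it when a task ends; the count can then only reach zero once the whole tree of tasks is done. The sketch below shows that idea in-process with `java.util.concurrent` (a thread pool stands in for the task queue, and the work is a hypothetical split of an integer range); it is not the Task Queue or sharded-counter API. On App Engine, a task that is retried after it has already updated the counter could throw the count off, so treat this only as an illustration of the bookkeeping.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// In-process sketch: a thread pool stands in for the task queue.
final class RecursiveCompletionTracker {
  private final ExecutorService pool = Executors.newFixedThreadPool(4);
  // Number of tasks submitted but not yet finished.
  private final AtomicInteger outstanding = new AtomicInteger();
  private final CountDownLatch allDone = new CountDownLatch(1);
  private final AtomicInteger leaves = new AtomicInteger();

  // Hypothetical work: split the half-open range [start, end) until it is one unit wide.
  private void process(int start, int end) {
    try {
      if (end - start > 1) {
        int mid = (start + end) / 2;
        submit(start, mid); // each child is counted before this task finishes
        submit(mid, end);
      } else {
        leaves.incrementAndGet(); // stands in for "fetch and write to the blob"
      }
    } finally {
      // This task is done; if nothing else is outstanding, the whole tree is done.
      if (outstanding.decrementAndGet() == 0) {
        allDone.countDown();
      }
    }
  }

  private void submit(int start, int end) {
    outstanding.incrementAndGet();
    pool.execute(() -> process(start, end));
  }

  // Starts the root task and blocks until every recursively spawned task has finished.
  int runAndAwait(int start, int end) {
    submit(start, end);
    try {
      allDone.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      pool.shutdown();
    }
    return leaves.get(); // safe point to "finalize the blob"
  }

  public static void main(String[] args) {
    System.out.println(new RecursiveCompletionTracker().runAndAwait(0, 10)); // prints 10
  }
}
```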

3 answers
Root(大扎)
Post #2 · 2020-03-07 07:24

All right, so here's what I did. I had to modify Mitch's solution slightly, but his advice to return the future value instead of an immediate one definitely got me going in the right direction.

I had to create an intermediate DummyJob that takes the output of the recursion:

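   // Does no work of its own. Because it takes the list of fetcher outputs as
   // input, the Pipeline runs it only after every fetcher has finished.
   public static class DummyJob extends Job1<Void, List<Void>> {
      @Override
      public Value<Void> run(List<Void> dummies) {
         return null;
      }
   }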

Then I pass the DummyJob's output to the job that finalizes the blob (DataWriter) using waitFor:

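// Each DataFetcher must return the FutureValue of any child jobs it spawns
// (not an immediate value), so its output is filled only after its recursion ends.
List<FutureValue<Void>> dummies = new ArrayList<FutureValue<Void>>();
for (Interval in : ins) {
   dummies.add(futureCall(new DataFetcher(), immediate(file), immediate(in.getStart()),
         immediate(in.getEnd())));
}

// DummyJob runs only when every element of the list has been filled.
FutureValue<Void> fv = futureCall(new DummyJob(), futureList(dummies));

// waitFor(fv) keeps DataWriter from running until DummyJob has completed.
return futureCall(new DataWriter(), immediate(file), waitFor(fv));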
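Outside App Engine, the same shape (fan out, wait for everything including recursive children, then finalize) can be sketched with `java.util.concurrent.CompletableFuture`. This is an analogy, not the Pipeline API: `fetch` plays the role of DataFetcher and returns a future that completes only when its own sub-requests complete (the "return the future value" advice), `allOf` plays the role of DummyJob plus `waitFor`, and `thenApply` plays the role of DataWriter. The integer ranges, the width-greater-than-2 truncation rule, and the queue standing in for the blob are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

final class FanInDemo {
  // Stands in for the blob; concurrent appends are safe.
  final Queue<String> blob = new ConcurrentLinkedQueue<>();

  // Like DataFetcher: a "too big" range recurses, and the returned future
  // completes only after both halves (and their own halves) have completed.
  CompletableFuture<Void> fetch(int start, int end) {
    if (end - start > 2) { // hypothetical truncation rule
      int mid = (start + end) / 2;
      return CompletableFuture.allOf(fetch(start, mid), fetch(mid, end));
    }
    return CompletableFuture.runAsync(() -> blob.add(start + "-" + end));
  }

  // Fan out over the intervals, wait for all of them, then "finalize".
  CompletableFuture<Integer> run(List<int[]> intervals) {
    List<CompletableFuture<Void>> dummies = new ArrayList<>();
    for (int[] in : intervals) {
      dummies.add(fetch(in[0], in[1]));
    }
    // Like DummyJob + waitFor: completes only when every fetch future has completed.
    CompletableFuture<Void> all =
        CompletableFuture.allOf(dummies.toArray(new CompletableFuture<?>[0]));
    // Like DataWriter: runs strictly after all writes; returns the number of pieces written.
    return all.thenApply(ignored -> blob.size());
  }

  public static void main(String[] args) {
    List<int[]> intervals = new ArrayList<>();
    intervals.add(new int[] {0, 10});
    intervals.add(new int[] {10, 12});
    System.out.println(new FanInDemo().run(intervals).join()); // prints 7
  }
}
```

With the two sample intervals, [0, 10) is split into six pieces and [10, 12) is fetched in one, so the finalizer sees all 7 pieces and never runs early.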

Thank you, Mitch and Nick!!

叼着烟拽天下
Post #3 · 2020-03-07 07:26

Have you read the Pipelines Getting Started docs? Pipelines can create other pipelines and wait on them, so doing what you want is fairly straightforward:

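import pipeline

class RecursivePipeline(pipeline.Pipeline):
  def run(self, param):
    if some_condition:  # Too big to process in one go
      # param1 and param2 are placeholders for the two halves of param
      p1 = yield RecursivePipeline(param1)
      p2 = yield RecursivePipeline(param2)
      # Runs only once p1 and p2 (and everything they spawned) have finished
      yield RecursiveCombiningPipeline(p1, p2)
    else:
      pass  # Small enough: fetch this range and write it directly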

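Here, RecursiveCombiningPipeline simply acts as a receiver for the values of the two sub-pipelines. Because p1 and p2 are passed to it as arguments, it does not run until both sub-pipelines have produced their outputs.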
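The same split, recurse, and combine structure can be sketched in plain Java with `CompletableFuture.thenCombine`, which, like RecursiveCombiningPipeline, only runs once both sub-results exist. This is an analogy rather than the Pipeline library; `fetchCount` and the `maxWidth` split rule are hypothetical stand-ins for fetching a date range and detecting truncation.

```java
import java.util.concurrent.CompletableFuture;

final class RecursiveCombine {
  // Hypothetical leaf work: pretend every unit in [start, end) yields one record.
  static int fetchCount(int start, int end) {
    return end - start;
  }

  // Splits [start, end) until each piece is at most maxWidth wide (maxWidth >= 1),
  // then combines the two halves' results once both are available.
  static CompletableFuture<Integer> recurse(int start, int end, int maxWidth) {
    if (end - start > maxWidth) { // too big to process in one go
      int mid = (start + end) / 2;
      CompletableFuture<Integer> p1 = recurse(start, mid, maxWidth);
      CompletableFuture<Integer> p2 = recurse(mid, end, maxWidth);
      // Plays the role of RecursiveCombiningPipeline(p1, p2).
      return p1.thenCombine(p2, Integer::sum);
    }
    return CompletableFuture.supplyAsync(() -> fetchCount(start, end));
  }

  public static void main(String[] args) {
    // The combined total covers the whole range, so no piece was lost or counted twice.
    System.out.println(recurse(0, 100, 7).join()); // prints 100
  }
}
```

`maxWidth` must be at least 1; with 0, a one-unit range would keep splitting forever.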

chillily
Post #4 · 2020-03-07 07:32

Here is an example of recursion using the Java Pipeline API:

package com.example;

import com.google.appengine.tools.pipeline.FutureValue;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.Job2;
import com.google.appengine.tools.pipeline.Value;

public class PipelineRecursionDemo {

  /**
   * A Job to count the number of letters in a word
   * using recursion
   */
  public static class LetterCountJob extends Job1<Integer, String> {

    public Value<Integer> run(String word) {
      int length = word.length();
      if (length < 2) {
        return immediate(word.length());
      } else {
        int mid = length / 2;
        FutureValue<Integer> first = futureCall(new LetterCountJob(),
            immediate(word.substring(0, mid)));
        FutureValue<Integer> second = futureCall(new LetterCountJob(),
            immediate(word.substring(mid, length)));
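        // Returning the future (not an immediate value) means this job's output
        // is filled only after the whole recursive subtree has finished.
        return futureCall(new SumJob(), first, second);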
      }
    }
  }

  /**
   * An immediate Job to add two integers
   */
  public static class SumJob extends Job2<Integer, Integer, Integer> {

    public Value<Integer> run(Integer x, Integer y) {
      return immediate(x + y);
    }
  }
}