How to use push queues (tasks) with GAE?

Posted 2019-03-01 09:22

Question:

My GAE application needs to upload several files to another server (using urlfetch). How can I implement this with tasks, given that I must perform one more action once the last task has completed?

How can I know when the last task is completed?

Update: Is the following task-based approach correct?

from google.appengine.api import taskqueue, urlfetch
from google.appengine.ext import db, webapp

class Accumulator(db.Model):
    counter = db.IntegerProperty()

def increase_counter(key):
    obj = db.get(key)
    obj.counter += 1
    obj.put()

def zero_counter(key):
    obj = db.get(key)
    obj.counter = 0
    obj.put()

def decrease_counter(key):
    obj = db.get(key)
    obj.counter -= 1
    obj.put()

def get_counter(key):
    obj = db.get(key)
    return obj.counter

class PublishPhotosHandler(webapp.RequestHandler):
    def post(self):
        # where should this task counter be defined? apparently not here
        db.run_in_transaction(zero_counter, some_unique_key)
        for argument in files_arguments:
            taskqueue.add(url='/upload', params={'key': key})
            db.run_in_transaction(increase_counter, some_unique_key)

        # here we redirect the user to another page, '/checkstatus'
        ...
        # nothing is shown to the user here

class UploadWorker(webapp.RequestHandler):
    def post(self):
        key = self.request.get('key')
        result = urlfetch.fetch(...)
        db.run_in_transaction(decrease_counter, some_unique_key)
        # how do I return an error here so that the task is retried?
        # nothing is shown to the user here

# this is our '/checkstatus' page
class CheckStatus(webapp.RequestHandler):
    def get(self, key):
        if get_counter(some_unique_key) == 0:
            # all tasks finished
            # show the content
            pass

Answer 1:

You can create a counter entity that stores the number of tasks still to be executed. Each task, once it finishes, must decrement this counter inside a transaction. If the counter reaches 0, the task that just finished was the last one.
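
The decrement-and-check idea can be tried out without App Engine. The following is only a rough sketch, not datastore code: the `Counter` class, `run_all` and `upload_task` are hypothetical names, threads stand in for tasks, and a `threading.Lock` plays the role that `db.run_in_transaction` would play in the real application.

```python
import threading


class Counter(object):
    """In-memory stand-in for the counter entity (hypothetical name)."""

    def __init__(self, total):
        self.remaining = total         # number of tasks not yet finished
        self._lock = threading.Lock()  # assumption: stands in for a datastore transaction

    def decrement(self):
        """Atomically decrement the counter and return the new value."""
        with self._lock:
            self.remaining -= 1
            return self.remaining


def run_all(num_tasks):
    """Run num_tasks simulated tasks.

    Returns (number of times the final action ran, final counter value).
    """
    counter = Counter(num_tasks)       # the total is set before any task starts
    final_calls = []

    def upload_task(i):
        # ... the real task would upload one file here ...
        if counter.decrement() == 0:
            # Only the task that brings the counter to 0 reaches this branch,
            # so this is where the "one more action" would go.
            final_calls.append(i)

    threads = [threading.Thread(target=upload_task, args=(i,))
               for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(final_calls), counter.remaining
```

Because the decrement returns the new value from inside the same atomic step, exactly one task sees 0, whatever order the tasks finish in. In this sketch the counter is set to the full total before any task starts; if it were instead incremented after each task is enqueued, a fast task could drive it to 0 while later tasks are still being added.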



Answer 2:

The code you show won't work - the tasks you're starting aren't in the same local scope as the counter, and may not even be on the same machine. You could use a datastore-based counter as @splix suggests, but this looks like a case where the Pipeline library would be ideally suited.