-->

Tornado celery integration hacks

2019-01-16 21:07发布

问题:

Since nobody provided a solution to this post plus the fact that I desperately need a workaround, here is my situation and some abstract solutions/ideas for debate.

My stack:

  1. Tornado
  2. Celery
  3. MongoDB
  4. Redis
  5. RabbitMQ

My problem: Find a way for Tornado to dispatch a celery task ( solved ) and then asynchronously gather the result ( any ideas? ).

Scenario 1: (request/response hack plus webhook)

  • Tornado receives a (user)request, then saves in local memory (or in Redis) a { jobID : (user)request} to remember where to propagate the response, and fires a celery task with jobID
  • When celery completes the task, it performs a webhook at some url and tells tornado that this jobID has finished ( plus the results )
  • Tornado retrieves the (user)request and forwards a response to the (user)

Can this happen? Does it have any logic?

Scenario 2: (tornado plus long-polling)

  • Tornado dispatches the celery task and returns some primary json data to the client (jQuery)
  • jQuery does some long-polling upon receipt of the primary json, say, every x microseconds, and tornado replies according to some database flag. When the celery task completes, this database flag is set to True, then jQuery "loop" is finished.

Is this efficient?

Any other ideas/schemas?

回答1:

I stumbled upon this question and hitting the results backend repeatedly did not look optimal to me. So I implemented a Mixin similar to your Scenario 1 using Unix Sockets.

It notifies Tornado as soon as the task finishes (to be accurate, as soon as next task in chain runs) and only hits results backend once. Here is the link.



回答2:

My solution involves polling from tornado to celery:

class CeleryHandler(tornado.web.RequestHandlerr):

    @tornado.web.asynchronous
    def get(self):    

        task = yourCeleryTask.delay(**kwargs)

        def check_celery_task():
            if task.ready():
                self.write({'success':True} )
                self.set_header("Content-Type", "application/json")  
                self.finish()
            else:   
                tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

        tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

Here is post about it.



回答3:

Here is our solution to the problem. Since we look for result in several handlers in our application we made the celery lookup a mixin class.

This also makes code more readable with the tornado.gen pattern.

from functools import partial

class CeleryResultMixin(object):
    """
    Adds a callback function which could wait for the result asynchronously
    """
    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            # TODO: Is this going to be too demanding on the result backend ?
            # Probably there should be a timeout before each add_callback
            tornado.ioloop.IOLoop.instance().add_callback(
                partial(self.wait_for_result, task, callback)
            )


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
    """Execute a task asynchronously over a celery worker.
    Wait for the result without blocking
    When the result is available send it back
    """
    @tornado.web.asynchronous
    @tornado.web.authenticated
    @tornado.gen.engine
    def post(self):
        """Test the provided Magento connection
        """
        task = expensive_task.delay(
            self.get_argument('somearg'),
        )

        result = yield tornado.gen.Task(self.wait_for_result, task)

        self.write({
            'success': True,
            'result': result.some_value
        })
        self.finish()


回答4:

Now, https://github.com/mher/tornado-celery comes to rescue...

class GenAsyncHandler(web.RequestHandler):
    @asynchronous
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[3])
        self.write(str(response.result))
        self.finish()