Implementing Twisted style local multiple deferred

2019-04-13 23:52发布

问题:

I am quite new to using Celery and was wondering how TWSITED type multiple deferred callbacks can be implemented in Celery

MY TWISTED CODE uses perspective broker and is as follows. I have a Handler (server) which handles some events and returns the result. The Dispatcher (Client) prints the result returned using a deferred callback.

Handler.py (Server)

from twisted.application import service, internet
from twisted.internet import reactor, task
from twisted.spread import pb
from Dispatcher import Event
from Dispatcher import CopyEvent

class ReceiverEvent(pb.RemoteCopy, Event):
    pass
pb.setUnjellyableForClass(CopyEvent, ReceiverEvent)


class Handler(pb.Root):

def remote_eventEnqueue(self, pond):
    d = task.deferLater(reactor,5,handle_event,sender=self)
    return d

def handle_event(sender):
    print "Do Something"
    return "did something"

if __name__ == '__main__':
    h=Handler()
    reactor.listenTCP(8739, pb.PBServerFactory(h))
    reactor.run()

Now the Dispatcher.py (Client)

from twisted.spread import pb, jelly
from twisted.python import log
from twisted.internet import reactor
from Event import Event

class CopyEvent(Event, pb.Copyable):
    pass

class Dispatcher:
    def __init__(self, event):
        self.event = event

    def dispatch_event(self, remote):
        d = remote.callRemote("eventEnqueue", self.event)   
        d.addCallback(self.printMessage)

    def printMessage(self, text):
        print text

def main():
    from Handler import CopyEvent
    event = CopyEvent()
    d = Dispatcher(event)
    factory = pb.PBClientFactory()
    reactor.connectTCP("localhost", 8739, factory)
    deferred = factory.getRootObject()
    deferred.addCallback(d.dispatch_event)
    reactor.run()

if __name__ == '__main__':
    main()

I tried implementing this in Celery.

Handler.py (Server)

from celery import Celery

app=Celery('tasks',backend='amqp',broker='amqp://guest@localhost//')

@app.task

def handle_event():
     print "Do Something"
     return "did something"

Dispatcher.py (Client)

from Handler import handle_event
from datetime import datetime

def print_message(text):
    print text


t=handle_event.apply_async(countdown=10,link=print_message.s('Done'))  ##HOWTO?

My exact question is how can one implement deferred callbacks TWISTED style on local functions like print_message in Celery. When handle_Event method is finished it returns result on which I would like to have another callback method (print_message) which is LOCAL

Any other possible Design workflow to do this in Celery?

Thanks

JR

回答1:

Ok, so finally figured it out. It is not quite possible to add callbacks directly in the Celery client like the Twisted style. But Celery supports task monitoring functionality, that enables the client to monitor different kinds of worker events and add callbacks on it.

A simple task monitor (Task_Monitor.py) would look something like this. (Details can be found in Celery real processing documentation http://docs.celeryproject.org/en/latest/userguide/monitoring.html#real-time-processing)

Task_Monitor.py

from celery import Celery

def task_monitor(app):
    state = app.events.State()

    def announce_completed_tasks(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print('TASK SUCCEEDED: %s[%s] %s' % (task.name, task.uuid, task.info(), ))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'task-succeeded': announce_completed_tasks})
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@REMOTEHOST//')
    task_monitor(app)

Task_Monitor.py has to be run as a separate process (client side). Besides the Celery application (server side) needs to be configured using

app.conf.CELERY_SEND_EVENTS = TRUE

or using -E option while running celery

so that it sends events in order for worker to be monitored.



回答2:

I would recommend using chains or one of the similar mechanisms for the Celery Canvas docs.

Example taken from the docs:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8 * 10
>>> res = chain(add.s(4, 4), mul.s(8), mul.s(10))
proj.tasks.add(4, 4) | proj.tasks.mul(8) | proj.tasks.mul(10)
>>> res.apply_async()