Can I use luigi with Python Celery?

Posted 2019-04-12 22:39

Question:

I am using Celery for my web application. Celery executes parent tasks, which then execute a further pipeline of tasks.

The issues with Celery:

  1. I can't get the dependency graph and visualizer that luigi gives me, so I can't see the status of my parent task.

  2. Celery does not provide a mechanism to restart a failed pipeline from the point where it failed.
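Point 2 is what luigi addresses by treating a task as complete once its output exists, so re-running a pipeline skips the stages that already finished. The toy below mimics that idea with the standard library only; it is not luigi's API, and the run_resumable helper and the step names are hypothetical.

```python
from typing import Callable, Dict, List, Set


def run_resumable(
    steps: List[str],
    actions: Dict[str, Callable[[], None]],
    done: Set[str],
) -> List[str]:
    """Run pipeline steps in order, skipping steps already recorded in `done`.

    `done` stands in for the outputs a previous run left behind (luigi checks
    whether a task's output target exists). If a step raises, the steps before
    it stay in `done`, so the next call resumes at the failed step.
    Returns the steps actually executed in this call.
    """
    executed: List[str] = []
    for step in steps:
        if step in done:
            continue  # already finished in an earlier run
        actions[step]()  # may raise; later steps do not run in this call
        done.add(step)
        executed.append(step)
    return executed
```

Calling it again with the same done set after a failure re-runs only the failed step and the ones after it.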

I can easily get both of these from luigi.

So I was thinking that once Celery runs the parent task, I would execute the luigi pipeline inside that task.

Is there going to be any issue with that? I need to autoscale the Celery workers based on queue size. Will that affect the luigi workers across multiple machines?

Answer 1:

I have never tried it, but I think it should be possible to call a luigi task from inside a Celery task, the same way you do it from Python code in general:

from foobar import MyTask
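from luigi import scheduler, worker

task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()  # in-process scheduler (pre-2.7.0 class name)
w = worker.Worker(scheduler=sch)
w.add(task)  # schedule the task and its dependencies
w.run()      # run whatever is not already complete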

About scaling your queue and Celery workers: if you have many Celery workers calling luigi tasks against a central scheduler, you will of course need to scale your luigi scheduler daemon (luigid) so it can handle the volume of API requests. Every time you submit a task for execution, you hit the luigi scheduler API; every N seconds (depending on your config) each worker hits the scheduler API to say "I'm alive"; every time a task finishes, with an error or successfully, you hit the scheduler API again; and so on.
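To get a feel for that load, a back-of-envelope estimate helps. The helper below is hypothetical and every input (worker count, ping interval, calls per task) is an assumption to replace with your own numbers; the per-task call count in particular is a rough guess, not luigi's exact protocol. The ping interval itself is configurable in luigi's [worker] config section.

```python
def scheduler_requests_per_second(
    luigi_workers: int,
    ping_interval_s: float,
    tasks_per_second: float,
    calls_per_task: int,
) -> float:
    """Rough estimate of HTTP requests/s reaching a central luigi scheduler.

    luigi_workers: concurrent luigi workers (e.g. one per busy Celery worker).
    ping_interval_s: seconds between "I'm alive" pings from each worker.
    tasks_per_second: luigi tasks started per second across all workers.
    calls_per_task: assumed API calls per task (add, get work, status update...).
    """
    heartbeat = luigi_workers / ping_interval_s
    task_traffic = tasks_per_second * calls_per_task
    return heartbeat + task_traffic


# Assumed numbers: 50 workers pinging every 1 s, 10 tasks/s, 3 calls per task.
print(scheduler_requests_per_second(50, 1.0, 10, 3))  # -> 80.0
```

The heartbeat term grows linearly with the number of workers, so autoscaling Celery workers up also scales the baseline load on luigid even when few tasks are running.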

So yes, keep a close eye on your scheduler to see whether it is receiving too many HTTP requests or whether its task-history database is becoming a bottleneck (luigi uses SQLite by default, but you can easily switch to MySQL or PostgreSQL).

UPDATE:

Since version 2.7.0, luigi.scheduler.CentralPlannerScheduler has been renamed to luigi.scheduler.Scheduler, as you can see here, so the code above should now be:

from foobar import MyTask
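from luigi import scheduler, worker

task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()  # in-process scheduler (2.7.0+ class name)
w = worker.Worker(scheduler=sch)
w.add(task)  # schedule the task and its dependencies
w.run()      # run whatever is not already complete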