当芹菜地图/减少任务正在运行“./manage.py的runserver”重新启动; 有时引发与

2019-10-16 19:01发布

我有在火灾掀起了芹菜任务我Django项目的视图。 芹菜任务本身触发一些的map / reduce通过子/结构和工作Hadoop的作业的结果存储在磁盘上---没有什么实际存储在数据库中。 Hadoop的工作完成后,芹菜任务发送Django的信号,它是做,是这样的:

# tasks.py
from models import MyModel
import signals

from fabric.operations import local

from celery.task import Task

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")

什么是真正令人费解我的是,我仿佛在Django项目的地方改变了一些代码运行时,芹菜任务时的runserver 重新加载 ,Django的(我没有,我可以向你保证!)。 不时地,这甚至会导致在runserver命令错误,我看到类似以下的输出的runserver命令重新加载之前,再次是确定(注:此错误消息非常相似, 这里所描述的问题 )。

Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
    if not enabled():
TypeError: 'NoneType' object is not callable

Original exception was:
Traceback (most recent call last):
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
    run(addr, int(port), handler)
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
    httpd.serve_forever()
  File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
    r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'

我已经收窄的问题,当调用由通过更换到Hadoop的,以local("""hadoop ...""")local("ls")不会引起任何问题,重装的Django的runserver。 有在Hadoop的代码没有bug ---当它不被称为芹菜它运行在自己的就好了。

任何想法什么可能会造成这个?

Answer 1:

有关于这织物GitHub的页面上的一些讨论在这里 , 这里和这里 。 对于引发错误的另一个选项是用来设置上下文管理器:

from fabric.api import settings

class Hadoopification(Task):
    ...
    def hadoopify_function(self, my_model, other_args):
        with settings(warn_only=True):
            result = local(...)
        if result.failed:
            # access result.return_code, result.stdout, result.stderr
            raise UsefulException(...)

这具有允许访问返回代码和所有其他属性对结果的优势。



Answer 2:

所以在面料的源代码周围挖掘后,我才得知,Django最重装,因为我的芹菜任务,一个fabric.operations.local命令中运行,不及格(这是很难Hadoop的输出普科巨星内检测)。 当fabric.operations.local命令失败, 面料发出sys.exit造成芹菜死亡和Django的尝试并重新加载。 可以用受凉检测到这种错误SystemExit这样Hadoop的任务范围内:

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        try:
            local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")
        except SystemExit, e:
            # print some useful debugging information about exception e here!
            raise


Answer 3:

我的猜测是,有上名的一些碰撞任务中都芹菜和织物。 我建议使用更多的东西,如:

import celery
class Hadoopification(celery.task.Task):
    ...

和尝试,并避免任何进一步的冲突,如果预感是好的。

但实际上,布料的地方是相当Nieve酒店,并且是本质上只是一个subprocess.Popen,你可以尝试打电话原料也分离出任何东西,但蟒蛇STDLIB。



文章来源: './manage.py runserver' restarts when celery map/reduce tasks are running; sometimes raises error with inner_run