我使用的芹菜独立的(不内的Django)。 我打算有多个物理机器上一个工人的任务类型运行。 该任务将执行以下操作
- 接受一个XML文档。
- 改造它。
- 让多个数据库读取和写入。
我使用PostgreSQL,但是这同样适用于使用连接其他存储类型。 在过去,我已经使用了数据库连接池,以避免对每个请求一个新的数据库连接或避免保持连接打开时间过长。 然而,由于每个芹菜工人在一个单独的进程中运行,我不知道他们会如何真正能够共享池。 我缺少的东西吗? 我知道,芹菜让你坚持从芹菜工人返回的结果,但是这不是我想在这里做。 每个任务可以根据处理的数据做几个不同的更新或插入。
什么是从芹菜工人中访问数据库的正确方法?
是否有可能跨多个工人/任务共享一个游泳池或者是有一些其他的方式来做到这一点?
我想每个工人一个连接tigeronk2的想法。 正如他所说,芹菜维护自己的工人因此池真的不是需要一个单独的数据库连接池。 将芹菜信号文档解释如何做定制的初始化时创建一个工人,所以我下面的代码添加到我的tasks.py,它似乎工作完全像你期望的那样。 我甚至能够关闭连接时,工人们正在关机:
db_conn = None
@worker_process_init.connect
def init_worker(**kwargs):
global db_conn
print('Initializing database connection for worker.')
db_conn = db.connect(DB_CONNECT_STRING)
@worker_process_shutdown.connect
def shutdown_worker(**kwargs):
global db_conn
if db_conn:
print('Closing database connectionn for worker.')
db_conn.close()
您可以覆盖默认行为,有螺纹的工人,而不是在你的芹菜配置每个进程的工人:
CELERYD_POOL = "celery.concurrency.threads.TaskPool"
然后您可以将共享池实例存储在您的任务实例并从每个线程任务调用引用它。
有每个工人的过程中的一个数据库连接。 由于芹菜本身维护工作进程池,你的数据库连接将始终等于芹菜工人的数量。 另一面,排序的,它会占用数据库连接池芹菜工作进程管理。 但是,这应该是罚款鉴于GIL只允许一个线程在一个时间的过程。
也许你可以使用pgbouncer 。 对于芹菜什么应该改变和连接池的过程之外进行。 我也有同样的问题 。
(“也许”,因为我不知道是否有可能是任何副作用)
也许, celery.concurrency.gevent可以提供共享池,而不是加剧了GIL。 然而,它的支持仍然是“实验性”。
和psycopg2.pool.SimpleConnectionPool分享当中greenlets(协程),这将在一个单一的进程/线程中运行。
其他点点堆栈上的话题讨论。
通过实施和监督有助于回到我的发现。
欢迎反馈。
参考:使用集中http://www.prschmid.com/2013/04/using-sqlalchemy-with-celery-tasks.html
每个工作进程(通过指定-CK预派生模式)将建立无池或重复使用到DB一个新的连接。 所以,如果用池,游泳池只在每个工作进程级别观察。 所以池大小> 1是没有用的,而是重用连接仍然是罚款保存从开放和紧密的联系。
如果使用每个工人的过程中的一个连接,1个DB连接每个工人的过程(prefork的模式芹菜-A应用工人-ck)在初始化阶段建立。 它反复保存来自开放和紧密的联系。
不管有多少工作线程(eventlet),每个工作线程(芹菜-A应用工人-P eventlet)只有建立无池或重复使用一个连接到数据库。 因此,对于eventlet,所有的工作线程(eventlets)上的一个过程芹菜(芹菜-A应用工作者......)必须在每一时刻1分贝连接。
据芹菜文档
但你需要确保你的任务不执行阻塞调用,因为这将制止工人的所有其他操作,直到阻塞调用返回。
这可能是由于MySQL数据库连接阻塞调用的方式。